Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 2 additions & 71 deletions keepercommander/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@


import argparse
import certifi
import json
import logging
import os
import re
import shlex
import sys
import ssl
import platform

from pathlib import Path
from typing import Optional
Expand Down Expand Up @@ -178,69 +175,6 @@ def handle_exceptions(exc_type, exc_value, exc_traceback):
sys.exit(-1)


def get_ssl_cert_file():
"""Get SSL certificate file path, preferring system CA store for corporate environments like Zscaler"""

# Allow user to override via environment variable
user_cert_file = os.getenv('KEEPER_SSL_CERT_FILE')
if user_cert_file:
if user_cert_file.lower() == 'system':
# User explicitly wants system certs
pass # Continue with system detection below
elif user_cert_file.lower() == 'certifi':
# User explicitly wants certifi
return certifi.where()
elif user_cert_file.lower() == 'none' or user_cert_file.lower() == 'false':
# User wants to disable SSL verification (not recommended)
return None
elif os.path.exists(user_cert_file):
# User provided specific cert file
return user_cert_file
else:
logging.warning(f"SSL cert file specified in KEEPER_SSL_CERT_FILE not found: {user_cert_file}")

# Try to use system CA store first for corporate environments
try:
# On macOS, try Homebrew certificates first (better for corporate environments like Zscaler)
if platform.system() == 'Darwin':
system_ca_paths = [
'/opt/homebrew/etc/ca-certificates/cert.pem', # Homebrew CA bundle (best for Zscaler)
'/usr/local/etc/ssl/cert.pem', # Homebrew SSL (older location)
'/etc/ssl/cert.pem', # macOS system CA bundle
]
for ca_path in system_ca_paths:
if os.path.exists(ca_path):
return ca_path

# On Linux/Unix systems
elif platform.system() == 'Linux':
system_ca_paths = [
'/etc/ssl/certs/ca-certificates.crt', # Debian/Ubuntu
'/etc/pki/tls/certs/ca-bundle.crt', # RHEL/CentOS
'/etc/ssl/ca-bundle.pem', # OpenSUSE
'/etc/ssl/cert.pem', # Generic
]
for ca_path in system_ca_paths:
if os.path.exists(ca_path):
return ca_path

# Try to get default SSL context locations
try:
default_locations = ssl.get_default_verify_paths()
if default_locations.cafile and os.path.exists(default_locations.cafile):
return default_locations.cafile
if default_locations.capath and os.path.exists(default_locations.capath):
return default_locations.capath
except:
pass

except Exception:
pass

# Fall back to certifi if system CA not available
return certifi.where()


def main(from_package=False):
if sys.platform == 'win32':
try:
Expand All @@ -253,15 +187,12 @@ def main(from_package=False):
if logger:
logger.name = 'keepercommander'

# Use system CA certificates when available (supports Zscaler), fallback to certifi
ssl_cert_file = get_ssl_cert_file()
ssl_cert_file = utils.get_ssl_cert_file()
if ssl_cert_file:
os.environ['SSL_CERT_FILE'] = ssl_cert_file
else:
# User explicitly disabled SSL verification
logging.warning("Warning: SSL certificate verification has been disabled. This is not recommended for production use.")
if 'SSL_CERT_FILE' in os.environ:
del os.environ['SSL_CERT_FILE']
os.environ.pop('SSL_CERT_FILE', None)

errno = 0

Expand Down
134 changes: 119 additions & 15 deletions keepercommander/commands/_cloud_import_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import json
import logging
import os
import re
from typing import Callable, Dict, List, Optional, Tuple

Expand Down Expand Up @@ -272,7 +273,8 @@ def _validate_folder(params, folder_uid, command_name):
'Use "list-sf" to find the correct shared folder UID, '
'or run "sync-down" if the folder was recently shared.'
)
if not isinstance(folder, (SharedFolderNode, SharedFolderFolderNode)):
if not (isinstance(folder, (SharedFolderNode, SharedFolderFolderNode)) or
(hasattr(folder, 'type') and folder.type == 'nested_share_folder')):
raise CommandError(
command_name,
f'"{folder_uid}" is a personal folder. '
Expand All @@ -290,8 +292,12 @@ def _run_import(self, params, secrets, folder_uid, record_type, filter_name,
# type: (KeeperParams, list, str, str, Optional[str], Optional[str], Optional[str], Optional[str], List[Tuple[str, str]], bool, str, Optional[Callable]) -> None
"""
Iterate *secrets* (list of dicts with at minimum 'name' and 'tags'),
apply all filters, then create Keeper records via batched vault/records_add
calls (up to BATCH_SIZE records per request).
apply all filters, then create Keeper records via batched API calls
(up to BATCH_SIZE records per request).

When *folder_uid* belongs to a nested_share_folder the records are
created via ``vault/records/v3/add``; otherwise the legacy
``vault/records_add`` endpoint is used.

*value_fetcher*, when provided, is called as ``value_fetcher(name) -> str``
for each secret that passes filters and is not a dry-run. This enables
Expand Down Expand Up @@ -320,10 +326,9 @@ def _run_import(self, params, secrets, folder_uid, record_type, filter_name,
if dry_run:
return

# Phase 2 – fetch values (if needed) and build TypedRecord + protobuf
# objects without touching the Keeper API yet.
# Phase 2 – fetch values (if needed) and build TypedRecord objects.
skipped = 0
pending = [] # type: List[Tuple[vault.TypedRecord, record_pb2.RecordAdd]]
typed_records = [] # type: List[vault.TypedRecord]

for item in matched:
name = item['name']
Expand All @@ -339,17 +344,118 @@ def _run_import(self, params, secrets, folder_uid, record_type, filter_name,
value = item.get('value', '')

fields = self._parse_secret_string(value)
record = self._build_keeper_record(name, fields, record_type)
pb = record_management.add_record_to_folder(params, record, folder_uid, pb_only=True)
if pb is not None:
pending.append((record, pb))
typed_records.append(self._build_keeper_record(name, fields, record_type))

if not pending:
if not typed_records:
print(f'{command_name}: 0 record(s) created, {skipped} skipped.')
return

# Phase 3 – send in batches of up to BATCH_SIZE to vault/records_add.
# Phase 3 – send in batches, routing to the appropriate API endpoint.
is_nsf = folder_uid in getattr(params, 'nested_share_folders', {})
if is_nsf:
created, nsf_skipped = self._send_nsf_batches(
params, typed_records, folder_uid, command_name)
skipped += nsf_skipped
else:
created, legacy_skipped = self._send_legacy_batches(
params, typed_records, folder_uid, command_name)
skipped += legacy_skipped

if created:
params.sync_data = True
print(f'{command_name}: {created} record(s) created, {skipped} skipped.')

# ------------------------------------------------------------------
# Phase-3 dispatch helpers
# ------------------------------------------------------------------

@staticmethod
def _typed_record_to_data(record):
# type: (vault.TypedRecord) -> dict
"""Serialise a TypedRecord to the dict format expected by the v3 record API."""
data = {
'type': record.type_name,
'title': record.title,
'fields': [
{'type': f.type, 'label': f.label or '', 'value': list(f.value)}
for f in record.fields
],
'custom': [
{'type': f.type, 'label': f.label or '', 'value': list(f.value)}
for f in record.custom
],
}
if record.notes:
data['notes'] = record.notes
return data

@staticmethod
def _send_nsf_batches(params, typed_records, folder_uid, command_name):
# type: (KeeperParams, List[vault.TypedRecord], str, str) -> Tuple[int, int]
"""Send *typed_records* to a nested_share_folder via ``vault/records/v3/add``."""
from ..nested_share_folder.common import get_folder_key
from ..nested_share_folder.record_api import create_record_data_v3, record_add_v3

folder_key = get_folder_key(params, folder_uid, raise_on_missing=True)

created = 0
skipped = 0

for batch_start in range(0, len(typed_records), BATCH_SIZE):
batch = typed_records[batch_start:batch_start + BATCH_SIZE]
batch_num = batch_start // BATCH_SIZE + 1
logging.info('%s: sending batch %d (%d record(s))', command_name, batch_num, len(batch))

uid_to_title = {}
adds = []
for record in batch:
uid = utils.generate_uid()
rk = os.urandom(32)
data = CloudImportMixin._typed_record_to_data(record)
ra = create_record_data_v3(
record_uid=uid, record_key=rk, data=data,
folder_uid=folder_uid, folder_key=folder_key,
data_key=params.data_key,
client_modified_time=utils.current_milli_time(),
)
adds.append(ra)
uid_to_title[uid] = record.title

try:
rs = record_add_v3(params, adds)
except Exception as exc:
logging.warning('%s: batch %d failed: %s', command_name, batch_num, exc)
skipped += len(batch)
continue

for r in rs.records:
uid = utils.base64_url_encode(r.record_uid)
title = uid_to_title.get(uid, uid)
if r.status == record_pb2.RS_SUCCESS:
logging.debug('%s: created record "%s"', command_name, title)
created += 1
else:
logging.warning('%s: failed to create record "%s": status=%s',
command_name, title, r.status)
skipped += 1

return created, skipped

@staticmethod
def _send_legacy_batches(params, typed_records, folder_uid, command_name):
# type: (KeeperParams, List[vault.TypedRecord], str, str) -> Tuple[int, int]
"""Send *typed_records* to a regular shared folder via ``vault/records_add``."""
created = 0
skipped = 0

pending = [] # type: List[Tuple[vault.TypedRecord, record_pb2.RecordAdd]]
for record in typed_records:
pb = record_management.add_record_to_folder(params, record, folder_uid, pb_only=True)
if pb is not None:
pending.append((record, pb))

if not pending:
return 0, 0

for batch_start in range(0, len(pending), BATCH_SIZE):
batch = pending[batch_start:batch_start + BATCH_SIZE]
Expand Down Expand Up @@ -385,6 +491,4 @@ def _run_import(self, params, secrets, folder_uid, record_type, filter_name,
command_name, record.title, rs_rec.status)
skipped += 1

if created:
params.sync_data = True
print(f'{command_name}: {created} record(s) created, {skipped} skipped.')
return created, skipped
Loading
Loading