Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion keepercommander/discovery_common/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.1.14'
__version__ = '1.1.15'
6 changes: 4 additions & 2 deletions keepercommander/discovery_common/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,9 @@ def to_dot(self, graph_format: str = "svg", show_hex_uid: bool = False,
head_uids.append(edge.head_uid)

def _render_edge(e):

# _render_edge is invoked immediately within the loop below, so capturing the
# loop variables v/content is safe.
# pylint: disable=cell-var-from-loop
edge_color = "grey"
style = "solid"

Expand Down Expand Up @@ -439,7 +441,7 @@ def _render_edge(e):
tooltip=edge_tip)

for head_uid in head_uids:
version, edge = v.get_highest_edge_version(head_uid)
_, edge = v.get_highest_edge_version(head_uid)
_render_edge(edge)

data_edge = v.get_data()
Expand Down
6 changes: 3 additions & 3 deletions keepercommander/discovery_common/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def _directory_exists(self, domain: str, directory_info_func: Callable, context:
for provider_vertex in provider_vertices:
content = DiscoveryObject.get_discovery_object(provider_vertex)
found = False
for domain in domains:
for domain in domains: # pylint: disable=redefined-argument-from-local
for provider_domain in content.item.info.get("domains", []):
if domain.lower() in provider_domain.lower():
found = True
Expand Down Expand Up @@ -453,7 +453,7 @@ def _find_directory_user(self,

found_vertex = None
if find_user is not None:
user, domain = split_user_and_domain(find_user)
user, _ = split_user_and_domain(find_user)
if user_content.item.user.lower() == user.lower():
found_vertex = user_vertex
elif user_content.item.user.lower() == find_user.lower():
Expand Down Expand Up @@ -1185,7 +1185,7 @@ def _process_admin_user(self,
# We need to populate the id and uid of the content, now that we have data in the content.
self.populate_admin_content_ids(admin_content, resource_vertex)

ad_user, ad_domain = split_user_and_domain(admin_content.item.user)
_, ad_domain = split_user_and_domain(admin_content.item.user)
if ad_domain is not None and admin_content.item.source == LOCAL_USER:
self.logger.debug("The admin is an directory user, but the source is set to a local user")

Expand Down
2 changes: 1 addition & 1 deletion keepercommander/discovery_common/rm_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ class RmOracleUserAddMeta(RmMetaBase):


class RmOracleRoleAddMeta(RmMetaBase):
not_identified: bool = False,
not_identified: bool = False
identified_by_password: Optional[str] = None
identified_using: Optional[str] = None
identified_externally: bool = False
Expand Down
2 changes: 1 addition & 1 deletion keepercommander/discovery_common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ def has_dn(self, user) -> bool:

return False


class PromptResult(BaseModel):

# "add" and "ignore" are the only action
Expand Down
2 changes: 1 addition & 1 deletion keepercommander/discovery_common/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ def _check(vertex: DAGVertex, indent: int = 0):
# Get all the child vertices, allow self ref, so we can delete it if not already deleted.
for next_vertex in vertex.has_vertices(allow_self_ref=True):
if next_vertex.uid == vertex.uid:
version, edge = next_vertex.get_highest_edge_version(vertex.uid)
_, edge = next_vertex.get_highest_edge_version(vertex.uid)
if edge.edge_type == EdgeType.DELETION:
continue
else:
Expand Down
2 changes: 1 addition & 1 deletion keepercommander/keeper_dag/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.1.10' # pragma: no cover
__version__ = '1.1.11' # pragma: no cover
151 changes: 144 additions & 7 deletions keepercommander/keeper_dag/connection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class ConnectionBase:

ADD_DATA = "/add_data"
SYNC = "/sync"
MULTI_SYNC = "/multi_sync"
GET_LEAFS = "/get_leafs"

TIMEOUT = 30

Expand Down Expand Up @@ -59,7 +61,7 @@ def __init__(self,
if self.log_transactions_dir is None:
self.log_transactions_dir = "."

if self.log_transactions is True:
if self.log_transactions:
self.logger.info("keeper-dag transaction logging is ENABLED; "
f"write directory at {self.log_transactions_dir}")

Expand Down Expand Up @@ -99,8 +101,11 @@ def get_encrypted_payload_data(encrypted_payload_data: bytes) -> bytes:

@staticmethod
def get_router_host(server_hostname: str):
if server_hostname and '://' in server_hostname: # accept URL-formatted inputs
# Defensive: accept URL-formatted inputs (e.g. "https://keepersecurity.com")
# and extract the bare hostname before the GovCloud subdomain check.
if server_hostname and '://' in server_hostname:
server_hostname = server_hostname.split('://', 1)[1].split('/', 1)[0]

# Only PROD GovCloud strips the subdomain (workaround for prod infrastructure).
# DEV/QA GOV (govcloud.dev.keepersecurity.us, govcloud.qa.keepersecurity.us) keep govcloud.
if server_hostname == 'govcloud.keepersecurity.us':
Expand Down Expand Up @@ -222,10 +227,10 @@ def sync(self,
sync_query: Union[SyncQuery, gs_pb2.GraphSyncQuery],
graph_id: Optional[int] = None,
endpoint: Optional[str] = None,
agent: Optional[str] = None) -> bytes:
agent: Optional[str] = None) -> Optional[bytes]:

if agent is None:
f"keeper-dag/{__version__}"
agent = f"keeper-dag/{__version__}"

endpoint = self._endpoint(ConnectionBase.SYNC, endpoint)
self.logger.debug(f"endpoint {endpoint}")
Expand All @@ -238,13 +243,13 @@ def sync(self,
headers=headers,
payload=sync_query)

if self.use_read_protobuf:
if payload is not None and self.use_read_protobuf:
try:
self.logger.debug(f"decrypt payload with transmission key {kotlin_bytes(self.transmission_key)}")
payload = self.get_encrypted_payload_data(payload)
payload = decrypt_aes(payload, self.transmission_key)
except Exception as err:
self.logger.error(f"Could not decrypt protobuf graph sync response: {type(err)}, {err}")
self.logger.error(f"Could not decrypt protobuf graph sync response: {err}")

self.write_transaction_log(
graph_id=graph_id,
Expand Down Expand Up @@ -290,7 +295,7 @@ def add_data(self,
agent: Optional[str] = None):

if agent is None:
f"keeper-dag/{__version__}"
agent = f"keeper-dag/{__version__}"

endpoint = self._endpoint(ConnectionBase.ADD_DATA, endpoint)
self.logger.debug(f"endpoint {endpoint}")
Expand Down Expand Up @@ -331,3 +336,135 @@ def add_data(self,
error=str(err)
)
raise DAGException(f"Could not create a new DAG structure: {err}")

def multi_sync(self,
multi_query: Union[BaseModel, gs_pb2.GraphSyncMultiQuery],
graph_id: Optional[int] = None,
endpoint: Optional[str] = None,
agent: Optional[str] = None) -> bytes:
"""POST a GraphSyncMultiQuery to <endpoint>/multi_sync.

Used by per-graph reads: after `get_leafs` discovers the stream refs
rooted at the graph's origin, `multi_sync` fetches sync data for all
those streams in one round-trip. Mirrors `sync()` in transport shape
(encrypt/headers, decrypt-on-read, transaction log, error handling).
"""
if agent is None:
agent = f"keeper-dag/{__version__}"

endpoint = self._endpoint(ConnectionBase.MULTI_SYNC, endpoint)
self.logger.debug(f"endpoint {endpoint}")

try:
multi_query, headers = self.payload_and_headers(multi_query)
payload = self.rest_call_to_router(http_method="POST",
endpoint=endpoint,
agent=agent,
headers=headers,
payload=multi_query)

if self.use_read_protobuf:
try:
self.logger.debug(f"decrypt payload with transmission key {kotlin_bytes(self.transmission_key)}")
payload = self.get_encrypted_payload_data(payload)
payload = decrypt_aes(payload, self.transmission_key)
except Exception as err:
self.logger.error(f"Could not decrypt protobuf graph multi-sync response: {type(err)}, {err}")

self.write_transaction_log(
graph_id=graph_id,
request=multi_query,
response=payload,
agent=agent,
endpoint=endpoint,
error=None
)

return payload

except DAGConnectionException as err:
self.write_transaction_log(
graph_id=graph_id,
request=multi_query,
response=None,
agent=agent,
endpoint=endpoint,
error=str(err)
)
raise err
except Exception as err:
self.write_transaction_log(
graph_id=graph_id,
request=multi_query,
response=None,
agent=agent,
endpoint=endpoint,
error=str(err)
)
raise DAGException(f"Could not load the DAG structure (multi_sync): {err}")

def get_leafs(self,
leafs_query: Union[BaseModel, gs_pb2.GraphSyncLeafsQuery],
graph_id: Optional[int] = None,
endpoint: Optional[str] = None,
agent: Optional[str] = None) -> bytes:
"""POST a GraphSyncLeafsQuery to <endpoint>/get_leafs.

Returns the serialized GraphSyncRefsResult — the list of stream refs
rooted at the queried vertices. Used as the discovery step before a
`multi_sync` call (per the per-graph read pattern that Web Vault
already uses).
"""
if agent is None:
agent = f"keeper-dag/{__version__}"

endpoint = self._endpoint(ConnectionBase.GET_LEAFS, endpoint)
self.logger.debug(f"endpoint {endpoint}")

try:
leafs_query, headers = self.payload_and_headers(leafs_query)
payload = self.rest_call_to_router(http_method="POST",
endpoint=endpoint,
agent=agent,
headers=headers,
payload=leafs_query)

if self.use_read_protobuf:
try:
self.logger.debug(f"decrypt payload with transmission key {kotlin_bytes(self.transmission_key)}")
payload = self.get_encrypted_payload_data(payload)
payload = decrypt_aes(payload, self.transmission_key)
except Exception as err:
self.logger.error(f"Could not decrypt protobuf get_leafs response: {type(err)}, {err}")

self.write_transaction_log(
graph_id=graph_id,
request=leafs_query,
response=payload,
agent=agent,
endpoint=endpoint,
error=None
)

return payload

except DAGConnectionException as err:
self.write_transaction_log(
graph_id=graph_id,
request=leafs_query,
response=None,
agent=agent,
endpoint=endpoint,
error=str(err)
)
raise err
except Exception as err:
self.write_transaction_log(
graph_id=graph_id,
request=leafs_query,
response=None,
agent=agent,
endpoint=endpoint,
error=str(err)
)
raise DAGException(f"Could not get leafs: {err}")
1 change: 1 addition & 0 deletions keepercommander/keeper_dag/connection/ksm.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def authenticate(self,

attempt = 0
while True:
err_msg = "no error message"
try:
attempt += 1
response = requests.get(url,
Expand Down
35 changes: 35 additions & 0 deletions keepercommander/keeper_dag/connection/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,41 @@ def sync(self,
hasMore=has_more
).model_dump_json().encode()

def multi_sync(self,
multi_query: Union[gs_pb2.GraphSyncMultiQuery, Any],
graph_id: Optional[int] = None,
endpoint: Optional[str] = None,
agent: Optional[str] = None) -> bytes:
"""Local mirror of the network per-graph ``multi_sync``.

The local SQLite store has no per-graph URL routing, so each sub-query
in the ``GraphSyncMultiQuery`` is run through this connection's own
``sync()`` — identical stream / sync-point / graph-id semantics as the
single-stream read and the save path — and the per-stream results are
assembled into the same multi-stream envelope the network endpoint
returns: ``GraphSyncMultiResult`` (protobuf) or ``{"results": [...]}``
(JSON).
"""
is_protobuf = isinstance(multi_query, gs_pb2.GraphSyncMultiQuery)
queries = list(multi_query.queries)

if is_protobuf:
multi = gs_pb2.GraphSyncMultiResult()
for sub_query in queries:
single = self.sync(sub_query, graph_id=graph_id,
endpoint=endpoint, agent=agent)
result = gs_pb2.GraphSyncResult()
result.ParseFromString(single)
multi.results.add().CopyFrom(result)
return multi.SerializeToString()

results = []
for sub_query in queries:
single = self.sync(sub_query, graph_id=graph_id,
endpoint=endpoint, agent=agent)
results.append(json.loads(single))
return json.dumps({"results": results}).encode()

def debug_dump(self) -> str:

ret = ""
Expand Down
Loading
Loading