diff --git a/keepercommander/discovery_common/__version__.py b/keepercommander/discovery_common/__version__.py index 87bb711d7..7025f5029 100644 --- a/keepercommander/discovery_common/__version__.py +++ b/keepercommander/discovery_common/__version__.py @@ -1 +1 @@ -__version__ = '1.1.14' +__version__ = '1.1.15' diff --git a/keepercommander/discovery_common/infrastructure.py b/keepercommander/discovery_common/infrastructure.py index 744c7951b..ee2675955 100644 --- a/keepercommander/discovery_common/infrastructure.py +++ b/keepercommander/discovery_common/infrastructure.py @@ -394,7 +394,9 @@ def to_dot(self, graph_format: str = "svg", show_hex_uid: bool = False, head_uids.append(edge.head_uid) def _render_edge(e): - + # _render_edge is invoked immediately within the loop below, so capturing the + # loop variables v/content is safe. + # pylint: disable=cell-var-from-loop edge_color = "grey" style = "solid" @@ -439,7 +441,7 @@ def _render_edge(e): tooltip=edge_tip) for head_uid in head_uids: - version, edge = v.get_highest_edge_version(head_uid) + _, edge = v.get_highest_edge_version(head_uid) _render_edge(edge) data_edge = v.get_data() diff --git a/keepercommander/discovery_common/process.py b/keepercommander/discovery_common/process.py index 836243c41..a99ee2b06 100644 --- a/keepercommander/discovery_common/process.py +++ b/keepercommander/discovery_common/process.py @@ -377,7 +377,7 @@ def _directory_exists(self, domain: str, directory_info_func: Callable, context: for provider_vertex in provider_vertices: content = DiscoveryObject.get_discovery_object(provider_vertex) found = False - for domain in domains: + for domain in domains: # pylint: disable=redefined-argument-from-local for provider_domain in content.item.info.get("domains", []): if domain.lower() in provider_domain.lower(): found = True @@ -453,7 +453,7 @@ def _find_directory_user(self, found_vertex = None if find_user is not None: - user, domain = split_user_and_domain(find_user) + user, _ = split_user_and_domain(find_user) if user_content.item.user.lower() == user.lower(): found_vertex = user_vertex elif user_content.item.user.lower() == find_user.lower(): @@ -1185,7 +1185,7 @@ def _process_admin_user(self, # We need to populate the id and uid of the content, now that we have data in the content. self.populate_admin_content_ids(admin_content, resource_vertex) - ad_user, ad_domain = split_user_and_domain(admin_content.item.user) + _, ad_domain = split_user_and_domain(admin_content.item.user) if ad_domain is not None and admin_content.item.source == LOCAL_USER: self.logger.debug("The admin is an directory user, but the source is set to a local user") diff --git a/keepercommander/discovery_common/rm_types.py b/keepercommander/discovery_common/rm_types.py index a647ca933..1aa513d88 100644 --- a/keepercommander/discovery_common/rm_types.py +++ b/keepercommander/discovery_common/rm_types.py @@ -465,7 +465,7 @@ class RmOracleUserAddMeta(RmMetaBase): class RmOracleRoleAddMeta(RmMetaBase): - not_identified: bool = False, + not_identified: bool = False identified_by_password: Optional[str] = None identified_using: Optional[str] = None identified_externally: bool = False diff --git a/keepercommander/discovery_common/types.py b/keepercommander/discovery_common/types.py index 5a79a9115..6942e87eb 100644 --- a/keepercommander/discovery_common/types.py +++ b/keepercommander/discovery_common/types.py @@ -736,7 +736,7 @@ def has_dn(self, user) -> bool: return False - + class PromptResult(BaseModel): # "add" and "ignore" are the only action diff --git a/keepercommander/discovery_common/verify.py b/keepercommander/discovery_common/verify.py index 5e90ab5a7..dd6e6a3fd 100644 --- a/keepercommander/discovery_common/verify.py +++ b/keepercommander/discovery_common/verify.py @@ -403,7 +403,7 @@ def _check(vertex: DAGVertex, indent: int = 0): # Get all the child vertices, allow self ref, so we can delete it if not already deleted. for next_vertex in vertex.has_vertices(allow_self_ref=True): if next_vertex.uid == vertex.uid: - version, edge = next_vertex.get_highest_edge_version(vertex.uid) + _, edge = next_vertex.get_highest_edge_version(vertex.uid) if edge.edge_type == EdgeType.DELETION: continue else: diff --git a/keepercommander/keeper_dag/__version__.py b/keepercommander/keeper_dag/__version__.py index 874042f3b..ed7133b36 100644 --- a/keepercommander/keeper_dag/__version__.py +++ b/keepercommander/keeper_dag/__version__.py @@ -1 +1 @@ -__version__ = '1.1.10' # pragma: no cover +__version__ = '1.1.11' # pragma: no cover diff --git a/keepercommander/keeper_dag/connection/__init__.py b/keepercommander/keeper_dag/connection/__init__.py index 762acde53..24ef3ba66 100644 --- a/keepercommander/keeper_dag/connection/__init__.py +++ b/keepercommander/keeper_dag/connection/__init__.py @@ -31,6 +31,8 @@ class ConnectionBase: ADD_DATA = "/add_data" SYNC = "/sync" + MULTI_SYNC = "/multi_sync" + GET_LEAFS = "/get_leafs" TIMEOUT = 30 @@ -59,7 +61,7 @@ def __init__(self, if self.log_transactions_dir is None: self.log_transactions_dir = "." - if self.log_transactions is True: + if self.log_transactions: self.logger.info("keeper-dag transaction logging is ENABLED; " f"write directory at {self.log_transactions_dir}") @@ -99,8 +101,11 @@ def get_encrypted_payload_data(encrypted_payload_data: bytes) -> bytes: @staticmethod def get_router_host(server_hostname: str): - if server_hostname and '://' in server_hostname: # accept URL-formatted inputs + # Defensive: accept URL-formatted inputs (e.g. "https://keepersecurity.com") + # and extract the bare hostname before the GovCloud subdomain check. + if server_hostname and '://' in server_hostname: server_hostname = server_hostname.split('://', 1)[1].split('/', 1)[0] + # Only PROD GovCloud strips the subdomain (workaround for prod infrastructure). # DEV/QA GOV (govcloud.dev.keepersecurity.us, govcloud.qa.keepersecurity.us) keep govcloud. if server_hostname == 'govcloud.keepersecurity.us': @@ -222,10 +227,10 @@ def sync(self, sync_query: Union[SyncQuery, gs_pb2.GraphSyncQuery], graph_id: Optional[int] = None, endpoint: Optional[str] = None, - agent: Optional[str] = None) -> bytes: + agent: Optional[str] = None) -> Optional[bytes]: if agent is None: - f"keeper-dag/{__version__}" + agent = f"keeper-dag/{__version__}" endpoint = self._endpoint(ConnectionBase.SYNC, endpoint) self.logger.debug(f"endpoint {endpoint}") @@ -238,13 +243,13 @@ def sync(self, headers=headers, payload=sync_query) - if self.use_read_protobuf: + if payload is not None and self.use_read_protobuf: try: self.logger.debug(f"decrypt payload with transmission key {kotlin_bytes(self.transmission_key)}") payload = self.get_encrypted_payload_data(payload) payload = decrypt_aes(payload, self.transmission_key) except Exception as err: - self.logger.error(f"Could not decrypt protobuf graph sync response: {type(err)}, {err}") + self.logger.error(f"Could not decrypt protobuf graph sync response: {err}") self.write_transaction_log( graph_id=graph_id, @@ -290,7 +295,7 @@ def add_data(self, agent: Optional[str] = None): if agent is None: - f"keeper-dag/{__version__}" + agent = f"keeper-dag/{__version__}" endpoint = self._endpoint(ConnectionBase.ADD_DATA, endpoint) self.logger.debug(f"endpoint {endpoint}") @@ -331,3 +336,135 @@ def add_data(self, error=str(err) ) raise DAGException(f"Could not create a new DAG structure: {err}") + + def multi_sync(self, + multi_query: Union[BaseModel, gs_pb2.GraphSyncMultiQuery], + graph_id: Optional[int] = None, + endpoint: Optional[str] = None, + agent: Optional[str] = None) -> bytes: + """POST a GraphSyncMultiQuery to /multi_sync. + + Used by per-graph reads: after `get_leafs` discovers the stream refs + rooted at the graph's origin, `multi_sync` fetches sync data for all + those streams in one round-trip. Mirrors `sync()` in transport shape + (encrypt/headers, decrypt-on-read, transaction log, error handling). + """ + if agent is None: + agent = f"keeper-dag/{__version__}" + + endpoint = self._endpoint(ConnectionBase.MULTI_SYNC, endpoint) + self.logger.debug(f"endpoint {endpoint}") + + try: + multi_query, headers = self.payload_and_headers(multi_query) + payload = self.rest_call_to_router(http_method="POST", + endpoint=endpoint, + agent=agent, + headers=headers, + payload=multi_query) + + if self.use_read_protobuf: + try: + self.logger.debug(f"decrypt payload with transmission key {kotlin_bytes(self.transmission_key)}") + payload = self.get_encrypted_payload_data(payload) + payload = decrypt_aes(payload, self.transmission_key) + except Exception as err: + self.logger.error(f"Could not decrypt protobuf graph multi-sync response: {type(err)}, {err}") + + self.write_transaction_log( + graph_id=graph_id, + request=multi_query, + response=payload, + agent=agent, + endpoint=endpoint, + error=None + ) + + return payload + + except DAGConnectionException as err: + self.write_transaction_log( + graph_id=graph_id, + request=multi_query, + response=None, + agent=agent, + endpoint=endpoint, + error=str(err) + ) + raise err + except Exception as err: + self.write_transaction_log( + graph_id=graph_id, + request=multi_query, + response=None, + agent=agent, + endpoint=endpoint, + error=str(err) + ) + raise DAGException(f"Could not load the DAG structure (multi_sync): {err}") + + def get_leafs(self, + leafs_query: Union[BaseModel, gs_pb2.GraphSyncLeafsQuery], + graph_id: Optional[int] = None, + endpoint: Optional[str] = None, + agent: Optional[str] = None) -> bytes: + """POST a GraphSyncLeafsQuery to /get_leafs. + + Returns the serialized GraphSyncRefsResult — the list of stream refs + rooted at the queried vertices. Used as the discovery step before a + `multi_sync` call (per the per-graph read pattern that Web Vault + already uses). + """ + if agent is None: + agent = f"keeper-dag/{__version__}" + + endpoint = self._endpoint(ConnectionBase.GET_LEAFS, endpoint) + self.logger.debug(f"endpoint {endpoint}") + + try: + leafs_query, headers = self.payload_and_headers(leafs_query) + payload = self.rest_call_to_router(http_method="POST", + endpoint=endpoint, + agent=agent, + headers=headers, + payload=leafs_query) + + if self.use_read_protobuf: + try: + self.logger.debug(f"decrypt payload with transmission key {kotlin_bytes(self.transmission_key)}") + payload = self.get_encrypted_payload_data(payload) + payload = decrypt_aes(payload, self.transmission_key) + except Exception as err: + self.logger.error(f"Could not decrypt protobuf get_leafs response: {type(err)}, {err}") + + self.write_transaction_log( + graph_id=graph_id, + request=leafs_query, + response=payload, + agent=agent, + endpoint=endpoint, + error=None + ) + + return payload + + except DAGConnectionException as err: + self.write_transaction_log( + graph_id=graph_id, + request=leafs_query, + response=None, + agent=agent, + endpoint=endpoint, + error=str(err) + ) + raise err + except Exception as err: + self.write_transaction_log( + graph_id=graph_id, + request=leafs_query, + response=None, + agent=agent, + endpoint=endpoint, + error=str(err) + ) + raise DAGException(f"Could not get leafs: {err}") diff --git a/keepercommander/keeper_dag/connection/ksm.py b/keepercommander/keeper_dag/connection/ksm.py index 58fa4fac0..e24b85592 100644 --- a/keepercommander/keeper_dag/connection/ksm.py +++ b/keepercommander/keeper_dag/connection/ksm.py @@ -165,6 +165,7 @@ def authenticate(self, attempt = 0 while True: + err_msg = "no error message" try: attempt += 1 response = requests.get(url, diff --git a/keepercommander/keeper_dag/connection/local.py b/keepercommander/keeper_dag/connection/local.py index 6a0dac42b..0567860d5 100644 --- a/keepercommander/keeper_dag/connection/local.py +++ b/keepercommander/keeper_dag/connection/local.py @@ -582,6 +582,41 @@ def sync(self, hasMore=has_more ).model_dump_json().encode() + def multi_sync(self, + multi_query: Union[gs_pb2.GraphSyncMultiQuery, Any], + graph_id: Optional[int] = None, + endpoint: Optional[str] = None, + agent: Optional[str] = None) -> bytes: + """Local mirror of the network per-graph ``multi_sync``. + + The local SQLite store has no per-graph URL routing, so each sub-query + in the ``GraphSyncMultiQuery`` is run through this connection's own + ``sync()`` — identical stream / sync-point / graph-id semantics as the + single-stream read and the save path — and the per-stream results are + assembled into the same multi-stream envelope the network endpoint + returns: ``GraphSyncMultiResult`` (protobuf) or ``{"results": [...]}`` + (JSON). + """ + is_protobuf = isinstance(multi_query, gs_pb2.GraphSyncMultiQuery) + queries = list(multi_query.queries) + + if is_protobuf: + multi = gs_pb2.GraphSyncMultiResult() + for sub_query in queries: + single = self.sync(sub_query, graph_id=graph_id, + endpoint=endpoint, agent=agent) + result = gs_pb2.GraphSyncResult() + result.ParseFromString(single) + multi.results.add().CopyFrom(result) + return multi.SerializeToString() + + results = [] + for sub_query in queries: + single = self.sync(sub_query, graph_id=graph_id, + endpoint=endpoint, agent=agent) + results.append(json.loads(single)) + return json.dumps({"results": results}).encode() + def debug_dump(self) -> str: ret = "" diff --git a/keepercommander/keeper_dag/dag.py b/keepercommander/keeper_dag/dag.py index 85a2d1296..a9066528e 100644 --- a/keepercommander/keeper_dag/dag.py +++ b/keepercommander/keeper_dag/dag.py @@ -15,7 +15,7 @@ import importlib import traceback import sys -from typing import Optional, Union, List, Any, Tuple, TYPE_CHECKING +from typing import Optional, Union, List, Any, Tuple, Dict, TYPE_CHECKING if TYPE_CHECKING: from .connection import ConnectionBase @@ -225,7 +225,7 @@ def close(self): try: # Safely get the root vertex without creating a new one if hasattr(self, '_vertices') and hasattr(self, 'uid') and hasattr(self, '_uid_lookup'): - if len(self._vertices) > 0 and self.uid in self._uid_lookup: + if len(self._vertices) > 0 and self.uid is not None and self.uid in self._uid_lookup: idx = self._uid_lookup[self.uid] if idx < len(self._vertices): root = self._vertices[idx] @@ -298,7 +298,7 @@ def debug_stacktrace(self): trc = 'Traceback (most recent call last):\n' msg = trc + ''.join(traceback.format_list(stack)) if exc is not None: - msg += ' ' + traceback.format_exc().lstrip(trc) + msg += ' ' + traceback.format_exc().removeprefix(trc) self.debug(msg) def __str__(self): @@ -310,6 +310,8 @@ def __str__(self): for v in self.all_vertices: ret += f" * {v.uid}, Keys: {v.keychain}, Active: {v.active}\n" for e in v.edges: + if e is None: + continue if e.edge_type == EdgeType.DATA: ret += " + has a DATA edge" if e.content is not None: @@ -504,11 +506,28 @@ def get_vertices_by_path_value(self, path: str, inc_deleted: bool = False) -> Li for vertex in vertices: for edge in vertex.edges: - if edge.path == path: + if edge is not None and edge.path == path: results.append(vertex) return results def _sync(self, sync_point: int = 0) -> Tuple[List[DAGData], int]: + """Dispatch to legacy single-stream sync or per-graph multi-stream sync. + + When `read_endpoint` is set, the server uses the per-graph URL pattern + (`/api/user/graph-sync//...`). That model splits the graph across + multiple streams, so a single-stream `sync` returns only a fragment. + Web Vault uses `get_leafs` -> `multi_sync` to read the full graph; + this client follows the same pattern. + + When only `graph_id` is set (legacy single-endpoint transport), the + single-stream sync remains correct. + """ + if self.read_endpoint is not None: + return self._sync_per_graph(sync_point) + return self._sync_legacy(sync_point) + + def _sync_legacy(self, sync_point: int = 0) -> Tuple[List[DAGData], int]: + """Single-stream sync against the legacy `/sync` endpoint.""" # The web service will send 500 items, if there is more the 'has_more' flag is set to True. has_more = True @@ -543,6 +562,61 @@ def _sync(self, sync_point: int = 0) -> Tuple[List[DAGData], int]: return all_data, sync_point + def _sync_per_graph(self, sync_point: int = 0) -> Tuple[List[DAGData], int]: + """Multi-stream read against the per-graph endpoints. + + The graph's data lives in a single stream keyed by the graph's origin + (e.g. the PAM Configuration record's UID for TunnelDAG). We multi_sync + that stream directly — no `get_leafs` discovery step needed for this + caller pattern. (`Connection.get_leafs` remains available for callers + that start from leaf vertices and need to discover stream roots.) + + Returns aggregated (data, max_sync_point) just like `_sync_legacy`. + """ + + origin_bytes = urlsafe_str_to_bytes(self.uid) + + # Stream keyed by the graph's origin (e.g. config_uid for PAM linking). + per_stream_sync_point: Dict[bytes, int] = {origin_bytes: sync_point} + all_data: List[DAGData] = [] + max_sync_point = sync_point + + while per_stream_sync_point: + stream_ids = list(per_stream_sync_point.keys()) + multi_query = self.read_struct_obj.multi_sync_query( + stream_ids=stream_ids, + origin=origin_bytes, + sync_point=sync_point, + ) + # Per-stream syncPoint adjustment so each stream advances + # independently across pagination rounds (proto variant only; + # JSON variant builds via SyncQuery which already carries syncPoint). + try: + for inner, sid in zip(multi_query.queries, stream_ids): + inner.syncPoint = per_stream_sync_point[sid] + except Exception: # pragma: no cover - JSON variant has no .queries + pass + + multi_response = self.conn.multi_sync( + multi_query=multi_query, + graph_id=self.graph_id, + endpoint=self.read_endpoint, + agent=self.agent, + ) + multi_results = self.read_struct_obj.get_multi_sync_result(multi_response) + + next_per_stream: Dict[bytes, int] = {} + for result in multi_results: + all_data += result.data + if result.syncPoint and result.syncPoint > max_sync_point: + max_sync_point = result.syncPoint + if result.hasMore and result.streamId is not None: + next_per_stream[bytes(result.streamId)] = result.syncPoint + + per_stream_sync_point = next_per_stream + + return all_data, max_sync_point + def _load(self, sync_point: int = 0): """ @@ -619,11 +693,12 @@ def _load(self, sync_point: int = 0): name=data.parentRef.name, vertex_type=RefType.GENERAL ) + # Get the head vertex, which will exist now. - head = self.get_vertex(head_uid) + head = self.get_vertex_by_uid(head_uid) if head is None or head == "": head = tail - head = self.get_vertex_by_uid(head_uid) + self.debug(f" * tail {tail_uid} belongs to {head_uid}, " f"edge type {edge_type}", level=3) diff --git a/keepercommander/keeper_dag/struct/__init__.py b/keepercommander/keeper_dag/struct/__init__.py index 53aa771da..ac87baa7b 100644 --- a/keepercommander/keeper_dag/struct/__init__.py +++ b/keepercommander/keeper_dag/struct/__init__.py @@ -54,3 +54,34 @@ def payload(origin_ref: Union[Ref, gs_pb2.GraphSyncRef], graph_id: Optional[int] = None) -> Union[DataPayload, gs_pb2.GraphSyncAddDataRequest]: pass + + # --- Per-graph multi-stream read transport --------------------------- + # Used by DAG._sync_per_graph when read_endpoint is set. Two-step pattern: + # 1. leafs_query(...) -> get_leafs_result(...) discovers stream refs. + # 2. multi_sync_query(...) -> get_multi_sync_result(...) fetches data. + + def leafs_query(self, + vertices: List[str]) -> Union[BaseModel, gs_pb2.GraphSyncLeafsQuery]: + """Build a GraphSyncLeafsQuery from a list of vertex UIDs (URL-safe str).""" + pass + + @staticmethod + def get_leafs_result(results: bytes) -> List[Ref]: + """Parse GraphSyncRefsResult bytes into a list of Ref objects. + Each Ref's `value` is the stream UID rooted under the queried vertex. + """ + pass + + def multi_sync_query(self, + stream_ids: List[bytes], + origin: bytes, + sync_point: int = 0) -> Union[BaseModel, gs_pb2.GraphSyncMultiQuery]: + """Build a GraphSyncMultiQuery wrapping one GraphSyncQuery per stream.""" + pass + + @staticmethod + def get_multi_sync_result(results: bytes): # -> List[SyncData] + """Parse GraphSyncMultiResult bytes into a list of SyncData, one per + inner GraphSyncResult (each carrying its own streamId/syncPoint/hasMore). + """ + pass diff --git a/keepercommander/keeper_dag/struct/default.py b/keepercommander/keeper_dag/struct/default.py index a58558bff..dd23a6256 100644 --- a/keepercommander/keeper_dag/struct/default.py +++ b/keepercommander/keeper_dag/struct/default.py @@ -1,8 +1,10 @@ from __future__ import annotations +import json from . import DataStructBase from ..types import SyncQuery, Ref, RefType, DAGData, DataPayload, EdgeType, SyncData -from ..crypto import generate_random_bytes, generate_uid_str, bytes_to_str +from ..crypto import generate_random_bytes, generate_uid_str, bytes_to_str, bytes_to_urlsafe_str import base64 +from pydantic import BaseModel from typing import Optional, List @@ -79,3 +81,62 @@ def payload(origin_ref: Ref, dataList=data_list, graphId=graph_id ) + + # --- Per-graph multi-stream read transport --------------------------- + + class _LeafsQuery(BaseModel): + vertices: List[str] + + class _MultiSyncQuery(BaseModel): + queries: List[SyncQuery] + + def leafs_query(self, vertices: List[str]) -> 'DataStruct._LeafsQuery': + return DataStruct._LeafsQuery(vertices=list(vertices)) + + @staticmethod + def get_leafs_result(results: bytes) -> List[Ref]: + try: + obj = json.loads(results) + except Exception as err: + raise Exception(f"Could not parse the leafs JSON result: {err}") + refs_list = obj.get("refs", []) if isinstance(obj, dict) else obj + out: List[Ref] = [] + for r in refs_list: + # Server may return either {type, value, name} or just a value str. + if isinstance(r, dict): + value = r.get("value") + if isinstance(value, bytes): + value = bytes_to_urlsafe_str(value) + out.append(Ref( + type=RefType(r["type"]) if r.get("type") is not None else RefType.GENERAL, + value=value, + name=r.get("name") or None, + )) + return out + + def multi_sync_query(self, + stream_ids: List[bytes], + origin: bytes, + sync_point: int = 0) -> 'DataStruct._MultiSyncQuery': + queries = [ + SyncQuery( + streamId=bytes_to_urlsafe_str(sid), + deviceId=bytes_to_urlsafe_str(origin), + syncPoint=sync_point, + graphId=None, + ) + for sid in stream_ids + ] + return DataStruct._MultiSyncQuery(queries=queries) + + @staticmethod + def get_multi_sync_result(results: bytes) -> List[SyncData]: + try: + obj = json.loads(results) + except Exception as err: + raise Exception(f"Could not parse the multi_sync JSON result: {err}") + items = obj.get("results", []) if isinstance(obj, dict) else obj + out: List[SyncData] = [] + for item in items: + out.append(SyncData.model_validate(item)) + return out diff --git a/keepercommander/keeper_dag/struct/protobuf.py b/keepercommander/keeper_dag/struct/protobuf.py index fcd123d5d..7a449f918 100644 --- a/keepercommander/keeper_dag/struct/protobuf.py +++ b/keepercommander/keeper_dag/struct/protobuf.py @@ -58,23 +58,16 @@ def sync_query(self, ) @staticmethod - def get_sync_result(results: bytes) -> SyncData: - - try: - result = gs_pb2.GraphSyncResult() - result.ParseFromString(results) - except Exception as err: - raise Exception(f"Could not parse the GraphSyncResult message: {err}") - - message = gs_pb2.GraphSyncResult() - message.ParseFromString(results) - + def _sync_data_from_result(message: gs_pb2.GraphSyncResult) -> SyncData: + """Convert a single GraphSyncResult protobuf into a SyncData pydantic + model. Extracted so both single-`sync` and multi_sync code paths share + identical per-result decoding. + """ data_list: List[SyncDataItem] = [] for item in message.data: data_list.append( SyncDataItem( type=DataStruct.PB_TO_DATA_MAP.get(item.data.type), - # content=bytes_to_str(item.data.content), content=item.data.content, content_is_base64=False, ref=Ref( @@ -92,9 +85,21 @@ def get_sync_result(results: bytes) -> SyncData: return SyncData( syncPoint=message.syncPoint, data=data_list, - hasMore=message.hasMore + hasMore=message.hasMore, + streamId=bytes(message.streamId) if message.streamId else None, ) + @staticmethod + def get_sync_result(results: bytes) -> SyncData: + + try: + message = gs_pb2.GraphSyncResult() + message.ParseFromString(results) + except Exception as err: + raise Exception(f"Could not parse the GraphSyncResult message: {err}") + + return DataStruct._sync_data_from_result(message) + @staticmethod def origin_ref(origin_ref_value: bytes, name: str) -> gs_pb2.GraphSyncRef: @@ -149,3 +154,49 @@ def payload(origin_ref: gs_pb2.GraphSyncRef, return gs_pb2.GraphSyncAddDataRequest( origin=origin_ref, data=data_list) + + # --- Per-graph multi-stream read transport --------------------------- + + def leafs_query(self, vertices: List[str]) -> gs_pb2.GraphSyncLeafsQuery: + return gs_pb2.GraphSyncLeafsQuery( + vertices=[urlsafe_str_to_bytes(v) for v in vertices] + ) + + @staticmethod + def get_leafs_result(results: bytes) -> List[Ref]: + msg = gs_pb2.GraphSyncRefsResult() + try: + msg.ParseFromString(results) + except Exception as err: + raise Exception(f"Could not parse the GraphSyncRefsResult message: {err}") + return [ + Ref( + type=DataStruct.PB_TO_REF_MAP.get(r.type), + value=bytes_to_urlsafe_str(r.value), + name=r.name or None, + ) + for r in msg.refs + ] + + def multi_sync_query(self, + stream_ids: List[bytes], + origin: bytes, + sync_point: int = 0) -> gs_pb2.GraphSyncMultiQuery: + return gs_pb2.GraphSyncMultiQuery(queries=[ + gs_pb2.GraphSyncQuery( + streamId=sid, + origin=origin, + syncPoint=sync_point, + maxCount=0, # let krouter default (currently 500) + ) + for sid in stream_ids + ]) + + @staticmethod + def get_multi_sync_result(results: bytes) -> List[SyncData]: + msg = gs_pb2.GraphSyncMultiResult() + try: + msg.ParseFromString(results) + except Exception as err: + raise Exception(f"Could not parse the GraphSyncMultiResult message: {err}") + return [DataStruct._sync_data_from_result(r) for r in msg.results] diff --git a/keepercommander/keeper_dag/types.py b/keepercommander/keeper_dag/types.py index 9ab242c88..4b614fbfe 100644 --- a/keepercommander/keeper_dag/types.py +++ b/keepercommander/keeper_dag/types.py @@ -124,6 +124,17 @@ class PamEndpoints(BaseEnum): PamGraphId.SERVICE_LINKS.value: PamEndpoints.SERVICE_LINKS, } +# Inverse map for callers that have a graph_id int and need the PamEndpoints enum +# to address the new /api/user/graph-sync// routes. +GRAPH_ID_TO_ENDPOINT = { + PamGraphId.PAM.value: PamEndpoints.PAM, + PamGraphId.DISCOVERY_RULES.value: PamEndpoints.DISCOVERY_RULES, + PamGraphId.DISCOVERY_JOBS.value: PamEndpoints.DISCOVERY_JOBS, + PamGraphId.INFRASTRUCTURE.value: PamEndpoints.INFRASTRUCTURE, + PamGraphId.SERVICE_LINKS.value: PamEndpoints.SERVICE_LINKS, +} + + class SyncQuery(BaseModel): streamId: Optional[str] = None # base64 of a user's ID who is syncing. deviceId: Optional[str] = None @@ -134,7 +145,10 @@ class SyncQuery(BaseModel): class SyncDataItem(BaseModel): ref: Ref parentRef: Optional[Ref] = None - content: Optional[str] = None + # Either a base64-encoded string (JSON wire format) or raw bytes + # (protobuf wire format). `content_is_base64` distinguishes them so the + # consumer can decode appropriately. + content: Optional[Union[str, bytes]] = None content_is_base64: bool = True type: Optional[str] = None path: Optional[str] = None @@ -145,6 +159,9 @@ class SyncData(BaseModel): syncPoint: int data: List[SyncDataItem] hasMore: bool + # Per-graph multi_sync: identifies which stream this result came from. + # None for single-stream `sync` results (backward compatible). + streamId: Optional[bytes] = None class Ref(BaseModel): diff --git a/keepercommander/keeper_dag/utils.py b/keepercommander/keeper_dag/utils.py index 43ad5a76e..51e33c921 100644 --- a/keepercommander/keeper_dag/utils.py +++ b/keepercommander/keeper_dag/utils.py @@ -55,5 +55,5 @@ def set_file_permissions(file_path): # type: (str) -> None check=False, capture_output=True) subprocess.run(["icacls", file_path, "/grant", f"{username}:M"], check=True, capture_output=True) logging.debug(f'Set secure permissions (owner Modify only) for Windows file: {file_path}') - except Exception: - logging.warning(f'Failed to set file permissions for {file_path}') + except (OSError, subprocess.SubprocessError) as err: + logging.warning(f'Failed to set file permissions for {file_path}: {err}')