Skip to content

Commit 64a401a

Browse files
committed
PYCBC-1756: Streaming API Improvements
Motivation
==========
Some final cleanup/improvement changes to the streaming API prior to releasing the next dot-minor that includes significant changes to the underlying c-extension and the overall shape of the SDK's internal logic.

Modification
============
* Handle metadata metrics that might be set to `None`
* Use default C++ core timeouts for streaming timeouts (if not provided)
* Update query params property to not JSON-ify positional, named and raw options each time it is called
* Add query test to confirm metrics handling is correct

Change-Id: I42bf644a1085fbc812fe0a3f94437b96c17bf2be
Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/242421
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Brett Lawson <brett19@gmail.com>
1 parent d37b524 commit 64a401a

7 files changed

Lines changed: 77 additions & 32 deletions

File tree

acouchbase/logic/cluster_impl.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class AsyncClusterImpl:
5757
def __init__(self, connstr: str, *options: object, **kwargs: object) -> None:
5858
loop: Optional[AbstractEventLoop] = kwargs.pop('loop', None)
5959
loop_validator = kwargs.pop('loop_validator', None)
60+
kwargs['_default_timeouts'] = pycbc_connection.pycbc_get_default_timeouts()
6061
self._cluster_settings = ClusterSettings.build_cluster_settings(connstr, *options, **kwargs)
6162
connect_request = CreateConnectionRequest(self._cluster_settings.connstr,
6263
self._cluster_settings.auth,

couchbase/logic/analytics.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,9 @@ def errors(self) -> List[AnalyticsError]:
151151
)
152152

153153
def metrics(self) -> Optional[AnalyticsMetrics]:
154-
if "metrics" in self._raw:
155-
return AnalyticsMetrics(self._raw.get("metrics", {}))
154+
raw_metrics = self._raw.get('metrics', None)
155+
if raw_metrics:
156+
return AnalyticsMetrics(raw_metrics)
156157
return None
157158

158159
def __repr__(self):
@@ -166,6 +167,7 @@ class AnalyticsQuery:
166167
'read_only': {'readonly': lambda x: x},
167168
'scan_consistency': {'consistency': lambda x: x.value},
168169
'client_context_id': {'client_context_id': lambda x: x},
170+
'metrics': {'metrics': lambda x: x},
169171
'priority': {'priority': lambda x: x},
170172
'query_context': {'query_context': lambda x: x},
171173
'serializer': {'serializer': lambda x: x},
@@ -196,10 +198,10 @@ def _set_named_args(self, **kv):
196198
`$` identifier.
197199
198200
"""
201+
arg_dict = self._params.setdefault("named_parameters", {})
199202
# C++ core wants all args JSONified bytes
200203
named_params = {f'${k}': json.dumps(v).encode('utf-8') for k, v in kv.items()}
201-
202-
self._params["named_parameters"] = named_params
204+
arg_dict.update(named_params)
203205
return self
204206

205207
def _add_pos_args(self, *args):

couchbase/logic/cluster_impl.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
class ClusterImpl:
5858
def __init__(self, connstr: str, *options: object, **kwargs: object) -> None:
5959
skip_connect = kwargs.pop('skip_connect', None)
60+
kwargs['_default_timeouts'] = pycbc_connection.pycbc_get_default_timeouts()
6061
self._cluster_settings = ClusterSettings.build_cluster_settings(connstr, *options, **kwargs)
6162
connect_request = CreateConnectionRequest(self._cluster_settings.connstr,
6263
self._cluster_settings.auth,

couchbase/logic/cluster_settings.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ class ClusterSettings:
335335
cluster_options: Dict[str, Any]
336336
default_transcoder: Transcoder
337337
default_serializer: Serializer
338+
default_timeouts: Dict[str, int]
338339
metrics_options: Dict[str, Any]
339340
orphan_options: Dict[str, Any]
340341
streaming_timeouts: StreamingTimeouts
@@ -352,6 +353,7 @@ def build_cluster_settings(cls,
352353
*options, # type: ClusterOptions
353354
**kwargs # type: Dict[str, Any]
354355
) -> ClusterSettings:
356+
default_timeouts = kwargs.pop('_default_timeouts', {})
355357
# parse query string prior to parsing ClusterOptions
356358
connection_str, query_opts, legacy_opts = parse_connection_string(connstr)
357359

@@ -375,10 +377,11 @@ def build_cluster_settings(cls,
375377

376378
timeout_opts = build_timeout_options(cluster_opts)
377379
streaming_timeouts: StreamingTimeouts = {
378-
'analytics_timeout': timeout_opts.get('analytics_timeout', None),
379-
'query_timeout': timeout_opts.get('query_timeout', None),
380-
'search_timeout': timeout_opts.get('search_timeout', None),
381-
'view_timeout': timeout_opts.get('view_timeout', None),
380+
'analytics_timeout': timeout_opts.get('analytics_timeout',
381+
default_timeouts.get('analytics_timeout', None)),
382+
'query_timeout': timeout_opts.get('query_timeout', default_timeouts.get('query_timeout', None)),
383+
'search_timeout': timeout_opts.get('search_timeout', default_timeouts.get('search_timeout', None)),
384+
'view_timeout': timeout_opts.get('view_timeout', default_timeouts.get('view_timeout', None)),
382385
}
383386
tracing_opts, orphan_opts, tracer = build_tracing_and_orphan_options(cluster_opts)
384387
metrics_opts, meter = build_metrics_options(cluster_opts)
@@ -389,6 +392,7 @@ def build_cluster_settings(cls,
389392
cluster_opts,
390393
default_transcoder,
391394
default_serializer,
395+
default_timeouts,
392396
metrics_opts,
393397
orphan_opts,
394398
streaming_timeouts,

couchbase/logic/n1ql.py

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
from __future__ import annotations
1717

18-
import functools
1918
import json
2019
from datetime import timedelta
2120
from enum import Enum
@@ -363,7 +362,10 @@ def _set_named_args(self, **kv):
363362
364363
"""
365364
arg_dict = self._params.setdefault("named_parameters", {})
366-
arg_dict.update(kv)
365+
# C++ core wants all args JSONified bytes
366+
named_params = {f'${k}': json.dumps(v).encode('utf-8') for k, v in kv.items()}
367+
arg_dict.update(named_params)
368+
return self
367369

368370
def _add_pos_args(self, *args):
369371
"""
@@ -372,7 +374,9 @@ def _add_pos_args(self, *args):
372374
:param args: Values to be used
373375
"""
374376
arg_array = self._params.setdefault("positional_parameters", [])
375-
arg_array.extend(args)
377+
# C++ core wants all args JSONified bytes
378+
json_args = [json.dumps(arg).encode('utf-8') for arg in args]
379+
arg_array.extend(json_args)
376380

377381
def set_option(self, name, value):
378382
"""
@@ -386,24 +390,9 @@ def set_option(self, name, value):
386390
"""
387391
self._params[name] = value
388392

389-
@functools.cached_property
390-
def params(self) -> Dict[str, Any]:
391-
params = self._params
392-
393-
# couchbase++ wants all args JSONified,
394-
# For now encode to bytes to make couchbase::json_string <--> std::vector<std::byte> easier
395-
raw = params.pop('raw', None)
396-
if raw:
397-
params['raw'] = {f'{k}': self._serializer.serialize(v) for k, v in raw.items()}
398-
399-
positional_args = params.pop('positional_parameters', None)
400-
if positional_args:
401-
params['positional_parameters'] = [self._serializer.serialize(arg) for arg in positional_args]
402-
403-
named_params = params.pop('named_parameters', None)
404-
if named_params:
405-
params['named_parameters'] = {f'${k}': self._serializer.serialize(v) for k, v in named_params.items()}
406-
return params
393+
@property
394+
def params(self):
395+
return self._params
407396

408397
@property
409398
def metrics(self) -> bool:
@@ -668,7 +657,8 @@ def raw(self, value # type: Dict[str, Any]
668657
for k in value.keys():
669658
if not isinstance(k, str):
670659
raise TypeError("key for raw value must be str")
671-
self.set_option('raw', value)
660+
raw_params = {f'{k}': json.dumps(v).encode('utf-8') for k, v in value.items()}
661+
self.set_option('raw', raw_params)
672662

673663
@property
674664
def span(self) -> Optional[SpanProtocol]:

couchbase/logic/search.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,9 @@ def errors(self) -> Dict[str, str]:
366366
return self._raw.get('errors', {})
367367

368368
def metrics(self) -> Optional[SearchMetrics]:
369-
if 'metrics' in self._raw:
370-
return SearchMetrics(self._raw.get('metrics', {}))
369+
raw_metrics = self._raw.get('metrics', None)
370+
if raw_metrics:
371+
return SearchMetrics(raw_metrics)
371372
return None
372373

373374
def client_context_id(self) -> Optional[str]:

couchbase/tests/query_t.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class QueryCollectionTestSuite:
4848
'test_query_fully_qualified',
4949
'test_query_in_thread',
5050
'test_query_metadata',
51+
'test_query_metadata_no_metrics',
5152
'test_query_ryow',
5253
'test_query_with_metrics',
5354
'test_scope_query',
@@ -134,6 +135,28 @@ def test_query_metadata(self, cb_env):
134135
assert isinstance(warning.message(), str)
135136
assert isinstance(warning.code(), int)
136137

138+
def test_query_metadata_no_metrics(self, cb_env):
139+
result = cb_env.scope.query(f"SELECT * FROM `{cb_env.collection.name}` LIMIT 2",
140+
QueryOptions(metrics=False))
141+
cb_env.assert_rows(result, 2)
142+
metadata = result.metadata() # type: QueryMetaData
143+
for id_meth in (metadata.client_context_id, metadata.request_id):
144+
id_res = id_meth()
145+
fail_msg = "{} failed".format(id_meth)
146+
assert isinstance(id_res, str), fail_msg
147+
148+
metrics = metadata.metrics()
149+
assert metrics is None
150+
151+
assert metadata.status() == QueryStatus.SUCCESS
152+
assert isinstance(metadata.signature(), (str, dict))
153+
assert isinstance(metadata.warnings(), list)
154+
155+
for warning in metadata.warnings():
156+
assert isinstance(warning, QueryWarning)
157+
assert isinstance(warning.message(), str)
158+
assert isinstance(warning.code(), int)
159+
137160
def test_query_ryow(self, cb_env):
138161
key, value = cb_env.get_new_doc()
139162
result = cb_env.scope.query(f'SELECT * FROM `{cb_env.collection.name}` USE KEYS "{key}"')
@@ -199,6 +222,7 @@ class QueryTestSuite:
199222
'test_query_error_context',
200223
'test_query_in_thread',
201224
'test_query_metadata',
225+
'test_query_metadata_no_metrics',
202226
'test_query_raw_options',
203227
'test_query_ryow',
204228
'test_query_timeout',
@@ -319,6 +343,28 @@ def test_query_metadata(self, cb_env):
319343
assert isinstance(warning.message(), str)
320344
assert isinstance(warning.code(), int)
321345

346+
def test_query_metadata_no_metrics(self, cb_env):
347+
result = cb_env.cluster.query(f"SELECT * FROM `{cb_env.bucket.name}` LIMIT 2",
348+
QueryOptions(metrics=False))
349+
cb_env.assert_rows(result, 2)
350+
metadata = result.metadata() # type: QueryMetaData
351+
for id_meth in (metadata.client_context_id, metadata.request_id):
352+
id_res = id_meth()
353+
fail_msg = "{} failed".format(id_meth)
354+
assert isinstance(id_res, str), fail_msg
355+
356+
metrics = metadata.metrics()
357+
assert metrics is None
358+
359+
assert metadata.status() == QueryStatus.SUCCESS
360+
assert isinstance(metadata.signature(), (str, dict))
361+
assert isinstance(metadata.warnings(), list)
362+
363+
for warning in metadata.warnings():
364+
assert isinstance(warning, QueryWarning)
365+
assert isinstance(warning.message(), str)
366+
assert isinstance(warning.code(), int)
367+
322368
def test_query_raw_options(self, cb_env):
323369
# via raw, we should be able to pass any option
324370
# if using named params, need to match full name param in query

0 commit comments

Comments
 (0)