Skip to content

Commit c740dd4

Browse files
committed
PYCBC-1744: Do not allow cluster to reconnect after close
Motivation ========== We want all APIs in the SDK (acouchbase, couchbase & txcouchbase) to be as consistent as possible. Currently how the SDK handles attempted operations after a close() has been called is not consistent across operations and/or APIs. This change is to enforce not allowing any operations once a close() has been called. The only way to reconnect is to create another cluster instance. The previous behavior would not have been good user experience b/c once close() has completed the underlying C++ core stops its connection and does not allow further operations. Modification ============ * Consistenlyt raise a RuntimeError w/ a consistent message if an operation is attempted after close() has completed. * close() is idempotent * Add tests to confirm functionality (across all operations) Results ======= All tests pass. Change-Id: I63df727f9c08c5c952402229729e7720de48e867 Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/240282 Reviewed-by: Dimitris Christodoulou <dimitris.christodoulou@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent 6309017 commit c740dd4

11 files changed

Lines changed: 347 additions & 24 deletions

File tree

acouchbase/logic/bucket_impl.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,9 @@ async def ping(self, req: PingRequest) -> PingResult:
117117

118118
def view_query(self, req: ViewQueryRequest) -> ViewResult:
119119
"""**INTERNAL**"""
120+
self._client_adapter._ensure_not_closed()
120121
if not self.connected or not self.bucket_connected:
121-
raise RuntimeError('Cannot attempt to execute a view query to establishing a connection.')
122+
raise RuntimeError('Cannot perform operations without first establishing a connection.')
122123
# If the view_query was provided a timeout we will use that value for the streaming timeout
123124
# when the streaming object is created in the bindings. If the view_query does not specify a
124125
# timeout, the streaming_timeout defaults to cluster's view_timeout (set here). If the cluster

acouchbase/logic/client_adapter.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def __init__(self,
6060
self._binding_map = BindingMap()
6161
self._close_ft: Optional[Future[None]] = None
6262
self._connect_ft: Optional[Future[None]] = None
63+
self._closed = False
6364
self._create_connection()
6465

6566
@property
@@ -78,13 +79,26 @@ def connect_ft(self) -> Optional[Future[None]]:
7879
def loop(self) -> AbstractEventLoop:
7980
return self._loop
8081

82+
def _ensure_not_closed(self) -> None:
83+
if self._closed:
84+
raise RuntimeError(
85+
'Cannot perform operations on a closed cluster. Create a new cluster instance to reconnect.')
86+
87+
def _ensure_connected(self) -> None:
88+
if not self.connected:
89+
raise RuntimeError('Cannot perform operations without first establishing a connection.')
90+
8191
async def close_connection(self) -> None:
92+
if self._closed:
93+
return # Already closed, idempotent behavior
94+
8295
self._close_ft = self._execute_close_connection_request()
8396
await self._close_ft
97+
self._closed = True
8498

8599
def execute_bucket_request(self, req: BucketRequest) -> Future[Any]:
86-
if not self.connected:
87-
raise RuntimeError('Cannot attempt bucket request without first establishing a connection.')
100+
self._ensure_not_closed()
101+
self._ensure_connected()
88102

89103
ft = self._loop.create_future()
90104

@@ -104,6 +118,8 @@ def execute_close_bucket_request(self, bucket_name: str) -> Future[None]:
104118
return self.execute_bucket_request(req)
105119

106120
def execute_cluster_request(self, req: ClusterRequest) -> Future[Any]:
121+
self._ensure_not_closed()
122+
107123
ft = self._loop.create_future()
108124

109125
def _callback(result) -> None:
@@ -122,15 +138,16 @@ def _errback(exc) -> None:
122138
return ft
123139

124140
def execute_cluster_request_sync(self, req: ClusterRequest) -> Any:
141+
self._ensure_not_closed()
125142
req_dict = req.req_to_dict(self._connection)
126143
ret = self._execute_req_sync(req.op_name, req_dict)
127144
if isinstance(ret, BaseCouchbaseException):
128145
raise ErrorMapper.build_exception(ret)
129146
return ret
130147

131148
def execute_collection_request(self, req: CollectionRequest) -> Future[Any]:
132-
if not self.connected:
133-
raise RuntimeError('Cannot attempt collection request without first establishing a connection.')
149+
self._ensure_not_closed()
150+
self._ensure_connected()
134151

135152
ft = self._loop.create_future()
136153

@@ -146,6 +163,8 @@ def _errback(exc) -> None:
146163
return ft
147164

148165
def execute_connect_bucket_request(self, bucket_name: str) -> Future[None]:
166+
self._ensure_not_closed()
167+
149168
req = OpenBucketRequest(bucket_name, OpenOrCloseBucket.OPEN)
150169
ft = self._loop.create_future()
151170

@@ -167,6 +186,8 @@ def _errback(ret: Any) -> None:
167186
return ft
168187

169188
def execute_mgmt_request(self, req: MgmtRequest) -> Future[Any]:
189+
self._ensure_not_closed()
190+
170191
ft = self._loop.create_future()
171192

172193
def _callback(ret: Any) -> None:
@@ -187,13 +208,15 @@ def _errback(ret: Any) -> None:
187208
return ft
188209

189210
async def wait_until_connected(self) -> None:
211+
self._ensure_not_closed()
190212
if self.connected:
191213
return
192214
if self._connect_ft is None:
193215
self._create_connection()
194216
await self._connect_ft
195217

196218
def _create_connection(self) -> None:
219+
self._ensure_not_closed()
197220
if self._close_ft is not None and self._close_ft.done() is not True:
198221
raise RuntimeError('Cannot attempt to connect when close attempt is pending.')
199222

@@ -235,6 +258,11 @@ def _errback(ret: Any) -> None:
235258

236259
req_dict = req.req_to_dict(self._connection, callback=_callback, errback=_errback)
237260
if not self.connected:
261+
# If we're closed, don't try to reconnect just to close again
262+
if self._closed:
263+
ft.set_result(None)
264+
return ft
265+
238266
chained_ft = self._execute_connect_request() if self._connect_ft is None else self._connect_ft
239267
chained_ft.add_done_callback(partial(self._execute_chained_req, ft, req.op_name, req_dict))
240268
else:

acouchbase/logic/cluster_impl.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ def transactions(self) -> Transactions:
145145

146146
def analytics_query(self, req: AnalyticsQueryRequest) -> AnalyticsResult:
147147
"""**INTERNAL**"""
148-
if not self.connected:
149-
raise RuntimeError('Cannot attempt to execute an analytics query prior to establishing a connection.')
148+
self._client_adapter._ensure_not_closed()
149+
self._client_adapter._ensure_connected()
150150
# If the analytics_query was provided a timeout we will use that value for the streaming timeout
151151
# when the streaming object is created in the bindings. If the analytics_query does not specify a
152152
# timeout, the streaming_timeout defaults to cluster's analytics_timeout (set here). If the cluster
@@ -196,8 +196,8 @@ async def ping(self, req: PingRequest) -> PingResult:
196196

197197
def query(self, req: QueryRequest) -> QueryResult:
198198
"""**INTERNAL**"""
199-
if not self.connected:
200-
raise RuntimeError('Cannot attempt to execute a query prior to establishing a connection.')
199+
self._client_adapter._ensure_not_closed()
200+
self._client_adapter._ensure_connected()
201201
# If the n1ql_query was provided a timeout we will use that value for the streaming timeout
202202
# when the streaming object is created in the bindings. If the n1ql_query does not specify a
203203
# timeout, the streaming_timeout defaults to cluster's query_timeout (set here). If the cluster
@@ -213,8 +213,8 @@ def query(self, req: QueryRequest) -> QueryResult:
213213

214214
def search(self, req: SearchQueryRequest) -> SearchResult:
215215
"""**INTERNAL**"""
216-
if not self.connected:
217-
raise RuntimeError('Cannot attempt to execute a search prior to establishing a connection.')
216+
self._client_adapter._ensure_not_closed()
217+
self._client_adapter._ensure_connected()
218218
# If the search_query was provided a timeout we will use that value for the streaming timeout
219219
# when the streaming object is created in the bindings. If the search_query does not specify a
220220
# timeout, the streaming_timeout defaults to cluster's search_timeout (set here). If the cluster

acouchbase/logic/scope_impl.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ def name(self) -> str:
101101
return self._scope_name
102102

103103
def analytics_query(self, req: AnalyticsQueryRequest) -> AnalyticsResult:
104-
if not self.connected:
105-
raise RuntimeError('Cannot attempt to execute an analytics query prior to establishing a connection.')
104+
self._client_adapter._ensure_not_closed()
105+
self._client_adapter._ensure_connected()
106106
# If the analytics_query was provided a timeout we will use that value for the streaming timeout
107107
# when the streaming object is created in the bindings. If the analytics_query does not specify a
108108
# timeout, the streaming_timeout defaults to cluster's analytics_timeout (set here). If the cluster
@@ -117,8 +117,8 @@ def analytics_query(self, req: AnalyticsQueryRequest) -> AnalyticsResult:
117117
num_workers=req.num_workers))
118118

119119
def query(self, req: QueryRequest) -> QueryResult:
120-
if not self.connected:
121-
raise RuntimeError('Cannot attempt to execute a query prior to establishing a connection.')
120+
self._client_adapter._ensure_not_closed()
121+
self._client_adapter._ensure_connected()
122122
# If the n1ql_query was provided a timeout we will use that value for the streaming timeout
123123
# when the streaming object is created in the bindings. If the n1ql_query does not specify a
124124
# timeout, the streaming_timeout defaults to cluster's query_timeout (set here). If the cluster
@@ -133,8 +133,8 @@ def query(self, req: QueryRequest) -> QueryResult:
133133
num_workers=req.num_workers))
134134

135135
def search(self, req: SearchQueryRequest) -> SearchResult:
136-
if not self.connected:
137-
raise RuntimeError('Cannot attempt to execute a search prior to establishing a connection.')
136+
self._client_adapter._ensure_not_closed()
137+
self._client_adapter._ensure_connected()
138138
# If the search_query was provided a timeout we will use that value for the streaming timeout
139139
# when the streaming object is created in the bindings. If the search_query does not specify a
140140
# timeout, the streaming_timeout defaults to cluster's search_timeout (set here). If the cluster

acouchbase/tests/cluster_t.py

Lines changed: 141 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
import pytest
2121
import pytest_asyncio
2222

23-
from acouchbase.cluster import get_event_loop
23+
from acouchbase.cluster import Cluster, get_event_loop
24+
from couchbase.auth import PasswordAuthenticator
2425
from couchbase.diagnostics import (ClusterState,
2526
EndpointPingReport,
2627
EndpointState,
@@ -30,7 +31,9 @@
3031
InvalidArgumentException,
3132
ParsingFailedException,
3233
QueryIndexNotFoundException)
33-
from couchbase.options import DiagnosticsOptions, PingOptions
34+
from couchbase.options import (ClusterOptions,
35+
DiagnosticsOptions,
36+
PingOptions)
3437
from couchbase.result import DiagnosticsResult, PingResult
3538

3639
from ._test_utils import TestEnvironment
@@ -218,3 +221,139 @@ async def test_diagnostics_as_json(self, cb_env):
218221
assert data[0]['remote'] is not None
219222
assert data[0]['local'] is not None
220223
assert data[0]['state'] is not None
224+
225+
@pytest.mark.asyncio
226+
async def test_await_on_close_cluster(self, cb_env):
227+
conn_string = cb_env.config.get_connection_string()
228+
username, pw = cb_env.config.get_username_and_pw()
229+
auth = PasswordAuthenticator(username, pw)
230+
opts = ClusterOptions(auth)
231+
cluster = Cluster(conn_string, opts)
232+
assert cluster._impl.connected is False
233+
await cluster.close()
234+
assert cluster._impl.connected is False
235+
236+
@pytest.mark.asyncio
237+
async def test_multiple_close_cluster(self, cb_env):
238+
conn_string = cb_env.config.get_connection_string()
239+
username, pw = cb_env.config.get_username_and_pw()
240+
auth = PasswordAuthenticator(username, pw)
241+
opts = ClusterOptions(auth)
242+
cluster = await Cluster.connect(conn_string, opts)
243+
assert cluster._impl.connected is True
244+
for _ in range(10):
245+
await cluster.close()
246+
assert cluster._impl.connected is False
247+
248+
@pytest.mark.asyncio
249+
async def test_operations_after_close(self, cb_env):
250+
conn_string = cb_env.config.get_connection_string()
251+
username, pw = cb_env.config.get_username_and_pw()
252+
auth = PasswordAuthenticator(username, pw)
253+
opts = ClusterOptions(auth)
254+
cluster = await Cluster.connect(conn_string, opts)
255+
assert cluster._impl.connected is True
256+
257+
# Close the cluster
258+
await cluster.close()
259+
assert cluster._impl.connected is False
260+
261+
# Verify operations after close raise RuntimeError
262+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
263+
await cluster.ping()
264+
265+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
266+
await cluster.diagnostics()
267+
268+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
269+
cluster.bucket(cb_env.bucket.name)
270+
271+
@pytest.mark.asyncio
272+
async def test_connected_property(self, cb_env):
273+
conn_string = cb_env.config.get_connection_string()
274+
username, pw = cb_env.config.get_username_and_pw()
275+
auth = PasswordAuthenticator(username, pw)
276+
opts = ClusterOptions(auth)
277+
cluster = await Cluster.connect(conn_string, opts)
278+
assert cluster._impl.connected is True
279+
280+
# Close the cluster
281+
await cluster.close()
282+
283+
assert cluster._impl.connected is False
284+
285+
@pytest.mark.asyncio
286+
async def test_streaming_operations_after_close(self, cb_env):
287+
"""Verify all streaming operations raise RuntimeError after cluster close"""
288+
from couchbase.search import MatchQuery
289+
290+
conn_string = cb_env.config.get_connection_string()
291+
username, pw = cb_env.config.get_username_and_pw()
292+
auth = PasswordAuthenticator(username, pw)
293+
opts = ClusterOptions(auth)
294+
cluster = await Cluster.connect(conn_string, opts)
295+
296+
# Close the cluster
297+
await cluster.close()
298+
299+
# Test cluster.query
300+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
301+
cluster.query("SELECT 1=1")
302+
303+
# Test cluster.analytics_query
304+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
305+
cluster.analytics_query("SELECT 1=1")
306+
307+
# Test cluster.search
308+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
309+
cluster.search_query("dummy-index", MatchQuery("test"))
310+
311+
# Test bucket.view_query
312+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
313+
bucket = cluster.bucket(cb_env.bucket.name)
314+
bucket.view_query("dummy-design", "dummy-view")
315+
316+
@pytest.mark.flaky(reruns=5, reruns_delay=1)
317+
@pytest.mark.asyncio
318+
async def test_management_operations_after_close(self, cb_env):
319+
from couchbase.management.views import DesignDocumentNamespace
320+
321+
conn_string = cb_env.config.get_connection_string()
322+
username, pw = cb_env.config.get_username_and_pw()
323+
auth = PasswordAuthenticator(username, pw)
324+
opts = ClusterOptions(auth)
325+
cluster = await Cluster.connect(conn_string, opts)
326+
bucket_name = cb_env.bucket.name
327+
328+
# Get bucket instance before closing (for bucket-level manager tests)
329+
bucket = cluster.bucket(bucket_name)
330+
331+
# Close the cluster
332+
await cluster.close()
333+
assert cluster._impl.connected is False
334+
335+
# Test cluster-level management APIs
336+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
337+
await cluster.buckets().get_all_buckets()
338+
339+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
340+
await cluster.users().get_all_users()
341+
342+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
343+
await cluster.query_indexes().get_all_indexes(bucket_name)
344+
345+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
346+
await cluster.analytics_indexes().get_all_datasets()
347+
348+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
349+
await cluster.search_indexes().get_all_indexes()
350+
351+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
352+
await cluster.eventing_functions().get_all_functions()
353+
354+
# Test bucket-level management APIs
355+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
356+
await bucket.collections().get_all_scopes()
357+
358+
with pytest.raises(RuntimeError, match="Cannot perform operations on a closed cluster"):
359+
await bucket.view_indexes().get_all_design_documents(DesignDocumentNamespace.DEVELOPMENT)

couchbase/logic/bucket_impl.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ def ping(self, req: PingRequest) -> PingResult:
9292

9393
def view_query(self, req: ViewQueryRequest) -> ViewResult:
9494
"""**INTERNAL**"""
95+
self._client_adapter._ensure_not_closed()
96+
self._client_adapter._ensure_connected()
9597
# If the view_query was provided a timeout we will use that value for the streaming timeout
9698
# when the streaming object is created in the bindings. If the view_query does not specify a
9799
# timeout, the streaming_timeout defaults to cluster's view_timeout (set here). If the cluster

0 commit comments

Comments
 (0)