Skip to content

Commit adb6e09

Browse files
committed
PYCBC-1700: Add support for Python 3.14
Motivation ========== Provide good user experience for those that want to use the latest Python version, 3.14. Modification ============ * Updated get_event_loop() utility to handle Python 3.14 behavior if the loop does not exist * Moved loop validation in the async client adapter to occur prior to the first asynchronous operation (previously it was performed in the synchronous __init__()) * Added couchbase_loop_factory to enable users (Python >= 3.11) to use `asyncio.run(main(), loop_factory=couchbase_loop_factory)` * Updated transcoder tests in acouchbase/txcouchbase to close newly created connections * Fixed KeyValueOpTranscoder tests to try and decode in order to raise the expected exception (change from PYCBC-1725) * [many|musl]linux Docker images have been updated to handle cp314 (PYCBC-1724) Results ======= All builds are successful and all tests pass. Change-Id: Id1f86624d5ae77503337502c8e38daf67e3ceddb Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/241880 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Dimitris Christodoulou <dimitris.christodoulou@couchbase.com>
1 parent 2ebe738 commit adb6e09

5 files changed

Lines changed: 131 additions & 85 deletions

File tree

acouchbase/__init__.py

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,29 @@
1515

1616
import asyncio
1717
import selectors
18+
from typing import Optional
1819

1920

2021
class LoopValidator:
2122
REQUIRED_METHODS = {'add_reader', 'remove_reader',
2223
'add_writer', 'remove_writer'}
2324

2425
@staticmethod
25-
def _get_working_loop():
26-
evloop = asyncio.get_event_loop()
26+
def _get_working_loop() -> asyncio.AbstractEventLoop:
27+
try:
28+
# Python <= 3.13: Returns existing or auto-creates a new loop.
29+
# Python 3.14+: Returns existing or raises RuntimeError.
30+
evloop = asyncio.get_event_loop()
31+
except RuntimeError:
32+
# No event loop exists in the current thread.
33+
evloop = None
34+
2735
gen_new_loop = not LoopValidator._is_valid_loop(evloop)
2836
if gen_new_loop:
29-
evloop.close()
37+
# Only attempt to close the loop if it actually exists
38+
if evloop is not None and not evloop.is_closed():
39+
evloop.close()
40+
3041
selector = selectors.SelectSelector()
3142
new_loop = asyncio.SelectorEventLoop(selector)
3243
asyncio.set_event_loop(new_loop)
@@ -35,7 +46,7 @@ def _get_working_loop():
3546
return evloop
3647

3748
@staticmethod
38-
def _is_valid_loop(evloop):
49+
def _is_valid_loop(evloop: asyncio.AbstractEventLoop) -> bool:
3950
if not evloop:
4051
return False
4152
for meth in LoopValidator.REQUIRED_METHODS:
@@ -46,28 +57,44 @@ def _is_valid_loop(evloop):
4657
return True
4758

4859
@staticmethod
49-
def get_event_loop(evloop):
60+
def get_event_loop(evloop: asyncio.AbstractEventLoop) -> asyncio.AbstractEventLoop:
5061
if LoopValidator._is_valid_loop(evloop):
5162
return evloop
5263
return LoopValidator._get_working_loop()
5364

5465
@staticmethod
55-
def close_loop():
56-
evloop = asyncio.get_event_loop()
57-
evloop.close()
66+
def close_loop() -> None:
67+
try:
68+
evloop = asyncio.get_event_loop()
69+
if not evloop.is_closed():
70+
evloop.close()
71+
except RuntimeError:
72+
# If there is no loop to get, there is no loop to close.
73+
pass # nosec
5874

5975

60-
def get_event_loop(
61-
evloop=None, # type: asyncio.AbstractEventLoop
62-
):
76+
def couchbase_loop_factory() -> asyncio.AbstractEventLoop:
6377
"""
64-
Get an event loop compatible with acouchbase.
65-
Some Event loops, such as ProactorEventLoop (the default asyncio event
66-
loop for Python 3.8 on Windows) are not compatible with acouchbase as
67-
they don't implement all members in the abstract base class.
78+
Dumb factory that simply builds a SelectorEventLoop.
79+
80+
Returns:
81+
A SelectorEventLoop instance.
82+
"""
83+
selector = selectors.SelectSelector()
84+
return asyncio.SelectorEventLoop(selector)
6885

69-
:param evloop: preferred event loop
70-
:return: The preferred event loop, if compatible, otherwise, a compatible
71-
alternative event loop.
86+
87+
def get_event_loop(evloop: Optional[asyncio.AbstractEventLoop] = None) -> asyncio.AbstractEventLoop:
7288
"""
89+
Get an event loop compatible with acouchbase.
90+
91+
Some Event loops, such as ProactorEventLoop (the default asyncio event loop for Python 3.8 on Windows)
92+
are not compatible with acouchbase as they don't implement all members in the abstract base class.
93+
94+
Args:
95+
evloop (asyncio.AbstractEventLoop, optional): An optional event loop to validate. If not provided, the default event loop will be used.
96+
97+
Returns:
98+
The preferred event loop, if compatible, otherwise, a compatible alternative event loop.
99+
""" # noqa: E501
73100
return LoopValidator.get_event_loop(evloop)

acouchbase/logic/client_adapter.py

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,16 @@
4747
class AsyncClientAdapter:
4848

4949
def __init__(self,
50-
loop: AbstractEventLoop,
5150
connect_req: CreateConnectionRequest,
51+
loop: Optional[AbstractEventLoop] = None,
5252
loop_validator: Optional[Callable[[Optional[AbstractEventLoop]], AbstractEventLoop]] = None
5353
) -> None:
5454
num_io_threads = connect_req.options.get('num_io_threads', None)
5555
self._connection = pycbc_connection(num_io_threads) if num_io_threads is not None else pycbc_connection()
56-
if loop_validator:
57-
self._loop = loop_validator(loop)
58-
else:
59-
self._loop = self._get_loop(loop)
56+
# The loop will be setup prior to running the first async op.
57+
# This is the preferred mechanism to handling the event loop.
58+
self._loop: Optional[AbstractEventLoop] = loop
59+
self._loop_validator = loop_validator
6060
self._connect_req = connect_req
6161
self._binding_map = BindingMap(self._connection)
6262
self._close_ft: Optional[Future[None]] = None
@@ -78,6 +78,8 @@ def connect_ft(self) -> Optional[Future[None]]:
7878

7979
@property
8080
def loop(self) -> AbstractEventLoop:
81+
if not self._loop:
82+
self._loop = self._get_loop()
8183
return self._loop
8284

8385
def _ensure_not_closed(self) -> None:
@@ -101,14 +103,14 @@ def execute_bucket_request(self, req: BucketRequest) -> Future[Any]:
101103
self._ensure_not_closed()
102104
self._ensure_connected()
103105

104-
ft = self._loop.create_future()
106+
ft = self.loop.create_future()
105107

106108
def _callback(result) -> None:
107-
self._loop.call_soon_threadsafe(ft.set_result, result)
109+
self.loop.call_soon_threadsafe(ft.set_result, result)
108110

109111
def _errback(exc) -> None:
110112
excptn = ErrorMapper.build_exception(exc)
111-
self._loop.call_soon_threadsafe(ft.set_exception, excptn)
113+
self.loop.call_soon_threadsafe(ft.set_exception, excptn)
112114

113115
req_dict = req.req_to_dict(callback=_callback, errback=_errback)
114116
self._execute_req(ft, req.op_name, req_dict)
@@ -121,14 +123,14 @@ def execute_close_bucket_request(self, bucket_name: str) -> Future[None]:
121123
def execute_cluster_request(self, req: ClusterRequest) -> Future[Any]:
122124
self._ensure_not_closed()
123125

124-
ft = self._loop.create_future()
126+
ft = self.loop.create_future()
125127

126128
def _callback(result) -> None:
127-
self._loop.call_soon_threadsafe(ft.set_result, result)
129+
self.loop.call_soon_threadsafe(ft.set_result, result)
128130

129131
def _errback(exc) -> None:
130132
excptn = ErrorMapper.build_exception(exc)
131-
self._loop.call_soon_threadsafe(ft.set_exception, excptn)
133+
self.loop.call_soon_threadsafe(ft.set_exception, excptn)
132134

133135
req_dict = req.req_to_dict(callback=_callback, errback=_errback)
134136
if not self.connected:
@@ -150,14 +152,14 @@ def execute_collection_request(self, req: CollectionRequest) -> Future[Any]:
150152
self._ensure_not_closed()
151153
self._ensure_connected()
152154

153-
ft = self._loop.create_future()
155+
ft = self.loop.create_future()
154156

155157
def _callback(result) -> None:
156-
self._loop.call_soon_threadsafe(ft.set_result, result)
158+
self.loop.call_soon_threadsafe(ft.set_result, result)
157159

158160
def _errback(exc) -> None:
159161
excptn = ErrorMapper.build_exception(exc)
160-
self._loop.call_soon_threadsafe(ft.set_exception, excptn)
162+
self.loop.call_soon_threadsafe(ft.set_exception, excptn)
161163

162164
req_dict = req.req_to_dict(callback=_callback, errback=_errback)
163165
self._execute_req(ft, req.op_name, req_dict)
@@ -167,16 +169,16 @@ def execute_connect_bucket_request(self, bucket_name: str) -> Future[None]:
167169
self._ensure_not_closed()
168170

169171
req = OpenBucketRequest(bucket_name)
170-
ft = self._loop.create_future()
172+
ft = self.loop.create_future()
171173

172174
def _callback(_) -> None:
173175
if not ft.done():
174-
self._loop.call_soon_threadsafe(ft.set_result, None)
176+
self.loop.call_soon_threadsafe(ft.set_result, None)
175177

176178
def _errback(ret: Any) -> None:
177179
excptn = ErrorMapper.build_exception(ret)
178180
if not ft.done():
179-
self._loop.call_soon_threadsafe(ft.set_exception, excptn)
181+
self.loop.call_soon_threadsafe(ft.set_exception, excptn)
180182

181183
req_dict = req.req_to_dict(callback=_callback, errback=_errback)
182184
if not self.connected:
@@ -189,16 +191,16 @@ def _errback(ret: Any) -> None:
189191
def execute_mgmt_request(self, req: MgmtRequest) -> Future[Any]:
190192
self._ensure_not_closed()
191193

192-
ft = self._loop.create_future()
194+
ft = self.loop.create_future()
193195

194196
def _callback(ret: Any) -> None:
195197
if not ft.done():
196-
self._loop.call_soon_threadsafe(ft.set_result, ret)
198+
self.loop.call_soon_threadsafe(ft.set_result, ret)
197199

198200
def _errback(ret: Any) -> None:
199201
excptn = ErrorMapper.build_exception(ret, mapping=req.error_map)
200202
if not ft.done():
201-
self._loop.call_soon_threadsafe(ft.set_exception, excptn)
203+
self.loop.call_soon_threadsafe(ft.set_exception, excptn)
202204

203205
req_dict = req.req_to_dict(callback=_callback, errback=_errback)
204206
if not self.connected:
@@ -244,17 +246,17 @@ def _execute_chained_req(self,
244246

245247
def _execute_close_connection_request(self) -> Future[None]:
246248
req = CloseConnectionRequest()
247-
ft = self._loop.create_future()
249+
ft = self.loop.create_future()
248250

249251
def _callback(_) -> None:
250252
if not ft.done():
251253
self._reset_connection()
252-
self._loop.call_soon_threadsafe(ft.set_result, None)
254+
self.loop.call_soon_threadsafe(ft.set_result, None)
253255

254256
def _errback(ret: Any) -> None:
255257
excptn = ErrorMapper.build_exception(ret)
256258
if not ft.done():
257-
self._loop.call_soon_threadsafe(ft.set_exception, excptn)
259+
self.loop.call_soon_threadsafe(ft.set_exception, excptn)
258260

259261
req_dict = req.req_to_dict(callback=_callback, errback=_errback)
260262
if not self.connected:
@@ -270,16 +272,16 @@ def _errback(ret: Any) -> None:
270272
return ft
271273

272274
def _execute_connect_request(self) -> Future[None]:
273-
ft = self._loop.create_future()
275+
ft = self.loop.create_future()
274276

275277
def _callback(_) -> None:
276278
if not ft.done():
277-
self._loop.call_soon_threadsafe(ft.set_result, None)
279+
self.loop.call_soon_threadsafe(ft.set_result, None)
278280

279281
def _errback(ret: Any) -> None:
280282
excptn = ErrorMapper.build_exception(ret)
281283
if not ft.done():
282-
self._loop.call_soon_threadsafe(ft.set_exception, excptn)
284+
self.loop.call_soon_threadsafe(ft.set_exception, excptn)
283285

284286
req_dict = self._connect_req.req_to_dict(callback=_callback, errback=_errback)
285287
self._execute_req(ft, self._connect_req.op_name, req_dict)
@@ -316,7 +318,10 @@ def _execute_req_sync(self, op_name: str, req_dict: Dict[str, Any]) -> Any:
316318

317319
def _get_loop(self, loop: Optional[AbstractEventLoop] = None) -> AbstractEventLoop:
318320
if not loop:
319-
loop = get_event_loop()
321+
if self._loop_validator:
322+
loop = self._loop_validator(loop)
323+
else:
324+
loop = get_event_loop()
320325

321326
if not loop.is_running():
322327
raise RuntimeError('Event loop is not running.')

acouchbase/logic/cluster_impl.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353

5454
class AsyncClusterImpl:
5555
def __init__(self, connstr: str, *options: object, **kwargs: object) -> None:
56-
loop = kwargs.pop('loop', None)
56+
loop: Optional[AbstractEventLoop] = kwargs.pop('loop', None)
5757
loop_validator = kwargs.pop('loop_validator', None)
5858
self._cluster_settings = ClusterSettings.build_cluster_settings(connstr, *options, **kwargs)
5959
connect_request = CreateConnectionRequest(self._cluster_settings.connstr,
@@ -62,7 +62,7 @@ def __init__(self, connstr: str, *options: object, **kwargs: object) -> None:
6262
# A connection is made when we create the client adapter, but it is an async operation that we cannot await
6363
# b/c the call needs to happen when we initialize a cluster (new cluster -> new client adapter). We await
6464
# the create connection future in whichever operation comes next.
65-
self._client_adapter = AsyncClientAdapter(loop, connect_request, loop_validator=loop_validator)
65+
self._client_adapter = AsyncClientAdapter(connect_request, loop=loop, loop_validator=loop_validator)
6666
self._request_builder = ClusterRequestBuilder()
6767
self._cluster_info: Optional[ClusterInfoResult] = None
6868
self._transactions: Optional[Transactions] = None

0 commit comments

Comments
 (0)