2929 ErrorMapper ,
3030 ExceptionMap ,
3131 InternalSDKException )
32- from couchbase .exceptions import exception as BaseCouchbaseException
3332from couchbase .logic .binding_map import BindingMap
3433from couchbase .logic .bucket_types import CloseBucketRequest , OpenBucketRequest
3534from couchbase .logic .cluster_types import CloseConnectionRequest
36- from couchbase .logic .top_level_types import OpenOrCloseBucket , PyCapsuleType
35+ from couchbase .logic .pycbc_core import pycbc_connection
36+ from couchbase .logic .pycbc_core import pycbc_exception as PycbcCoreException
3737
3838if TYPE_CHECKING :
3939 from asyncio import AbstractEventLoop
@@ -51,24 +51,25 @@ def __init__(self,
5151 connect_req : CreateConnectionRequest ,
5252 loop_validator : Optional [Callable [[Optional [AbstractEventLoop ]], AbstractEventLoop ]] = None
5353 ) -> None :
54- self ._connection : Optional [PyCapsuleType ] = None
54+ num_io_threads = connect_req .options .get ('num_io_threads' , None )
55+ self ._connection = pycbc_connection (num_io_threads ) if num_io_threads is not None else pycbc_connection ()
5556 if loop_validator :
5657 self ._loop = loop_validator (loop )
5758 else :
5859 self ._loop = self ._get_loop (loop )
5960 self ._connect_req = connect_req
60- self ._binding_map = BindingMap ()
61+ self ._binding_map = BindingMap (self . _connection )
6162 self ._close_ft : Optional [Future [None ]] = None
6263 self ._connect_ft : Optional [Future [None ]] = None
6364 self ._closed = False
6465 self ._create_connection ()
6566
6667 @property
6768 def connected (self ) -> bool :
68- return self ._connection is not None
69+ return self ._connection is not None and self . _connection . connected
6970
7071 @property
71- def connection (self ) -> Optional [ PyCapsuleType ] :
72+ def connection (self ) -> pycbc_connection :
7273 return self ._connection
7374
7475 @property
@@ -109,12 +110,12 @@ def _errback(exc) -> None:
109110 excptn = ErrorMapper .build_exception (exc )
110111 self ._loop .call_soon_threadsafe (ft .set_exception , excptn )
111112
112- req_dict = req .req_to_dict (self . _connection , callback = _callback , errback = _errback )
113+ req_dict = req .req_to_dict (callback = _callback , errback = _errback )
113114 self ._execute_req (ft , req .op_name , req_dict )
114115 return ft
115116
116117 def execute_close_bucket_request (self , bucket_name : str ) -> Future [None ]:
117- req = CloseBucketRequest (bucket_name , OpenOrCloseBucket . CLOSE )
118+ req = CloseBucketRequest (bucket_name )
118119 return self .execute_bucket_request (req )
119120
120121 def execute_cluster_request (self , req : ClusterRequest ) -> Future [Any ]:
@@ -129,7 +130,7 @@ def _errback(exc) -> None:
129130 excptn = ErrorMapper .build_exception (exc )
130131 self ._loop .call_soon_threadsafe (ft .set_exception , excptn )
131132
132- req_dict = req .req_to_dict (self . _connection , callback = _callback , errback = _errback )
133+ req_dict = req .req_to_dict (callback = _callback , errback = _errback )
133134 if not self .connected :
134135 chained_ft = self ._execute_connect_request () if self ._connect_ft is None else self ._connect_ft
135136 chained_ft .add_done_callback (partial (self ._execute_chained_req , ft , req .op_name , req_dict ))
@@ -139,9 +140,9 @@ def _errback(exc) -> None:
139140
140141 def execute_cluster_request_sync (self , req : ClusterRequest ) -> Any :
141142 self ._ensure_not_closed ()
142- req_dict = req .req_to_dict (self . _connection )
143+ req_dict = req .req_to_dict ()
143144 ret = self ._execute_req_sync (req .op_name , req_dict )
144- if isinstance (ret , BaseCouchbaseException ):
145+ if isinstance (ret , PycbcCoreException ):
145146 raise ErrorMapper .build_exception (ret )
146147 return ret
147148
@@ -158,14 +159,14 @@ def _errback(exc) -> None:
158159 excptn = ErrorMapper .build_exception (exc )
159160 self ._loop .call_soon_threadsafe (ft .set_exception , excptn )
160161
161- req_dict = req .req_to_dict (self . _connection , callback = _callback , errback = _errback )
162+ req_dict = req .req_to_dict (callback = _callback , errback = _errback )
162163 self ._execute_req (ft , req .op_name , req_dict )
163164 return ft
164165
165166 def execute_connect_bucket_request (self , bucket_name : str ) -> Future [None ]:
166167 self ._ensure_not_closed ()
167168
168- req = OpenBucketRequest (bucket_name , OpenOrCloseBucket . OPEN )
169+ req = OpenBucketRequest (bucket_name )
169170 ft = self ._loop .create_future ()
170171
171172 def _callback (_ ) -> None :
@@ -177,7 +178,7 @@ def _errback(ret: Any) -> None:
177178 if not ft .done ():
178179 self ._loop .call_soon_threadsafe (ft .set_exception , excptn )
179180
180- req_dict = req .req_to_dict (self . _connection , callback = _callback , errback = _errback )
181+ req_dict = req .req_to_dict (callback = _callback , errback = _errback )
181182 if not self .connected :
182183 chained_ft = self ._execute_connect_request () if self ._connect_ft is None else self ._connect_ft
183184 chained_ft .add_done_callback (partial (self ._execute_chained_req , ft , req .op_name , req_dict ))
@@ -199,7 +200,7 @@ def _errback(ret: Any) -> None:
199200 if not ft .done ():
200201 self ._loop .call_soon_threadsafe (ft .set_exception , excptn )
201202
202- req_dict = req .req_to_dict (self . _connection , callback = _callback , errback = _errback )
203+ req_dict = req .req_to_dict (callback = _callback , errback = _errback )
203204 if not self .connected :
204205 chained_ft = self ._execute_connect_request () if self ._connect_ft is None else self ._connect_ft
205206 chained_ft .add_done_callback (partial (self ._execute_chained_req , ft , req .op_name , req_dict ))
@@ -239,7 +240,6 @@ def _execute_chained_req(self,
239240 ft .set_exception (exc )
240241 return
241242
242- req_dict ['conn' ] = self ._connection
243243 self ._execute_req (ft , op_name , req_dict )
244244
245245 def _execute_close_connection_request (self ) -> Future [None ]:
@@ -256,7 +256,7 @@ def _errback(ret: Any) -> None:
256256 if not ft .done ():
257257 self ._loop .call_soon_threadsafe (ft .set_exception , excptn )
258258
259- req_dict = req .req_to_dict (self . _connection , callback = _callback , errback = _errback )
259+ req_dict = req .req_to_dict (callback = _callback , errback = _errback )
260260 if not self .connected :
261261 # If we're closed, don't try to reconnect just to close again
262262 if self ._closed :
@@ -272,9 +272,8 @@ def _errback(ret: Any) -> None:
272272 def _execute_connect_request (self ) -> Future [None ]:
273273 ft = self ._loop .create_future ()
274274
275- def _callback (ret : PyCapsuleType ) -> None :
275+ def _callback (_ ) -> None :
276276 if not ft .done ():
277- self ._connection = ret
278277 self ._loop .call_soon_threadsafe (ft .set_result , None )
279278
280279 def _errback (ret : Any ) -> None :
0 commit comments