1- use crate :: connection:: db_connector:: DatabaseConnection ;
21use crate :: connection:: database_type:: DatabaseType ;
32use crate :: connection:: datasources:: DatasourceConfig ;
3+ use crate :: connection:: db_connector:: DatabaseConnection ;
44use std:: collections:: HashMap ;
55use std:: collections:: VecDeque ;
6- use std:: sync:: Arc ;
76use std:: error:: Error ;
8- use tokio :: sync:: Mutex ;
7+ use std :: sync:: Arc ;
98use std:: time:: Duration ;
9+ use tokio:: sync:: Mutex ;
1010
1111/// A simple, efficient connection pool for Canyon-SQL
12- ///
12+ ///
1313/// This pool maintains a collection of database connections that can be
1414/// reused across multiple operations, significantly improving performance
1515/// by avoiding the overhead of creating new connections for each query.
@@ -24,14 +24,18 @@ pub struct ConnectionPool {
2424 /// Database type for this pool
2525 db_type : DatabaseType ,
2626 /// Connection factory function
27- factory : Box < dyn Fn ( ) -> Result < DatabaseConnection , Box < dyn Error + Send + Sync > > + Send + Sync > ,
27+ factory :
28+ Box < dyn Fn ( ) -> Result < DatabaseConnection , Box < dyn Error + Send + Sync > > + Send + Sync > ,
2829}
2930
3031impl ConnectionPool {
3132 /// Creates a new connection pool
3233 pub fn new (
3334 db_type : DatabaseType ,
34- factory : impl Fn ( ) -> Result < DatabaseConnection , Box < dyn Error + Send + Sync > > + Send + Sync + ' static ,
35+ factory : impl Fn ( ) -> Result < DatabaseConnection , Box < dyn Error + Send + Sync > >
36+ + Send
37+ + Sync
38+ + ' static ,
3539 min_size : usize ,
3640 max_size : usize ,
3741 ) -> Self {
@@ -45,12 +49,14 @@ impl ConnectionPool {
4549 }
4650
4751 /// Gets a connection from the pool
48- ///
52+ ///
4953 /// If a connection is available, it's returned immediately.
5054 /// If no connections are available and the pool hasn't reached max_size,
5155 /// a new connection is created.
5256 /// If the pool is at max_size, this will wait for a connection to become available.
53- pub async fn get_connection ( & mut self ) -> Result < DatabaseConnection , Box < dyn Error + Send + Sync > > {
57+ pub async fn get_connection (
58+ & mut self ,
59+ ) -> Result < DatabaseConnection , Box < dyn Error + Send + Sync > > {
5460 // Try to get an existing connection
5561 if let Some ( conn) = self . connections . pop_front ( ) {
5662 return Ok ( conn) ;
@@ -64,13 +70,13 @@ impl ConnectionPool {
6470 // Wait for a connection to become available
6571 // This is a simple implementation - in production you might want more sophisticated waiting
6672 tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
67-
73+
6874 // Use Box::pin to avoid recursion issues
6975 Box :: pin ( self . get_connection ( ) ) . await
7076 }
7177
7278 /// Returns a connection to the pool
73- ///
79+ ///
7480 /// If the pool is at max_size, the connection is dropped.
7581 /// Otherwise, it's added back to the pool for reuse.
7682 pub fn return_connection ( & mut self , conn : DatabaseConnection ) {
@@ -104,12 +110,14 @@ pub struct PooledConnection {
104110
105111impl PooledConnection {
106112 /// Creates a new pooled connection wrapper
107- pub async fn new ( pool : Arc < Mutex < ConnectionPool > > ) -> Result < Self , Box < dyn Error + Send + Sync > > {
113+ pub async fn new (
114+ pool : Arc < Mutex < ConnectionPool > > ,
115+ ) -> Result < Self , Box < dyn Error + Send + Sync > > {
108116 let connection = {
109117 let mut pool_guard = pool. lock ( ) . await ;
110118 pool_guard. get_connection ( ) . await ?
111119 } ;
112-
120+
113121 Ok ( Self {
114122 connection : Some ( connection) ,
115123 pool,
@@ -160,7 +168,7 @@ impl PoolManager {
160168 datasource : & DatasourceConfig ,
161169 ) -> Result < ( ) , Box < dyn Error + Send + Sync > > {
162170 let db_type = datasource. get_db_type ( ) ;
163-
171+
164172 // Create a factory function for this datasource
165173 let factory = {
166174 let datasource = datasource. clone ( ) ;
@@ -172,32 +180,38 @@ impl PoolManager {
172180 } ;
173181
174182 let pool = ConnectionPool :: new (
175- db_type,
176- factory,
177- 2 , // min_size
183+ db_type, factory, 2 , // min_size
178184 10 , // max_size
179185 ) ;
180186
181- self . pools . insert ( name. to_string ( ) , Arc :: new ( Mutex :: new ( pool) ) ) ;
187+ self . pools
188+ . insert ( name. to_string ( ) , Arc :: new ( Mutex :: new ( pool) ) ) ;
182189 Ok ( ( ) )
183190 }
184191
185192 /// Gets a pooled connection by name
186- pub async fn get_connection ( & self , name : & str ) -> Result < PooledConnection , Box < dyn Error + Send + Sync > > {
187- let pool = self . pools
193+ pub async fn get_connection (
194+ & self ,
195+ name : & str ,
196+ ) -> Result < PooledConnection , Box < dyn Error + Send + Sync > > {
197+ let pool = self
198+ . pools
188199 . get ( name)
189200 . ok_or_else ( || format ! ( "Pool '{}' not found" , name) ) ?;
190-
201+
191202 PooledConnection :: new ( pool. clone ( ) ) . await
192203 }
193204
194205 /// Gets the default connection pool
195- pub async fn get_default_connection ( & self ) -> Result < PooledConnection , Box < dyn Error + Send + Sync > > {
196- let pool = self . pools
206+ pub async fn get_default_connection (
207+ & self ,
208+ ) -> Result < PooledConnection , Box < dyn Error + Send + Sync > > {
209+ let pool = self
210+ . pools
197211 . values ( )
198212 . next ( )
199213 . ok_or ( "No connection pools available" ) ?;
200-
214+
201215 PooledConnection :: new ( pool. clone ( ) ) . await
202216 }
203217
@@ -212,9 +226,9 @@ static POOL_MANAGER: std::sync::OnceLock<Arc<Mutex<PoolManager>>> = std::sync::O
212226
213227/// Gets the global pool manager instance
214228pub fn get_pool_manager ( ) -> Arc < Mutex < PoolManager > > {
215- POOL_MANAGER . get_or_init ( || {
216- Arc :: new ( Mutex :: new ( PoolManager :: new ( ) ) )
217- } ) . clone ( )
229+ POOL_MANAGER
230+ . get_or_init ( || Arc :: new ( Mutex :: new ( PoolManager :: new ( ) ) ) )
231+ . clone ( )
218232}
219233
220234// Implement DbConnection trait for PooledConnection
@@ -223,11 +237,11 @@ impl crate::connection::contracts::DbConnection for PooledConnection {
223237 & self ,
224238 stmt : & str ,
225239 params : & [ & dyn crate :: query:: parameters:: QueryParameter ] ,
226- ) -> impl std:: future:: Future < Output = Result < crate :: rows:: CanyonRows , Box < dyn Error + Send + Sync > > > + Send {
240+ ) -> impl std:: future:: Future <
241+ Output = Result < crate :: rows:: CanyonRows , Box < dyn Error + Send + Sync > > ,
242+ > + Send {
227243 let conn = self . connection ( ) ;
228- async move {
229- conn. query_rows ( stmt, params) . await
230- }
244+ async move { conn. query_rows ( stmt, params) . await }
231245 }
232246
233247 fn query < S , R > (
@@ -241,9 +255,7 @@ impl crate::connection::contracts::DbConnection for PooledConnection {
241255 Vec < R > : std:: iter:: FromIterator < <R as crate :: mapper:: RowMapper >:: Output > ,
242256 {
243257 let conn = self . connection ( ) ;
244- async move {
245- conn. query ( stmt, params) . await
246- }
258+ async move { conn. query ( stmt, params) . await }
247259 }
248260
249261 fn query_one < R > (
@@ -255,9 +267,7 @@ impl crate::connection::contracts::DbConnection for PooledConnection {
255267 R : crate :: mapper:: RowMapper ,
256268 {
257269 let conn = self . connection ( ) ;
258- async move {
259- conn. query_one :: < R > ( stmt, params) . await
260- }
270+ async move { conn. query_one :: < R > ( stmt, params) . await }
261271 }
262272
263273 fn query_one_for < T : crate :: rows:: FromSqlOwnedValue < T > > (
@@ -266,9 +276,7 @@ impl crate::connection::contracts::DbConnection for PooledConnection {
266276 params : & [ & dyn crate :: query:: parameters:: QueryParameter ] ,
267277 ) -> impl std:: future:: Future < Output = Result < T , Box < dyn Error + Send + Sync > > > + Send {
268278 let conn = self . connection ( ) ;
269- async move {
270- conn. query_one_for ( stmt, params) . await
271- }
279+ async move { conn. query_one_for ( stmt, params) . await }
272280 }
273281
274282 fn execute (
@@ -277,14 +285,12 @@ impl crate::connection::contracts::DbConnection for PooledConnection {
277285 params : & [ & dyn crate :: query:: parameters:: QueryParameter ] ,
278286 ) -> impl std:: future:: Future < Output = Result < u64 , Box < dyn Error + Send + Sync > > > + Send {
279287 let conn = self . connection ( ) ;
280- async move {
281- conn. execute ( stmt, params) . await
282- }
288+ async move { conn. execute ( stmt, params) . await }
283289 }
284290
285291 fn get_database_type (
286292 & self ,
287293 ) -> Result < crate :: connection:: database_type:: DatabaseType , Box < dyn Error + Send + Sync > > {
288294 self . connection ( ) . get_database_type ( )
289295 }
290- }
296+ }
0 commit comments