Skip to content

Commit d373e9b

Browse files
committed
adding c++ support for pooling
1 parent a9ee888 commit d373e9b

6 files changed

Lines changed: 296 additions & 17 deletions

File tree

mssql_python/pybind/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ execute_process(
9090
)
9191

9292
# Add module library
93-
add_library(ddbc_bindings MODULE ddbc_bindings.cpp connection/connection.cpp)
93+
add_library(ddbc_bindings MODULE ddbc_bindings.cpp connection/connection.cpp connection/connection_pool.cpp)
9494

9595
# Add include directories for your project
9696
target_include_directories(ddbc_bindings PRIVATE

mssql_python/pybind/connection/connection.cpp

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
// taken up in future
66

77
#include "connection.h"
8+
#include "connection_pool.h"
89
#include <vector>
910
#include <pybind11/pybind11.h>
1011

@@ -16,8 +17,8 @@ SqlHandlePtr Connection::_envHandle = nullptr;
1617
// This class wraps low-level ODBC operations like connect/disconnect,
1718
// transaction control, and autocommit configuration.
1819
//-------------------------------------------------------------------------------------------------
19-
Connection::Connection(const std::wstring& conn_str, bool autocommit)
20-
: _connStr(conn_str) , _autocommit(autocommit) {
20+
Connection::Connection(const std::wstring& conn_str, bool use_pool)
21+
: _connStr(conn_str), _autocommit(false), _fromPool(use_pool) {
2122
if (!_envHandle) {
2223
LOG("Allocating environment handle");
2324
SQLHANDLE env = nullptr;
@@ -64,6 +65,7 @@ void Connection::connect(const py::dict& attrs_before) {
6465
(SQLWCHAR*)_connStr.c_str(), SQL_NTS,
6566
nullptr, 0, nullptr, SQL_DRIVER_NOPROMPT);
6667
checkError(ret);
68+
updateLastUsed();
6769
}
6870

6971
void Connection::disconnect() {
@@ -91,6 +93,7 @@ void Connection::commit() {
9193
if (!_dbcHandle) {
9294
ThrowStdException("Connection handle not allocated");
9395
}
96+
updateLastUsed();
9497
LOG("Committing transaction");
9598
SQLRETURN ret = SQLEndTran_ptr(SQL_HANDLE_DBC, _dbcHandle->get(), SQL_COMMIT);
9699
checkError(ret);
@@ -100,6 +103,7 @@ void Connection::rollback() {
100103
if (!_dbcHandle) {
101104
ThrowStdException("Connection handle not allocated");
102105
}
106+
updateLastUsed();
103107
LOG("Rolling back transaction");
104108
SQLRETURN ret = SQLEndTran_ptr(SQL_HANDLE_DBC, _dbcHandle->get(), SQL_ROLLBACK);
105109
checkError(ret);
@@ -132,6 +136,7 @@ SqlHandlePtr Connection::allocStatementHandle() {
132136
if (!_dbcHandle) {
133137
ThrowStdException("Connection handle not allocated");
134138
}
139+
updateLastUsed();
135140
LOG("Allocating statement handle");
136141
SQLHANDLE stmt = nullptr;
137142
SQLRETURN ret = SQLAllocHandle_ptr(SQL_HANDLE_STMT, _dbcHandle->get(), &stmt);
@@ -185,4 +190,99 @@ void Connection::applyAttrsBefore(const py::dict& attrs) {
185190
}
186191
}
187192
}
188-
}
193+
}
194+
195+
bool Connection::isAlive() const {
196+
if (!_dbcHandle) {
197+
ThrowStdException("Connection handle not allocated");
198+
}
199+
SQLUINTEGER status;
200+
SQLRETURN ret = SQLGetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_CONNECTION_DEAD,
201+
&status, 0, nullptr);
202+
return SQL_SUCCEEDED(ret) && status == SQL_CD_FALSE;
203+
}
204+
205+
bool Connection::reset() {
206+
if (!_dbcHandle) {
207+
ThrowStdException("Connection handle not allocated");
208+
}
209+
LOG("Resetting connection via SQL_ATTR_RESET_CONNECTION");
210+
SQLULEN reset = SQL_TRUE;
211+
SQLRETURN ret = SQLSetConnectAttr_ptr(
212+
_dbcHandle->get(),
213+
SQL_ATTR_RESET_CONNECTION,
214+
(SQLPOINTER)SQL_RESET_CONNECTION_YES,
215+
SQL_IS_INTEGER);
216+
if (!SQL_SUCCEEDED(ret)) {
217+
LOG("Failed to reset connection. Marking as dead.");
218+
disconnect();
219+
return false;
220+
}
221+
updateLastUsed();
222+
return true;
223+
}
224+
225+
void Connection::updateLastUsed() {
226+
_lastUsed = std::chrono::steady_clock::now();
227+
}
228+
229+
std::chrono::steady_clock::time_point Connection::lastUsed() const {
230+
return _lastUsed;
231+
}
232+
233+
ConnectionHandle::ConnectionHandle(const std::wstring& connStr, bool usePool, const py::dict& attrsBefore)
234+
: _connStr(connStr), _usePool(usePool) {
235+
if (_usePool) {
236+
_conn = ConnectionPoolManager::getInstance().acquireConnection(connStr, attrsBefore);
237+
} else {
238+
_conn = std::make_shared<Connection>(connStr, false);
239+
_conn->connect(attrsBefore);
240+
}
241+
}
242+
243+
void ConnectionHandle::close() {
244+
if (!_conn) {
245+
ThrowStdException("Connection object is not initialized");
246+
}
247+
if (_usePool) {
248+
ConnectionPoolManager::getInstance().returnConnection(_connStr, _conn);
249+
} else {
250+
_conn->disconnect();
251+
}
252+
_conn = nullptr;
253+
}
254+
255+
void ConnectionHandle::commit() {
256+
if (!_conn) {
257+
ThrowStdException("Connection object is not initialized");
258+
}
259+
_conn->commit();
260+
}
261+
262+
void ConnectionHandle::rollback() {
263+
if (!_conn) {
264+
ThrowStdException("Connection object is not initialized");
265+
}
266+
_conn->rollback();
267+
}
268+
269+
void ConnectionHandle::setAutocommit(bool enabled) {
270+
if (!_conn) {
271+
ThrowStdException("Connection object is not initialized");
272+
}
273+
_conn->setAutocommit(enabled);
274+
}
275+
276+
bool ConnectionHandle::getAutocommit() const {
277+
if (!_conn) {
278+
ThrowStdException("Connection object is not initialized");
279+
}
280+
return _conn->getAutocommit();
281+
}
282+
283+
SqlHandlePtr ConnectionHandle::allocStatementHandle() {
284+
if (!_conn) {
285+
ThrowStdException("Connection object is not initialized");
286+
}
287+
return _conn->allocStatementHandle();
288+
}

mssql_python/pybind/connection/connection.h

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
class Connection {
1515
public:
16-
Connection(const std::wstring& conn_str, bool autocommit = false);
16+
Connection(const std::wstring& connStr, bool fromPool);
1717
~Connection();
1818

1919
// Establish the connection using the stored connection string.
@@ -34,8 +34,16 @@ class Connection {
3434
// Check whether autocommit is enabled.
3535
bool getAutocommit() const;
3636

37+
bool isAlive() const;
38+
39+
bool reset();
40+
41+
void updateLastUsed();
42+
43+
std::chrono::steady_clock::time_point lastUsed() const;
44+
3745
// Allocate a new statement handle on this connection.
38-
SqlHandlePtr allocStatementHandle();
46+
SqlHandlePtr allocStatementHandle();
3947

4048
private:
4149
void allocateDbcHandle();
@@ -44,9 +52,26 @@ class Connection {
4452
void applyAttrsBefore(const py::dict& attrs_before);
4553

4654
std::wstring _connStr;
47-
bool _usePool = false;
55+
bool _fromPool = false;
4856
bool _autocommit = true;
4957
SqlHandlePtr _dbcHandle;
50-
5158
static SqlHandlePtr _envHandle;
59+
std::chrono::steady_clock::time_point _lastUsed;
5260
};
61+
62+
class ConnectionHandle {
63+
public:
64+
ConnectionHandle(const std::wstring& connStr, bool usePool, const py::dict& attrsBefore = py::dict());
65+
66+
void close();
67+
void commit();
68+
void rollback();
69+
void setAutocommit(bool enabled);
70+
bool getAutocommit() const;
71+
SqlHandlePtr allocStatementHandle();
72+
73+
private:
74+
std::shared_ptr<Connection> _conn;
75+
bool _usePool;
76+
std::wstring _connStr;
77+
};
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#include "connection_pool.h"
2+
#include <iostream>
3+
#include <exception>
4+
5+
ConnectionPool::ConnectionPool(size_t max_size, int idle_timeout_secs)
6+
: _max_size(max_size), _idle_timeout_secs(idle_timeout_secs), _current_size(0) {}
7+
8+
std::shared_ptr<Connection> ConnectionPool::acquire(const std::wstring& connStr, const py::dict& attrs_before) {
9+
std::lock_guard<std::mutex> lock(_mutex);
10+
auto now = std::chrono::steady_clock::now();
11+
size_t before = _pool.size();
12+
_pool.erase(std::remove_if(_pool.begin(), _pool.end(), [&](const std::shared_ptr<Connection>& conn) {
13+
auto idle_time = std::chrono::duration_cast<std::chrono::seconds>(now - conn->lastUsed()).count();
14+
if (idle_time > _idle_timeout_secs) {
15+
conn->disconnect();
16+
return true;
17+
}
18+
return false;
19+
}), _pool.end());
20+
size_t pruned = before - _pool.size();
21+
_current_size = (_current_size >= pruned) ? (_current_size - pruned) : 0;
22+
23+
while (!_pool.empty()) {
24+
auto conn = _pool.front();
25+
_pool.pop_front();
26+
if (conn->isAlive()) {
27+
if (!conn->reset()) {
28+
continue;
29+
}
30+
return conn;
31+
} else {
32+
conn->disconnect();
33+
--_current_size;
34+
}
35+
}
36+
if (_current_size < _max_size) {
37+
auto conn = std::make_shared<Connection>(connStr, true);
38+
conn->connect(attrs_before);
39+
return conn;
40+
} else {
41+
LOG("Cannot acquire connection: pool size limit reached");
42+
return nullptr;
43+
}
44+
}
45+
46+
void ConnectionPool::release(std::shared_ptr<Connection> conn) {
47+
std::lock_guard<std::mutex> lock(_mutex);
48+
if (_pool.size() < _max_size) {
49+
conn->updateLastUsed();
50+
_pool.push_back(conn);
51+
}
52+
else {
53+
conn->disconnect();
54+
if (_current_size > 0) --_current_size;
55+
}
56+
}
57+
58+
ConnectionPoolManager& ConnectionPoolManager::getInstance() {
59+
static ConnectionPoolManager manager;
60+
return manager;
61+
}
62+
63+
std::shared_ptr<Connection> ConnectionPoolManager::acquireConnection(const std::wstring& connStr, const py::dict& attrs_before) {
64+
std::lock_guard<std::mutex> lock(_manager_mutex);
65+
66+
auto& pool = _pools[connStr];
67+
if (!pool) {
68+
LOG("Creating new connection pool");
69+
pool = std::make_shared<ConnectionPool>(_default_max_size, _default_idle_secs);
70+
}
71+
return pool->acquire(connStr, attrs_before);
72+
}
73+
74+
void ConnectionPoolManager::returnConnection(const std::wstring& conn_str, const std::shared_ptr<Connection> conn) {
75+
std::lock_guard<std::mutex> lock(_manager_mutex);
76+
if (_pools.find(conn_str) != _pools.end()) {
77+
_pools[conn_str]->release((conn));
78+
}
79+
}
80+
81+
void ConnectionPoolManager::configure(int max_size, int idle_timeout_secs) {
82+
std::lock_guard<std::mutex> lock(_manager_mutex);
83+
_default_max_size = max_size;
84+
_default_idle_secs = idle_timeout_secs;
85+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
// INFO|TODO - Note that is file is Windows specific right now. Making it arch agnostic will be
5+
// taken up in future.
6+
7+
#pragma once
8+
#include <deque>
9+
#include <unordered_map>
10+
#include <memory>
11+
#include <mutex>
12+
#include <string>
13+
#include <chrono>
14+
#include "connection.h"
15+
16+
// Manages a fixed-size pool of reusable database connections for a single connection string
17+
class ConnectionPool {
18+
public:
19+
ConnectionPool(size_t max_size, int idle_timeout_secs);
20+
21+
// Acquires a connection from the pool or creates a new one if under limit
22+
std::shared_ptr<Connection> acquire(const std::wstring& connStr, const py::dict& attrs_before = py::dict());
23+
24+
// Returns a connection to the pool for reuse
25+
void release(std::shared_ptr<Connection> conn);
26+
27+
private:
28+
size_t _max_size; // Maximum number of connections allowed
29+
int _idle_timeout_secs; // Idle time before connections are considered stale
30+
size_t _current_size = 0;
31+
std::deque<std::shared_ptr<Connection>> _pool; // Available connections
32+
std::mutex _mutex; // Mutex for thread-safe access
33+
};
34+
35+
// Singleton manager that handles multiple pools keyed by connection string
36+
class ConnectionPoolManager {
37+
public:
38+
// Returns the singleton instance of the manager
39+
static ConnectionPoolManager& getInstance();
40+
41+
void configure(int max_size, int idle_timeout);
42+
43+
// Gets a connection from the appropriate pool (creates one if none exists)
44+
std::shared_ptr<Connection> acquireConnection(const std::wstring& conn_str, const py::dict& attrs_before = py::dict());
45+
46+
// Returns a connection to its original pool
47+
void returnConnection(const std::wstring& conn_str, std::shared_ptr<Connection> conn);
48+
49+
private:
50+
ConnectionPoolManager() = default;
51+
~ConnectionPoolManager() = default;
52+
53+
// Map from connection string to connection pool
54+
std::unordered_map<std::wstring, std::shared_ptr<ConnectionPool>> _pools;
55+
56+
// Protects access to the _pools map
57+
std::mutex _manager_mutex;
58+
size_t _default_max_size = 10;
59+
int _default_idle_secs = 300;
60+
61+
// Prevent copying
62+
ConnectionPoolManager(const ConnectionPoolManager&) = delete;
63+
ConnectionPoolManager& operator=(const ConnectionPoolManager&) = delete;
64+
};

0 commit comments

Comments
 (0)