Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions mssql_python/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1638,6 +1638,9 @@ def fetchall_with_mapping():
self.fetchmany = fetchmany_with_mapping
self.fetchall = fetchall_with_mapping

Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_prepare_metadata_result_set() resets internal rownumber state, but it does not reset self.rowcount. Since _reset_cursor() doesn’t touch rowcount, catalog methods can temporarily expose a stale rowcount value from a prior statement until the first fetch updates it. Consider setting self.rowcount = -1 (consistent with execute() for result-set-producing statements) when preparing metadata result sets.

Suggested change
# Metadata methods produce a new result set, so rowcount is unknown
# until rows are fetched. Reset it to avoid exposing a stale value
# from a previous statement.
self.rowcount = -1

Copilot uses AI. Check for mistakes.
# Initialize rownumber tracking so fetchone() and iteration work
self._reset_rownumber()

# Return the cursor itself for method chaining
return self

Expand Down
145 changes: 145 additions & 0 deletions tests/test_004_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15961,3 +15961,148 @@ def reader(reader_id):
finally:
stop_event.set()
mssql_python.native_uuid = original


def test_catalog_fetchone_iteration_setup(cursor, db_connection):
"""Create test objects for catalog fetchone/iteration testing"""
try:
cursor.execute(
"IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = 'pytest_cat_fetch') "
"EXEC('CREATE SCHEMA pytest_cat_fetch')"
)
cursor.execute("DROP TABLE IF EXISTS pytest_cat_fetch.fetch_test_child")
cursor.execute("DROP TABLE IF EXISTS pytest_cat_fetch.fetch_test")

cursor.execute("""
CREATE TABLE pytest_cat_fetch.fetch_test (
id INT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
value DECIMAL(10,2),
ts DATETIME DEFAULT GETDATE()
)
""")
cursor.execute("""
CREATE TABLE pytest_cat_fetch.fetch_test_child (
child_id INT PRIMARY KEY,
parent_id INT NOT NULL,
CONSTRAINT fk_parent FOREIGN KEY (parent_id)
REFERENCES pytest_cat_fetch.fetch_test(id)
)
""")
db_connection.commit()
except Exception as e:
pytest.fail(f"Catalog fetchone/iteration setup failed: {e}")

Comment on lines +15966 to +15995
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These catalog tests depend on this separate setup test running before the other new tests. Pytest order can change with -k selection, parallelization, or plugins, which would make the other tests fail due to missing schema/tables. Consider making the setup a fixture (e.g., module-scoped, yield for teardown) or invoking a non-test_* helper setup function from each test that needs it (and similarly ensuring teardown runs via a finalizer).

Copilot uses AI. Check for mistakes.

def test_tables_fetchone(cursor, db_connection):
"""Test that fetchone() works on tables() result set (GH-505)"""
cursor.tables(table="fetch_test", schema="pytest_cat_fetch")
row = cursor.fetchone()
assert row is not None, "fetchone() should return a row"
assert row.table_name.lower() == "fetch_test"
assert row.table_schem.lower() == "pytest_cat_fetch"
assert cursor.fetchone() is None


def test_tables_iteration(cursor, db_connection):
"""Test that 'for row in cursor.tables()' works (GH-505)"""
rows = list(cursor.tables(table="fetch_test", schema="pytest_cat_fetch"))
assert len(rows) == 1, "Iteration should yield 1 row"
assert rows[0].table_name.lower() == "fetch_test"


def test_columns_fetchone(cursor, db_connection):
"""Test that fetchone() works on columns() result set (GH-505)"""
cursor.columns(table="fetch_test", schema="pytest_cat_fetch")
row = cursor.fetchone()
assert row is not None, "fetchone() should return a row from columns()"
assert hasattr(row, "column_name")
assert row.table_name.lower() == "fetch_test"


def test_primarykeys_fetchone(cursor, db_connection):
"""Test that fetchone() works on primaryKeys() result set (GH-505)"""
cursor.primaryKeys(table="fetch_test", schema="pytest_cat_fetch")
row = cursor.fetchone()
assert row is not None, "fetchone() should return a row from primaryKeys()"
assert row.column_name.lower() == "id"
assert cursor.fetchone() is None


def test_foreignkeys_fetchone(cursor, db_connection):
"""Test that fetchone() works on foreignKeys() result set (GH-505)"""
cursor.foreignKeys(
table="fetch_test_child",
schema="pytest_cat_fetch",
)
row = cursor.fetchone()
assert row is not None, "fetchone() should return a row from foreignKeys()"
assert row.pkcolumn_name.lower() == "id"
assert row.fkcolumn_name.lower() == "parent_id"
assert cursor.fetchone() is None


def test_statistics_fetchone(cursor, db_connection):
"""Test that fetchone() works on statistics() result set (GH-505)"""
cursor.statistics(table="fetch_test", schema="pytest_cat_fetch")
row = cursor.fetchone()
assert row is not None, "fetchone() should return a row from statistics()"
assert row.table_name.lower() == "fetch_test"


def test_procedures_fetchone(cursor, db_connection):
"""Test that fetchone() works on procedures() result set (GH-505)"""
cursor.procedures()
row = cursor.fetchone()
assert row is not None, "fetchone() should return a row from procedures()"
assert hasattr(row, "procedure_name")


Comment on lines +16055 to +16060
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_procedures_fetchone assumes cursor.procedures() will always return at least one row in the current database, which may not hold on clean/minimal DBs or environments where system procedures aren’t returned. To make this deterministic, create a known test procedure (or reuse the existing test_procedures_setup objects) and call procedures(procedure=..., schema=...) so fetchone() is guaranteed to have a row to return.

Suggested change
cursor.procedures()
row = cursor.fetchone()
assert row is not None, "fetchone() should return a row from procedures()"
assert hasattr(row, "procedure_name")
proc_name = f"fetch_proc_{uuid.uuid4().hex[:8]}"
qualified_proc_name = f"pytest_cat_fetch.{proc_name}"
try:
cursor.execute(
f"""
CREATE PROCEDURE {qualified_proc_name}
AS
BEGIN
SELECT 1 AS value
END
"""
)
db_connection.commit()
cursor.procedures(procedure=proc_name, schema="pytest_cat_fetch")
row = cursor.fetchone()
assert row is not None, "fetchone() should return a row from procedures()"
assert hasattr(row, "procedure_name")
assert row.procedure_name.lower() == proc_name.lower()
assert cursor.fetchone() is None
finally:
cursor.execute(f"DROP PROCEDURE IF EXISTS {qualified_proc_name}")
db_connection.commit()

Copilot uses AI. Check for mistakes.
def test_rowid_columns_fetchone(cursor, db_connection):
"""Test that fetchone() works on rowIdColumns() result set (GH-505)"""
cursor.rowIdColumns(table="fetch_test", schema="pytest_cat_fetch")
# May or may not have rowid columns; just verify no InterfaceError
row = cursor.fetchone()
if row is not None:
assert hasattr(row, "column_name")


def test_rowver_columns_fetchone(cursor, db_connection):
"""Test that fetchone() works on rowVerColumns() result set (GH-505)"""
cursor.rowVerColumns(table="fetch_test", schema="pytest_cat_fetch")
# May or may not have rowver columns; just verify no InterfaceError
row = cursor.fetchone()
if row is not None:
assert hasattr(row, "column_name")


def test_gettypeinfo_fetchone(cursor, db_connection):
"""Test that fetchone() works on getTypeInfo() result set (GH-505)"""
cursor.getTypeInfo()
row = cursor.fetchone()
assert row is not None, "fetchone() should return a row from getTypeInfo()"
assert hasattr(row, "type_name")


def test_catalog_rownumber_increments_correctly(cursor, db_connection):
"""Test that rownumber increments correctly during fetchone() on catalog results (GH-505)"""
cursor.columns(table="fetch_test", schema="pytest_cat_fetch")
assert cursor.rownumber == -1

for expected_idx in range(4):
row = cursor.fetchone()
assert row is not None, f"Expected row at index {expected_idx}"
assert cursor.rownumber == expected_idx

assert cursor.fetchone() is None


def test_catalog_fetchone_iteration_cleanup(cursor, db_connection):
"""Clean up test objects for catalog fetchone/iteration testing"""
try:
cursor.execute("DROP TABLE IF EXISTS pytest_cat_fetch.fetch_test_child")
cursor.execute("DROP TABLE IF EXISTS pytest_cat_fetch.fetch_test")
cursor.execute("DROP SCHEMA IF EXISTS pytest_cat_fetch")
db_connection.commit()
except Exception as e:
pytest.fail(f"Catalog fetchone/iteration cleanup failed: {e}")
Loading