Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3062,9 +3062,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
message = ExecuteMessage(
prepared_statement.query_id, query.values, cl,
serial_cl, fetch_size, paging_state, timestamp,
skip_meta=bool(prepared_statement.result_metadata),
continuous_paging_options=continuous_paging_options,
result_metadata_id=prepared_statement.result_metadata_id)
continuous_paging_options=continuous_paging_options)
elif isinstance(query, BatchStatement):
if self._protocol_version < 2:
raise UnsupportedOperation(
Expand Down Expand Up @@ -5008,6 +5006,17 @@ def _query(self, host, message=None, cb=None):
self._connection = connection
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []

if self.prepared_statement and isinstance(message, ExecuteMessage):
has_result_metadata_id = self.prepared_statement.result_metadata_id is not None
has_result_metadata = bool(self.prepared_statement.result_metadata)
can_skip_meta = has_result_metadata_id and has_result_metadata and (
ProtocolVersion.uses_prepared_metadata(connection.protocol_version)
or connection.features.use_metadata_id
)
message.skip_meta = can_skip_meta
message.result_metadata_id = self.prepared_statement.result_metadata_id if can_skip_meta else None
message.use_metadata_id = connection.features.use_metadata_id

if cb is None:
cb = partial(self._set_result, host, connection, pool)

Expand Down Expand Up @@ -5171,6 +5180,27 @@ def _set_result(self, host, connection, pool, response):
self._paging_state = response.paging_state
self._col_names = response.column_names
self._col_types = response.column_types
new_result_metadata_id = getattr(response, 'result_metadata_id', None)
if self.prepared_statement and new_result_metadata_id is not None:
if response.column_metadata:
# Write result_metadata before result_metadata_id intentionally:
# a concurrent reader that still sees the old metadata_id will
# ask the server for full metadata and recover safely; a reader
# that sees the new metadata_id together with the new metadata
# is immediately correct. The opposite write order could expose
# a window where a reader uses a new metadata_id with stale metadata.
# Note: correctness of this ordering relies on CPython's GIL making
# individual attribute reads/writes effectively atomic. Other Python
# implementations (PyPy, Jython, etc.) may not provide this guarantee.
self.prepared_statement.result_metadata = response.column_metadata
else:
log.warning(
"Server sent a new result_metadata_id but no column metadata "
"for prepared statement %r. The cached column metadata will not "
"be updated; only result_metadata_id is refreshed.",
getattr(self.prepared_statement, 'query_id', None)
)
self.prepared_statement.result_metadata_id = new_result_metadata_id
if getattr(self.message, 'continuous_paging_options', None):
self._handle_continuous_paging_first_response(connection, response)
else:
Expand Down
13 changes: 9 additions & 4 deletions cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ def _write_query_params(self, f, protocol_version):
if self.timestamp is not None:
flags |= _PROTOCOL_TIMESTAMP_FLAG

if self.skip_meta:
flags |= _SKIP_METADATA_FLAG

if self.keyspace is not None:
if ProtocolVersion.uses_keyspace_flag(protocol_version):
flags |= _WITH_KEYSPACE_FLAG
Expand Down Expand Up @@ -629,9 +632,11 @@ class ExecuteMessage(_QueryMessage):
def __init__(self, query_id, query_params, consistency_level,
serial_consistency_level=None, fetch_size=None,
paging_state=None, timestamp=None, skip_meta=False,
continuous_paging_options=None, result_metadata_id=None):
continuous_paging_options=None, result_metadata_id=None,
use_metadata_id=False):
self.query_id = query_id
self.result_metadata_id = result_metadata_id
self.use_metadata_id = use_metadata_id
super(ExecuteMessage, self).__init__(query_params, consistency_level, serial_consistency_level, fetch_size,
paging_state, timestamp, skip_meta, continuous_paging_options)

Expand All @@ -640,8 +645,8 @@ def _write_query_params(self, f, protocol_version):

def send_body(self, f, protocol_version):
write_string(f, self.query_id)
if ProtocolVersion.uses_prepared_metadata(protocol_version):
write_string(f, self.result_metadata_id)
if ProtocolVersion.uses_prepared_metadata(protocol_version) or self.use_metadata_id:
write_string(f, self.result_metadata_id if self.result_metadata_id is not None else b'')
self._write_query_params(f, protocol_version)


Expand Down Expand Up @@ -745,7 +750,7 @@ def decode_row(row):

def recv_results_prepared(self, f, protocol_version, protocol_features, user_type_map):
self.query_id = read_binary_string(f)
if ProtocolVersion.uses_prepared_metadata(protocol_version):
if ProtocolVersion.uses_prepared_metadata(protocol_version) or protocol_features.use_metadata_id:
self.result_metadata_id = read_binary_string(f)
else:
self.result_metadata_id = None
Expand Down
16 changes: 14 additions & 2 deletions cassandra/protocol_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,34 @@
LWT_OPTIMIZATION_META_BIT_MASK = "LWT_OPTIMIZATION_META_BIT_MASK"
RATE_LIMIT_ERROR_EXTENSION = "SCYLLA_RATE_LIMIT_ERROR"
TABLETS_ROUTING_V1 = "TABLETS_ROUTING_V1"
USE_METADATA_ID = "SCYLLA_USE_METADATA_ID"

class ProtocolFeatures(object):
rate_limit_error = None
shard_id = 0
sharding_info = None
tablets_routing_v1 = False
lwt_info = None
use_metadata_id = False

def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None):
def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None,
use_metadata_id=False):
self.rate_limit_error = rate_limit_error
self.shard_id = shard_id
self.sharding_info = sharding_info
self.tablets_routing_v1 = tablets_routing_v1
self.lwt_info = lwt_info
self.use_metadata_id = use_metadata_id

@staticmethod
def parse_from_supported(supported):
rate_limit_error = ProtocolFeatures.maybe_parse_rate_limit_error(supported)
shard_id, sharding_info = ProtocolFeatures.parse_sharding_info(supported)
tablets_routing_v1 = ProtocolFeatures.parse_tablets_info(supported)
lwt_info = ProtocolFeatures.parse_lwt_info(supported)
return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info)
use_metadata_id = ProtocolFeatures.parse_use_metadata_id(supported)
return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info,
use_metadata_id)

@staticmethod
def maybe_parse_rate_limit_error(supported):
Expand All @@ -57,6 +63,8 @@ def add_startup_options(self, options):
options[TABLETS_ROUTING_V1] = ""
if self.lwt_info is not None:
options[LWT_ADD_METADATA_MARK] = str(self.lwt_info.lwt_meta_bit_mask)
if self.use_metadata_id:
options[USE_METADATA_ID] = ""

@staticmethod
def parse_sharding_info(options):
Expand All @@ -81,6 +89,10 @@ def parse_sharding_info(options):
def parse_tablets_info(options):
return TABLETS_ROUTING_V1 in options

@staticmethod
def parse_use_metadata_id(options):
return USE_METADATA_ID in options

@staticmethod
def parse_lwt_info(options):
value_list = options.get(LWT_ADD_METADATA_MARK, [None])
Expand Down
44 changes: 44 additions & 0 deletions docs/scylla-specific.rst
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,47 @@ https://github.com/scylladb/scylladb/blob/master/docs/dev/protocol-extensions.md

Details on the sending tablet information to the drivers
https://github.com/scylladb/scylladb/blob/master/docs/dev/protocol-extensions.md#sending-tablet-info-to-the-drivers


Prepared Statement Metadata Caching (``SCYLLA_USE_METADATA_ID``)
----------------------------------------------------------------

When executing prepared SELECT statements, the driver normally requests the server
to skip sending full result metadata with each response (``skip_meta`` optimization),
relying on the metadata cached from the initial ``PREPARE`` call. However, if the
table schema changes after a statement is prepared (e.g., a column is added, removed,
or its type is altered), this cached metadata becomes stale — leading to decoding
errors or incorrect data.

ScyllaDB solves this by backporting the ``metadata_id`` mechanism from CQL native
protocol v5 as a v4 extension: ``SCYLLA_USE_METADATA_ID``. When this extension is
negotiated, the server includes a hash of the result metadata in the ``PREPARE``
response. The driver sends this hash back with every ``EXECUTE`` request. If the
schema has changed, the server sets the ``METADATA_CHANGED`` flag and returns the
new metadata hash together with the updated column definitions. The driver
automatically updates its cache and uses the new metadata to decode the current
response — all transparently, with no application code change required.

**Behaviour summary:**

- Automatically negotiated at connection time when the ScyllaDB node supports it.
- ``skip_meta`` is enabled (metadata omitted from EXECUTE responses) only when it
is safe: the connection must have negotiated ``SCYLLA_USE_METADATA_ID`` (or use
CQL v5), *and* the prepared statement must carry a ``result_metadata_id`` obtained
from PREPARE.
- When a schema change is detected by the server, the driver refreshes both the
cached column metadata and the metadata hash for that prepared statement so that
all subsequent executions benefit immediately.
- Statements prepared before the extension was negotiated (e.g., during a rolling
upgrade) retain ``result_metadata_id=None`` and fall back to always requesting
full metadata, which is the safest option.

**Current scope:** the optimization applies to any prepared statement whose
``PREPARE`` response includes non-empty result columns — in practice, SELECT
queries. UPDATE/INSERT/DELETE statements naturally return no result columns, so
their ``result_metadata`` is always empty and ``skip_meta`` is never set for
them. There is no code-level restriction to SELECT; the behaviour follows
directly from the data.

For full protocol details see the ScyllaDB CQL extensions documentation:
https://opensource.docs.scylladb.com/stable/cql/cql-extensions.html
Loading
Loading