Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
aaf9897
Add tests for TTL EXPORT partition (test-first)
mkmkme May 19, 2026
399bd77
Add `EXPORT` TTL mode for partition export to a destination table
mkmkme May 19, 2026
0fb5adb
Add per-part `export_ttl` info for partition export TTLs
mkmkme May 19, 2026
be213b7
Switch TTL EXPORT syntax to `EXPORT TO TABLE`
mkmkme May 20, 2026
0664ab8
Add `export_origin` to partition export manifests
mkmkme May 20, 2026
97d6ae1
Add `export_merge_tree_partition_mark_as_ttl` and ttl-marker invariant
mkmkme May 20, 2026
77df5e6
Verify export destination compatibility at TTL DDL time
mkmkme May 20, 2026
7a4acf4
Add `TTLExportScheduler` + table-level backoff settings
mkmkme May 20, 2026
f7d8a27
Wire `TTLExportScheduler` into `StorageReplicatedMergeTree`
mkmkme May 20, 2026
b876bb3
fix a test broken during rebase
mkmkme May 20, 2026
3e6e29e
use export_merge_tree_partition_task_entries instead of requests to z…
mkmkme May 20, 2026
0065763
Defer ttl-marker cleanup until after the new manifest is durable
mkmkme May 20, 2026
e7e7287
Register new TTL EXPORT settings in SettingsChangesHistory
mkmkme May 20, 2026
1ebd610
processExportTTL: fall back to storage.getStorageID().getDatabaseName()
mkmkme May 21, 2026
c3b83ac
Walk TTL EXPORT partitions in expiration-max order, not lex order
mkmkme May 21, 2026
48097f2
Store TTL EXPORT destination database and table separately
mkmkme May 21, 2026
e148b48
Guard ttl-marker move by expiration max, not partition_id lex order
mkmkme May 21, 2026
3428e69
Reject TTL ... EXPORT TO TABLE on non-replicated MergeTree at DDL time
mkmkme May 21, 2026
dd3a2ac
Tighten ttl-marker guard: collect only the two partitions of interest
mkmkme May 21, 2026
335938f
Fix use-after-free in TTL EXPORT backward-marker guard
mkmkme May 21, 2026
153b4a1
Resolve TTL EXPORT destination at the interpreter level
mkmkme May 21, 2026
f6cbe79
Persist TTL EXPORT high-water mark on the manifest
mkmkme May 21, 2026
45e0442
Tie-break TTL EXPORT scheduler walk by `(expiration_max, partition_id)`
mkmkme May 21, 2026
0469250
Wake TTL EXPORT scheduler on replicas receiving MODIFY TTL via replic…
mkmkme May 21, 2026
37eca94
drop the unused parameter from getPartitionExportTTLMax
mkmkme May 22, 2026
c7ed5f1
TTLExportScheduler: drop the backoff scheduler and reschedule agressi…
mkmkme May 22, 2026
3767581
Verify Iceberg partition compatibility at TTL DDL time
mkmkme May 22, 2026
14e5461
added tests for schema changing in-flight
mkmkme May 22, 2026
4abd8db
Merge remote-tracking branch 'origin/antalya-26.3' into mkmkme/antaly…
mkmkme May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7531,6 +7531,9 @@ Overwrite file if it already exists when exporting a merge tree part
)", 0) \
DECLARE(Bool, export_merge_tree_partition_force_export, false, R"(
Ignore existing partition export and overwrite the zookeeper entry
)", 0) \
DECLARE(Bool, export_merge_tree_partition_mark_as_ttl, false, R"(
When set on `ALTER ... EXPORT PARTITION`, marks the manifest with `export_origin = 'ttl'` so it is treated as if submitted by the TTL scheduler: it is exempt from manifest-TTL eviction and participates in the cross-partition ordering check against other ttl-origin manifests. The TTL scheduler always sets this implicitly when it submits.
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"(
Maximum number of retries for exporting a merge tree part in an export partition task
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
addSettingsChanges(settings_changes_history, "26.3.1.20001.altinityantalya",
{
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"export_merge_tree_partition_mark_as_ttl", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "26.3",
{
Expand Down Expand Up @@ -1151,6 +1152,7 @@ const VersionToSettingsChangesMap & getMergeTreeSettingsChangesHistory()
{
addSettingsChanges(merge_tree_settings_changes_history, "26.3",
{
{"export_merge_tree_partition_ttl_poll_interval_seconds", 5, 5, "New setting for the TTL EXPORT scheduler poll interval."},
{"vertical_merge_optimize_ttl_delete", false, true, "Allow vertical merge algorithm for merges that need to remove rows expired by TTL"},
{"shared_merge_tree_replica_set_max_lifetime_seconds", 300, 300, "New setting"},
{"auto_statistics_types", "", "minmax, uniq", "Enable auto statistics by default"},
Expand Down
5 changes: 3 additions & 2 deletions src/Databases/DatabasesCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,13 @@ void validateCreateQuery(const ASTCreateQuery & query, ContextPtr context)
primary_key = KeyDescription::getKeyFromAST(storage.order_by->ptr(), columns_desc, context);
if (storage.primary_key)
primary_key = KeyDescription::getKeyFromAST(storage.primary_key->ptr(), columns_desc, context);
KeyDescription partition_key;
if (storage.partition_by)
KeyDescription::getKeyFromAST(storage.partition_by->ptr(), columns_desc, context);
partition_key = KeyDescription::getKeyFromAST(storage.partition_by->ptr(), columns_desc, context);
if (storage.sample_by)
KeyDescription::getKeyFromAST(storage.sample_by->ptr(), columns_desc, context);
if (storage.ttl_table && primary_key.has_value())
TTLTableDescription::getTTLForTableFromAST(storage.ttl_table->ptr(), columns_desc, context, *primary_key, true);
TTLTableDescription::getTTLForTableFromAST(storage.ttl_table->ptr(), columns_desc, context, *primary_key, partition_key, true);
}
}

Expand Down
12 changes: 11 additions & 1 deletion src/Interpreters/AddDefaultDatabaseVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class AddDefaultDatabaseVisitor
if (!tryVisitDynamicCast<ASTAlterQuery>(parent, ast) &&
!tryVisitDynamicCast<ASTQueryWithTableAndOutput>(parent, ast) &&
!tryVisitDynamicCast<ASTRenameQuery>(parent, ast) &&
!tryVisitDynamicCast<ASTFunction>(parent, ast))
!tryVisitDynamicCast<ASTFunction>(parent, ast) &&
!tryVisitDynamicCast<ASTTTLElement>(parent, ast))
{}
}

Expand Down Expand Up @@ -314,6 +315,15 @@ class AddDefaultDatabaseVisitor
}
}

void visitDDL(ASTPtr & /* parent */, ASTTTLElement & node, ASTPtr & /* ast */) const
{
if (only_replace_current_database_function)
return;

if (node.mode == TTLMode::EXPORT && node.destination_database.empty())
node.destination_database = database_name;
}

void visitDDL(ASTPtr & parent, ASTFunction & function, ASTPtr & node) const
{
if (function.name == "currentDatabase")
Expand Down
7 changes: 7 additions & 0 deletions src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
visitor.visit(*create.refresh_strategy);
}

if (create.storage && create.storage->ttl_table)
{
ASTPtr ttl_table_ptr = create.storage->ttl_table->ptr();
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
visitor.visitDDL(ttl_table_ptr);
}

if (create.columns_list)
{
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
Expand Down
12 changes: 12 additions & 0 deletions src/Parsers/ASTTTLElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ void ASTTTLElement::formatImpl(WriteBuffer & ostr, const FormatSettings & settin
ostr << " RECOMPRESS ";
recompression_codec->format(ostr, settings, state, frame);
}
else if (mode == TTLMode::EXPORT)
{
if (destination_type != DataDestinationType::TABLE)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unsupported destination type {} for TTL EXPORT",
magic_enum::enum_name(destination_type));

ostr << " EXPORT TO TABLE ";
if (!destination_database.empty())
ostr << backQuoteIfNeed(destination_database) << '.';
ostr << backQuoteIfNeed(destination_name);
}
else if (mode == TTLMode::DELETE)
{
/// It would be better to output "DELETE" here but that will break compatibility with earlier versions.
Expand Down
14 changes: 13 additions & 1 deletion src/Parsers/ASTTTLElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ class ASTTTLElement : public IAST
public:
TTLMode mode;
DataDestinationType destination_type;
/// For TTLMode::EXPORT: the destination database; empty if the user wrote an unqualified
/// table name. For other modes (MOVE), unused.
String destination_database;
/// For TTLMode::EXPORT: just the destination table name. Never the joined `db.table` form,
/// so quoted table names that legitimately contain dots round-trip losslessly.
/// For TTLMode::MOVE: the disk or volume name.
String destination_name;
bool if_exists = false;

Expand All @@ -23,9 +29,15 @@ class ASTTTLElement : public IAST

ASTPtr recompression_codec;

ASTTTLElement(TTLMode mode_, DataDestinationType destination_type_, const String & destination_name_, bool if_exists_)
ASTTTLElement(
TTLMode mode_,
DataDestinationType destination_type_,
const String & destination_database_,
const String & destination_name_,
bool if_exists_)
: mode(mode_)
, destination_type(destination_type_)
, destination_database(destination_database_)
, destination_name(destination_name_)
, if_exists(if_exists_)
, ttl_expr_pos(-1)
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ namespace DB
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PART, "EXPORT PART") \
MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \
MR_MACROS(EXPORT_TO_TABLE, "EXPORT TO TABLE") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \
Expand Down
15 changes: 14 additions & 1 deletion src/Parsers/ExpressionElementParsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserExplainQuery.h>
#include <Parsers/parseDatabaseAndTableName.h>

#include <Interpreters/StorageID.h>

Expand Down Expand Up @@ -2449,6 +2450,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_set(Keyword::SET);
ParserKeyword s_recompress(Keyword::RECOMPRESS);
ParserKeyword s_codec(Keyword::CODEC);
ParserKeyword s_export_to_table(Keyword::EXPORT_TO_TABLE);
ParserKeyword s_materialize_ttl(Keyword::MATERIALIZE_TTL);
ParserKeyword s_remove_ttl(Keyword::REMOVE_TTL);
ParserKeyword s_modify_ttl(Keyword::MODIFY_TTL);
Expand Down Expand Up @@ -2476,6 +2478,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)

TTLMode mode;
DataDestinationType destination_type = DataDestinationType::DELETE;
String destination_database;
String destination_name;

if (s_to_disk.ignore(pos, expected))
Expand All @@ -2496,6 +2499,11 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
mode = TTLMode::RECOMPRESS;
}
else if (s_export_to_table.ignore(pos, expected))
{
mode = TTLMode::EXPORT;
destination_type = DataDestinationType::TABLE;
}
else
{
/// DELETE is the default mode.
Expand Down Expand Up @@ -2547,8 +2555,13 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!parser_codec.parse(pos, recompression_codec, expected))
return false;
}
else if (mode == TTLMode::EXPORT)
{
if (!parseDatabaseAndTableName(pos, expected, destination_database, destination_name))
return false;
}

auto ttl_element = make_intrusive<ASTTTLElement>(mode, destination_type, destination_name, if_exists);
auto ttl_element = make_intrusive<ASTTTLElement>(mode, destination_type, destination_database, destination_name, if_exists);
ttl_element->setTTL(std::move(ttl_expr));
if (where_expr)
ttl_element->setWhere(std::move(where_expr));
Expand Down
4 changes: 4 additions & 0 deletions src/Processors/TTL/TTLUpdateInfoAlgorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ void TTLUpdateInfoAlgorithm::finalize(const MutableDataPartPtr & data_part) cons
data_part->ttl_infos.columns_ttl[ttl_update_key] = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}
else if (ttl_update_field == TTLUpdateField::EXPORT_TTL)
{
data_part->ttl_infos.export_ttl[ttl_update_key] = new_ttl_info;
}

}

Expand Down
1 change: 1 addition & 0 deletions src/Processors/TTL/TTLUpdateInfoAlgorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ enum class TTLUpdateField : uint8_t
MOVES_TTL,
RECOMPRESSION_TTL,
GROUP_BY_TTL,
EXPORT_TTL,
};

/// Calculates new ttl_info and does nothing with data.
Expand Down
8 changes: 8 additions & 0 deletions src/Processors/Transforms/TTLCalcTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ TTLCalcTransform::TTLCalcTransform(
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl,
TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));

for (const auto & export_ttl : metadata_snapshot_->getExportTTLs())
{
const auto export_key = export_ttl.getExportKey();
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(export_ttl, subqueries_for_sets, context), export_ttl,
TTLUpdateField::EXPORT_TTL, export_key, old_ttl_infos.export_ttl[export_key], current_time_, force_));
}
}

void TTLCalcTransform::consume(Chunk chunk)
Expand Down
8 changes: 8 additions & 0 deletions src/Processors/Transforms/TTLTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ TTLTransform::TTLTransform(
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl,
TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));

for (const auto & export_ttl : metadata_snapshot_->getExportTTLs())
{
const auto export_key = export_ttl.getExportKey();
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(export_ttl, subqueries_for_sets, context), export_ttl,
TTLUpdateField::EXPORT_TTL, export_key, old_ttl_infos.export_ttl[export_key], current_time_, force_));
}
}

Block reorderColumns(Block block, const Block & header)
Expand Down
4 changes: 3 additions & 1 deletion src/Storages/AlterCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,8 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else if (type == MODIFY_TTL)
{
metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST(
ttl, metadata.columns, context, metadata.primary_key, context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]);
ttl, metadata.columns, context, metadata.primary_key, metadata.partition_key,
context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]);
}
else if (type == REMOVE_TTL)
{
Expand Down Expand Up @@ -1393,6 +1394,7 @@ void AlterCommands::apply(StorageInMemoryMetadata & metadata, ContextPtr context
metadata_copy.columns,
context,
metadata_copy.primary_key,
metadata_copy.partition_key,
context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]);

metadata = std::move(metadata_copy);
Expand Down
27 changes: 27 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
namespace DB
{

/// Distinguishes manifests submitted by manual `ALTER ... EXPORT PARTITION` from those
/// submitted by the TTL scheduler. Persisted in the manifest body and surfaced through
/// `system.replicated_partition_exports.export_origin`.
enum class ExportOrigin : int8_t
{
alter = 0,
ttl = 1,
};

struct ExportReplicatedMergeTreePartitionProcessingPartEntry
{

Expand Down Expand Up @@ -175,6 +184,11 @@ struct ExportReplicatedMergeTreePartitionManifest
String filename_pattern;
bool write_full_path_in_iceberg_metadata = false;
String iceberg_metadata_json;
ExportOrigin export_origin = ExportOrigin::alter;
/// Partition-wide max of the EXPORT TTL expression at submission time.
/// Populated only when `export_origin == ttl`; together with `status = COMPLETED`
/// it is the scheduler's durable high-water mark (see `TTLExportScheduler`).
time_t export_ttl_max = 0;

std::string toJsonString() const
{
Expand Down Expand Up @@ -208,6 +222,9 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("ttl_seconds", ttl_seconds);
json.set("task_timeout_seconds", task_timeout_seconds);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
json.set("export_origin", String(magic_enum::enum_name(export_origin)));
if (export_origin == ExportOrigin::ttl)
json.set("export_ttl_max", export_ttl_max);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand Down Expand Up @@ -262,6 +279,16 @@ struct ExportReplicatedMergeTreePartitionManifest

manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");

/// Manifests written before this field existed default to `alter`.
if (json->has("export_origin"))
{
if (auto parsed = magic_enum::enum_cast<ExportOrigin>(json->getValue<String>("export_origin")))
manifest.export_origin = *parsed;
}

if (json->has("export_ttl_max"))
manifest.export_ttl_max = json->getValue<time_t>("export_ttl_max");

return manifest;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ namespace
auto & entries_by_key
)
{
bool has_expired = metadata.create_time < now - static_cast<time_t>(metadata.ttl_seconds);
/// Manifests submitted by the TTL scheduler are durable by design: the scheduler relies on the
/// last manifest for `(src, dest)` to know where to resume, so manifest-TTL eviction must skip them.
bool has_expired = metadata.export_origin != ExportOrigin::ttl
&& metadata.create_time < now - static_cast<time_t>(metadata.ttl_seconds);

bool task_timed_out = is_pending
&& metadata.task_timeout_seconds > 0
Expand Down Expand Up @@ -321,6 +324,7 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::
info.parts_to_do = entry.manifest.parts.size();
info.parts = entry.manifest.parts;
info.status = magic_enum::enum_name(entry.status);
info.export_origin = entry.manifest.export_origin;

info.last_exception_per_replica.reserve(entry.last_exception_per_replica.size());
size_t total_exception_count = 0;
Expand Down
Loading
Loading