Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Core/ProtocolDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_META
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS = 5;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS = 6;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH = 7;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH;

static constexpr auto DATA_LAKE_TABLE_STATE_SNAPSHOT_PROTOCOL_VERSION = 1;

Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,9 @@ Use multiple threads for azure multipart upload.
)", 0) \
DECLARE(Bool, s3_throw_on_zero_files_match, false, R"(
Throw an error, when ListObjects request cannot match any files
)", 0) \
DECLARE(Bool, s3_propagate_credentials_to_other_storages, false, R"(
Credentials from the base storage are always propagated to secondary object storages when endpoints match. When this setting is enabled, credentials are also propagated when endpoints differ, including less secure connections (for example, from `https` to plain `http`).
)", 0) \
DECLARE(Bool, hdfs_throw_on_zero_files_match, false, R"(
Throw an error if matched zero files according to glob expansion rules.
Expand Down
2 changes: 1 addition & 1 deletion src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
addSettingsChanges(settings_changes_history, "26.1.3.20001.altinityantalya",
{
{"iceberg_partition_timezone", "", "", "New setting."},
// {"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
{"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
{"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"},
// {"use_parquet_metadata_cache", false, true, "Enables cache of parquet file metadata."},
// {"input_format_parquet_use_metadata_cache", true, false, "Obsolete. No-op"}, // https://github.com/Altinity/ClickHouse/pull/586
Expand Down
4 changes: 3 additions & 1 deletion src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
LOG_DEBUG(log, "Has no credentials");
}
}
else if (!lightweight && table_metadata.requiresCredentials() && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end())
else if (!lightweight && table_metadata.requiresCredentials()
&& std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end()
&& table_metadata.getStorageType() != DatabaseDataLakeStorageType::Local)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
Expand Down
14 changes: 8 additions & 6 deletions src/IO/S3/URI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ namespace DB

struct URIConverter
{
static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper)
static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper, bool enable_url_encoding = true)
{
Macros macros({{"bucket", uri.getHost()}});
uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery());
uri = macros.expand(mapper[uri.getScheme()]).empty()
? uri
: Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery(), enable_url_encoding);
}
};

Expand All @@ -32,7 +34,7 @@ namespace ErrorCodes
namespace S3
{

URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters)
URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters, bool enable_url_encoding)
{
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
Expand All @@ -54,9 +56,9 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre
else
uri_str = uri_;

uri = Poco::URI(uri_str);
uri = Poco::URI(uri_str, enable_url_encoding);
/// Keep a copy of how Poco parsed the original string before any mapping
Poco::URI original_uri(uri_str);
Poco::URI original_uri(uri_str, enable_url_encoding);
bool looks_like_presigned = false;
for (const auto & [qk, qv] : original_uri.getQueryParameters())
{
Expand Down Expand Up @@ -101,7 +103,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre
}

if (!mapper.empty())
URIConverter::modifyURI(uri, mapper);
URIConverter::modifyURI(uri, mapper, enable_url_encoding);
}

storage_name = "S3";
Expand Down
3 changes: 2 additions & 1 deletion src/IO/S3/URI.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ struct URI
explicit URI(
const std::string & uri_,
bool allow_archive_path_syntax = false,
bool keep_presigned_query_parameters = true);
bool keep_presigned_query_parameters = true,
bool enable_url_encoding = true);
void addRegionToURI(const std::string & region);

static void validateBucket(const std::string & bucket, const Poco::URI & uri);
Expand Down
6 changes: 2 additions & 4 deletions src/Interpreters/ClusterFunctionReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
data_lake_metadata = object->data_lake_metadata.value();

#if USE_AVRO
if (std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
{
iceberg_info = dynamic_cast<IcebergDataObjectInfo &>(*object).info;
}
if (auto iceberg_object = std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
iceberg_info = iceberg_object->info;
#endif

file_meta_info = object->relative_path_with_metadata.file_meta_info;
Expand Down
6 changes: 5 additions & 1 deletion src/Interpreters/IcebergMetadataLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,16 @@ void insertRowToLogTable(
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Iceberg metadata log table is not configured");
}

String normalized_table_path = table_path;
while (normalized_table_path.size() > 1 && normalized_table_path.back() == '/')
normalized_table_path.pop_back();

iceberg_metadata_log->add(
DB::IcebergMetadataLogElement{
.current_time = spec.tv_sec,
.query_id = local_context->getCurrentQueryId(),
.content_type = row_log_level,
.table_path = table_path,
.table_path = normalized_table_path,
.file_path = file_path,
.metadata_content = get_row(),
.row_in_file = row_in_file,
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class IDataLakeMetadata : boost::noncopyable

virtual bool operator==(const IDataLakeMetadata & other) const = 0;

/// Returns the full table location URI (e.g. `s3a://bucket/prefix/table/`)
virtual std::string getTableLocation() const { return {}; }

/// Return iterator to `data files`.
using FileProgressCallback = std::function<void(FileProgress)>;
virtual ObjectIterator iterate(
Expand Down
103 changes: 80 additions & 23 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <string>
#include <unordered_set>
#include <Columns/IColumn.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/Settings.h>
Expand Down Expand Up @@ -72,6 +73,9 @@ struct Plan
std::unordered_map<String, std::vector<String>> manifest_list_to_manifest_files;
std::unordered_map<Int64, std::vector<std::shared_ptr<DataFilePlan>>> snapshot_id_to_data_files;
std::unordered_map<String, std::shared_ptr<DataFilePlan>> path_to_data_file;
/// Raw paths of every file referenced by the snapshots being compacted, used at cleanup
/// time to also remove files that live outside the base object_storage.
std::unordered_set<String> referenced_file_paths;
FileNamesGenerator generator;
Poco::JSON::Object::Ptr initial_metadata_object;

Expand Down Expand Up @@ -113,6 +117,7 @@ Plan getPlan(
const DataLakeStorageSettings & data_lake_settings,
const PersistentTableComponents & persistent_table_components,
ObjectStoragePtr object_storage,
SecondaryStorages & secondary_storages,
const String & write_format,
ContextPtr context,
CompressionMethod compression_method)
Expand Down Expand Up @@ -155,14 +160,16 @@ Plan getPlan(
std::unordered_map<String, std::shared_ptr<ManifestFilePlan>> manifest_files;
for (const auto & snapshot : snapshots_info)
{
auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log);
plan.referenced_file_paths.insert(snapshot.manifest_list_path);
auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log, secondary_storages);
for (const auto & manifest_file : manifest_list)
{
plan.manifest_list_to_manifest_files[snapshot.manifest_list_path].push_back(manifest_file.manifest_file_path);
if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_path))
plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_path] = snapshot.snapshot_id;
plan.referenced_file_paths.insert(manifest_file.manifest_file_path);
auto files_handle = getManifestFileEntriesHandle(
object_storage, persistent_table_components, context, log, manifest_file, static_cast<Int32>(current_schema_id));
object_storage, persistent_table_components, context, log, manifest_file, static_cast<Int32>(current_schema_id), secondary_storages);

if (!manifest_files.contains(manifest_file.manifest_file_path))
{
Expand All @@ -171,27 +178,39 @@ Plan getPlan(
}
manifest_files[manifest_file.manifest_file_path]->manifest_lists_path.push_back(snapshot.manifest_list_path);
for (const auto & pos_delete_file : files_handle.getFilesWithoutDeleted(FileContentType::POSITION_DELETE))
{
all_positional_delete_files.push_back(pos_delete_file);
plan.referenced_file_paths.insert(pos_delete_file->parsed_entry->file_path_key);
}

for (const auto & data_file : files_handle.getFilesWithoutDeleted(FileContentType::DATA))
{
plan.referenced_file_paths.insert(data_file->parsed_entry->file_path_key);
auto partition_index = plan.partition_encoder.encodePartition(data_file->parsed_entry->partition_key_value);
if (plan.partitions.size() <= partition_index)
plan.partitions.push_back({});

IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(data_file, 0);
const auto & raw_metadata_path = data_file->parsed_entry->file_path_key;
auto [resolved_storage, resolved_key] = resolveObjectStorageForPath(
persistent_table_components.table_location,
raw_metadata_path, object_storage, secondary_storages, context);

IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(
data_file, raw_metadata_path, 0, resolved_storage, resolved_key);
std::shared_ptr<DataFilePlan> data_file_ptr;
if (!plan.path_to_data_file.contains(manifest_file.manifest_file_path))
std::string path_identifier
= resolved_storage->getDescription() + '\0' + resolved_storage->getObjectsNamespace() + '\0' + resolved_key;
if (!plan.path_to_data_file.contains(path_identifier))
{
data_file_ptr = std::make_shared<DataFilePlan>(DataFilePlan{
.data_object_info = data_object_info,
.manifest_list = manifest_files[manifest_file.manifest_file_path],
.patched_path = plan.generator.generateDataFileName()});
plan.path_to_data_file[manifest_file.manifest_file_path] = data_file_ptr;
plan.path_to_data_file[path_identifier] = data_file_ptr;
}
else
{
data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_path];
data_file_ptr = plan.path_to_data_file[path_identifier];
}
plan.partitions[partition_index].push_back(data_file_ptr);
plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back());
Expand Down Expand Up @@ -224,7 +243,9 @@ static void writeDataFiles(
const std::optional<FormatSettings> & format_settings,
ContextPtr context,
const String & write_format,
CompressionMethod write_compression_method)
CompressionMethod write_compression_method,
const String & table_location,
std::shared_ptr<SecondaryStorages> secondary_storages)
{
for (auto & [_, data_file] : initial_plan.path_to_data_file)
{
Expand All @@ -235,10 +256,15 @@ static void writeDataFiles(
format_settings,
// todo make compaction using same FormatParserSharedResources
std::make_shared<FormatParserSharedResources>(context->getSettingsRef(), 1),
context);
context,
table_location,
secondary_storages);

RelativePathWithMetadata relative_path(data_file->data_object_info->getPath());
auto read_buffer = createReadBuffer(relative_path, object_storage, context, getLogger("IcebergCompaction"));
ObjectStoragePtr storage_to_use = data_file->data_object_info->getResolvedStorage();
if (!storage_to_use)
storage_to_use = object_storage;
RelativePathWithMetadata object_info(data_file->data_object_info->getPath());
auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction"));

const Settings & settings = context->getSettingsRef();
auto parser_shared_resources = std::make_shared<FormatParserSharedResources>(
Expand Down Expand Up @@ -392,6 +418,8 @@ void writeMetadataFiles(
{
manifest_entry->patched_path = plan.generator.generateManifestEntryName();
manifest_file_renamings[manifest_entry->path] = manifest_entry->patched_path.path_in_metadata;

std::vector<String> manifest_data_filenames(data_filenames.begin(), data_filenames.end());
auto buffer_manifest_entry = object_storage->writeObject(
StoredObject(manifest_entry->patched_path.path_in_storage),
WriteMode::Rewrite,
Expand All @@ -409,7 +437,7 @@ void writeMetadataFiles(
partition_columns,
plan.partition_encoder.getPartitionValue(grouped_by_manifest_files_partitions[manifest_entry]),
ChunkPartitioner(fields_from_partition_spec, current_schema, context, sample_block_).getResultTypes(),
std::vector(data_filenames.begin(), data_filenames.end()),
manifest_data_filenames,
manifest_entry->statistics,
sample_block_,
snapshot,
Expand Down Expand Up @@ -485,29 +513,54 @@ void writeMetadataFiles(
}
}

std::vector<String> getOldFiles(ObjectStoragePtr object_storage, const String & table_path)
/// Files to delete after compaction: a base-storage directory listing under `metadata/` and
/// `data/` (covers historical metadata.json and any orphan files on the base storage), plus
/// any paths from the compacted snapshots that resolve to a secondary storage.
std::vector<std::pair<ObjectStoragePtr, String>> getOldFiles(
ObjectStoragePtr object_storage,
SecondaryStorages & secondary_storages,
ContextPtr context,
const PersistentTableComponents & persistent_table_components,
const Plan & plan)
{
auto metadata_files = listFiles(*object_storage, table_path, "metadata", "");
auto data_files = listFiles(*object_storage, table_path, "data", "");
std::vector<std::pair<ObjectStoragePtr, String>> result;

for (auto && file : listFiles(*object_storage, persistent_table_components.table_path, "metadata", ""))
result.emplace_back(object_storage, std::move(file));
for (auto && file : listFiles(*object_storage, persistent_table_components.table_path, "data", ""))
result.emplace_back(object_storage, std::move(file));

for (const auto & raw_path : plan.referenced_file_paths)
{
auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath(
persistent_table_components.table_location,
raw_path,
object_storage,
secondary_storages,
context);

for (auto && data_file : data_files)
metadata_files.push_back(data_file);
if (storage_to_use.get() != object_storage.get())
result.emplace_back(std::move(storage_to_use), std::move(key_in_storage));
}

return metadata_files;
return result;
}

void clearOldFiles(ObjectStoragePtr object_storage, const std::vector<String> & old_files)
void clearOldFiles(const std::vector<std::pair<ObjectStoragePtr, String>> & old_files)
{
for (const auto & metadata_file : old_files)
auto log = getLogger("IcebergCompaction");
for (const auto & [storage, key] : old_files)
{
object_storage->removeObjectIfExists(StoredObject(metadata_file));
LOG_DEBUG(log, "Removing old file during compaction: storage={}, key={}", storage->getDescription(), key);
storage->removeObjectIfExists(StoredObject(key));
}
}

void compactIcebergTable(
IcebergHistory snapshots_info,
const PersistentTableComponents & persistent_table_components,
ObjectStoragePtr object_storage_,
std::shared_ptr<SecondaryStorages> secondary_storages_,
const DataLakeStorageSettings & data_lake_settings,
const std::optional<FormatSettings> & format_settings_,
SharedHeader sample_block_,
Expand All @@ -519,22 +572,26 @@ void compactIcebergTable(
data_lake_settings,
persistent_table_components,
object_storage_,
*secondary_storages_,
write_format,
context_,
persistent_table_components.metadata_compression_method);
if (plan.need_optimize)
{
auto old_files = getOldFiles(object_storage_, persistent_table_components.table_path);
auto old_files = getOldFiles(
object_storage_, *secondary_storages_, context_, persistent_table_components, plan);
writeDataFiles(
plan,
sample_block_,
object_storage_,
format_settings_,
context_,
write_format,
persistent_table_components.metadata_compression_method);
persistent_table_components.metadata_compression_method,
persistent_table_components.table_location,
secondary_storages_);
writeMetadataFiles(plan, object_storage_, context_, sample_block_, write_format, persistent_table_components.table_path);
clearOldFiles(object_storage_, old_files);
clearOldFiles(old_files);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Storages/ObjectStorage/DataLakes/Iceberg/PersistentTableComponents.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/Utils.h>


namespace DB::Iceberg
Expand All @@ -15,6 +16,7 @@ void compactIcebergTable(
IcebergHistory snapshots_info,
const PersistentTableComponents & persistent_table_components,
DB::ObjectStoragePtr object_storage_,
std::shared_ptr<SecondaryStorages> secondary_storages_,
const DataLakeStorageSettings & data_lake_settings,
const std::optional<DB::FormatSettings> & format_settings_,
DB::SharedHeader sample_block_,
Expand Down
Loading
Loading