Skip to content

Distributed request to tables with Object Storage Engines #615

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/Databases/Iceberg/DatabaseIceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageNull.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>

#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
Expand All @@ -37,6 +38,7 @@ namespace DatabaseIcebergSetting
extern const DatabaseIcebergSettingsString storage_endpoint;
extern const DatabaseIcebergSettingsString oauth_server_uri;
extern const DatabaseIcebergSettingsBool vended_credentials;
extern const DatabaseIcebergSettingsString object_storage_cluster;
}
namespace Setting
{
Expand Down Expand Up @@ -235,19 +237,20 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
/// no table structure in table definition AST.
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, std::move(storage_settings));

return std::make_shared<StorageObjectStorage>(
auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value;

return std::make_shared<StorageObjectStorageCluster>(
cluster_name,
configuration,
configuration->createObjectStorage(context_, /* is_readonly */ false),
context_,
StorageID(getDatabaseName(), name),
/* columns */columns,
/* constraints */ConstraintsDescription{},
/* comment */"",
getFormatSettings(context_),
LoadingStrictnessLevel::CREATE,
/* distributed_processing */false,
/* partition_by */nullptr,
/* lazy_init */true);
/* format_settings */ getFormatSettings(context_),
/* mode */ LoadingStrictnessLevel::CREATE,
/* partition_by */nullptr);
}

DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
Expand Down
1 change: 1 addition & 0 deletions src/Databases/Iceberg/DatabaseIcebergSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace ErrorCodes
DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \
DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS)
Expand Down
2 changes: 1 addition & 1 deletion src/Parsers/FunctionSecretArgumentsFinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class FunctionSecretArgumentsFinder
}
else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") ||
(function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") ||
(function->name() == "gcs"))
(function->name() == "icebergS3") || (function->name() == "gcs"))
{
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
findS3FunctionSecretArguments(/* is_cluster_function= */ false);
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class RestorerFromBackup;

class ConditionSelectivityEstimator;

class IObjectStorage;
using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;

struct ColumnSize
{
size_t marks = 0;
Expand Down
37 changes: 32 additions & 5 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ namespace Setting
extern const SettingsBool skip_unavailable_shards;
}

namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}

IStorageCluster::IStorageCluster(
const String & cluster_name_,
const StorageID & table_id_,
Expand Down Expand Up @@ -73,13 +78,21 @@ void IStorageCluster::read(
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
size_t /*num_streams*/)
size_t max_block_size,
size_t num_streams)
{
auto cluster_name_from_settings = getClusterName(context);

if (cluster_name_from_settings.empty())
{
readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return;
}

storage_snapshot->check(column_names);

updateBeforeRead(context);
auto cluster = getCluster(context);
auto cluster = getClusterImpl(context, cluster_name_from_settings);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

Expand Down Expand Up @@ -126,6 +139,20 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}

SinkToStoragePtr IStorageCluster::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool async_insert)
{
auto cluster_name_from_settings = getClusterName(context);

if (cluster_name_from_settings.empty())
return writeFallBackToPure(query, metadata_snapshot, context, async_insert);

throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not supported by storage {}", getName());
}

void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createExtension(nullptr);
Expand Down Expand Up @@ -196,9 +223,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
return new_context;
}

ClusterPtr IStorageCluster::getCluster(ContextPtr context) const
ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_)
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef());
}

}
39 changes: 36 additions & 3 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ class IStorageCluster : public IStorage
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
size_t /*num_streams*/) override;
size_t max_block_size,
size_t num_streams) override;

ClusterPtr getCluster(ContextPtr context) const;
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool async_insert) override;

ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move the implementation to the header file?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single line implementation. In my opinion single-line implementation in header is more readable and simple to understanding than separate implementation in cpp-file, when declaration and implementation in two different places.

/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;

Expand All @@ -43,11 +49,38 @@ class IStorageCluster : public IStorage
bool supportsOptimizationToSubcolumns() const override { return false; }
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; }

const String & getOriginalClusterName() const { return cluster_name; }
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }

protected:
virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}

virtual void readFallBackToPure(
QueryPlan & /* query_plan */,
const Names & /* column_names */,
const StorageSnapshotPtr & /* storage_snapshot */,
SelectQueryInfo & /* query_info */,
ContextPtr /* context */,
QueryProcessingStage::Enum /* processed_stage */,
size_t /* max_block_size */,
size_t /* num_streams */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName());
}

virtual SinkToStoragePtr writeFallBackToPure(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*context*/,
bool /*async_insert*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
}

private:
static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_);

LoggerPtr log;
String cluster_name;
};
Expand Down
22 changes: 16 additions & 6 deletions src/Storages/ObjectStorage/Azure/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll

String connection_url;
String container_name;
std::optional<String> account_name;
std::optional<String> account_key;

if (collection.has("connection_string"))
connection_url = collection.get<String>("connection_string");
Expand Down Expand Up @@ -173,14 +171,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,

std::unordered_map<std::string_view, size_t> engine_args_to_idx;


String connection_url = checkAndGetLiteralArgument<String>(engine_args[0], "connection_string/storage_account_url");
String container_name = checkAndGetLiteralArgument<String>(engine_args[1], "container");
blob_path = checkAndGetLiteralArgument<String>(engine_args[2], "blobpath");

std::optional<String> account_name;
std::optional<String> account_key;

auto is_format_arg = [] (const std::string & s) -> bool
{
return s == "auto" || FormatFactory::instance().getAllFormats().contains(Poco::toLower(s));
Expand Down Expand Up @@ -444,6 +438,22 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}

ASTPtr StorageAzureConfiguration::createArgsWithAccessData() const
{
auto arguments = std::make_shared<ASTExpressionList>();

arguments->children.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.storage_account_url));
arguments->children.push_back(std::make_shared<ASTIdentifier>(connection_params.endpoint.container_name));
arguments->children.push_back(std::make_shared<ASTLiteral>(blob_path));
if (account_name && account_key)
{
arguments->children.push_back(std::make_shared<ASTLiteral>(*account_name));
arguments->children.push_back(std::make_shared<ASTLiteral>(*account_key));
}

return arguments;
}

}

#endif
4 changes: 4 additions & 0 deletions src/Storages/ObjectStorage/Azure/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,17 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;

ASTPtr createArgsWithAccessData() const override;

protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;

std::string blob_path;
std::vector<String> blobs_paths;
AzureBlobStorage::ConnectionParams connection_params;
std::optional<std::string> account_name;
std::optional<std::string> account_key;
};

}
Expand Down
7 changes: 7 additions & 0 deletions src/Storages/ObjectStorage/HDFS/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}

ASTPtr StorageHDFSConfiguration::createArgsWithAccessData() const
{
auto arguments = std::make_shared<ASTExpressionList>();
arguments->children.push_back(std::make_shared<ASTLiteral>(url + path));
return arguments;
}

}

#endif
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/HDFS/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;

ASTPtr createArgsWithAccessData() const override;

private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
Expand Down
30 changes: 27 additions & 3 deletions src/Storages/ObjectStorage/S3/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,11 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_

if (engine_args_to_idx.contains("format"))
{
format = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["format"]], "format");
auto format_ = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["format"]], "format");
/// Set format to configuration only of it's not 'auto',
/// because we can have default format set in configuration.
if (format != "auto")
format = format;
if (format_ != "auto")
format = format_;
}

if (engine_args_to_idx.contains("structure"))
Expand Down Expand Up @@ -585,6 +585,30 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
}
}

ASTPtr StorageS3Configuration::createArgsWithAccessData() const
{
auto arguments = std::make_shared<ASTExpressionList>();

arguments->children.push_back(std::make_shared<ASTLiteral>(url.uri_str));
if (auth_settings[S3AuthSetting::no_sign_request])
{
arguments->children.push_back(std::make_shared<ASTLiteral>("NOSIGN"));
}
else
{
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::access_key_id].value));
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::secret_access_key].value));
if (!auth_settings[S3AuthSetting::session_token].value.empty())
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::session_token].value));
if (format != "auto")
arguments->children.push_back(std::make_shared<ASTLiteral>(format));
if (!compression_method.empty())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok not to add some other arguments like structure and headers as the docs suggest? https://clickhouse.com/docs/en/sql-reference/table-functions/s3

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this fields are already added in addStructureAndFormatToArgsIfNeeded methods.
https://github.com/ClickHouse/ClickHouse/blob/master/src/Storages/ObjectStorage/S3/Configuration.cpp#L399
These field required for table functions too, when access parameters need to add only for table engine case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: perhaps add a comment saying this is already added somewhere else?

Copy link
Author

@ianton-ru ianton-ru Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I rename method to addPathAndAccessKeysToArgs, to consistency with addStructureAndFormatToArgsIfNeeded

arguments->children.push_back(std::make_shared<ASTLiteral>(compression_method));
}

return arguments;
}

}

#endif
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/S3/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;

ASTPtr createArgsWithAccessData() const override;

private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
Expand Down
13 changes: 10 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ String StorageObjectStorage::getPathSample(ContextPtr context)
if (context->getSettingsRef()[Setting::use_hive_partitioning])
local_distributed_processing = false;

if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing)
return configuration->getPath();

auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration,
query_settings,
Expand All @@ -72,9 +75,6 @@ String StorageObjectStorage::getPathSample(ContextPtr context)
{} // file_progress_callback
);

if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing)
return configuration->getPath();

if (auto file = file_iterator->next(0))
return file->getPath();
return "";
Expand Down Expand Up @@ -165,6 +165,13 @@ void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage
{
IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()};
object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options);
updated = true;
}

void StorageObjectStorage::Configuration::updateIfRequired(ObjectStoragePtr object_storage_ptr, ContextPtr local_context)
{
if (!updated)
Copy link
Member

@Enmk Enmk Mar 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how critical is it to avoid updating settings multiple times? What happens if update is done concurrently?

Thread sanitizer reports that there is a datarace on updated variable:
https://s3.amazonaws.com/altinity-build-artifacts/615/14da835df2d32fe0882cfba308d62a8a8abd0caa/stateless_tests__tsan__[4_4]/stderr.log

So we have two options here:

  • convert updated to std::atomic<bool> so the datarace goes away, but still leaving a chance of concurrent update of the settings.
  • add a mutex that is going to guard Configuration::update's body from concurrent updates (+ we would need some extra checks to avoid updating multiple times)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing destructive, but if cache turned off it causes additional requests to remote storage. With cache it changes cache hit rate, also nothing except some tests based on calculation cache hits failed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I added atomic it in upstream and forget to add for Antalya, it's my fault.

update(object_storage_ptr, local_context);
}

bool StorageObjectStorage::hasExternalDynamicMetadata() const
Expand Down
7 changes: 7 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,13 @@ class StorageObjectStorage::Configuration
String structure = "auto";

virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context);

/// Create arguments for table function with path and access parameters
virtual ASTPtr createArgsWithAccessData() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method createArgsWithAccessData is not supported by storage {}", getEngineName());
}

protected:
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
Expand All @@ -256,6 +262,7 @@ class StorageObjectStorage::Configuration
void assertInitialized() const;

bool initialized = false;
std::atomic<bool> updated = false;
DataLakePartitionColumns partition_columns;

bool allow_dynamic_metadata_for_data_lakes;
Expand Down
Loading
Loading