Skip to content

Commit 3fafe6f

Browse files
committed
Add ability to choose object storage cluster in select query
1 parent db44166 commit 3fafe6f

File tree

8 files changed

+210
-135
lines changed

8 files changed

+210
-135
lines changed

src/Databases/Iceberg/DatabaseIceberg.cpp

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ namespace DatabaseIcebergSetting
4343
namespace Setting
4444
{
4545
extern const SettingsBool allow_experimental_database_iceberg;
46-
extern const SettingsString object_storage_cluster;
4746
}
4847

4948
namespace ErrorCodes
@@ -240,33 +239,18 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
240239

241240
auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value;
242241

243-
if (cluster_name.empty())
244-
{
245-
return std::make_shared<StorageObjectStorage>(
246-
configuration,
247-
configuration->createObjectStorage(context_, /* is_readonly */ false),
248-
context_,
249-
StorageID(getDatabaseName(), name),
250-
/* columns */columns,
251-
/* constraints */ConstraintsDescription{},
252-
/* comment */"",
253-
getFormatSettings(context_),
254-
LoadingStrictnessLevel::CREATE,
255-
/* distributed_processing */false,
256-
/* partition_by */nullptr,
257-
/* lazy_init */true);
258-
}
259-
else
260-
{
261-
return std::make_shared<StorageObjectStorageCluster>(
262-
cluster_name,
263-
configuration,
264-
configuration->createObjectStorage(context_, /* is_readonly */ false),
265-
StorageID(getDatabaseName(), name),
266-
columns,
267-
ConstraintsDescription{},
268-
context_);
269-
}
242+
return std::make_shared<StorageObjectStorageCluster>(
243+
cluster_name,
244+
configuration,
245+
configuration->createObjectStorage(context_, /* is_readonly */ false),
246+
context_,
247+
StorageID(getDatabaseName(), name),
248+
/* columns */columns,
249+
/* constraints */ConstraintsDescription{},
250+
/* comment */"",
251+
/* format_settings */ getFormatSettings(context_),
252+
/* mode */ LoadingStrictnessLevel::CREATE,
253+
/* partition_by */nullptr);
270254
}
271255

272256
DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(

src/Storages/IStorageCluster.cpp

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ namespace Setting
3636
extern const SettingsBool skip_unavailable_shards;
3737
}
3838

39+
namespace ErrorCodes
40+
{
41+
extern const int NOT_IMPLEMENTED;
42+
}
43+
3944
IStorageCluster::IStorageCluster(
4045
const String & cluster_name_,
4146
const StorageID & table_id_,
@@ -65,6 +70,19 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
6570
extension = storage->getTaskIteratorExtension(predicate, context);
6671
}
6772

73+
void IStorageCluster::readFallBackToPure(
74+
QueryPlan & /*query_plan*/,
75+
const Names & /*column_names*/,
76+
const StorageSnapshotPtr & /*storage_snapshot*/,
77+
SelectQueryInfo & /*query_info*/,
78+
ContextPtr /*context*/,
79+
QueryProcessingStage::Enum /*processed_stage*/,
80+
size_t /*max_block_size*/,
81+
size_t /*num_streams*/)
82+
{
83+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName());
84+
}
85+
6886
/// The code executes on initiator
6987
void IStorageCluster::read(
7088
QueryPlan & query_plan,
@@ -73,13 +91,21 @@ void IStorageCluster::read(
7391
SelectQueryInfo & query_info,
7492
ContextPtr context,
7593
QueryProcessingStage::Enum processed_stage,
76-
size_t /*max_block_size*/,
77-
size_t /*num_streams*/)
94+
size_t max_block_size,
95+
size_t num_streams)
7896
{
97+
auto cluster_name_ = getClusterName(context);
98+
99+
if (cluster_name_.empty())
100+
{
101+
readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
102+
return;
103+
}
104+
79105
storage_snapshot->check(column_names);
80106

81107
updateBeforeRead(context);
82-
auto cluster = getCluster(context);
108+
auto cluster = getClusterImpl(context, cluster_name_);
83109

84110
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
85111

@@ -196,9 +222,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
196222
return new_context;
197223
}
198224

199-
ClusterPtr IStorageCluster::getCluster(ContextPtr context) const
225+
ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_)
200226
{
201-
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
227+
return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef());
202228
}
203229

204230
}

src/Storages/IStorageCluster.h

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ class IStorageCluster : public IStorage
2929
SelectQueryInfo & query_info,
3030
ContextPtr context,
3131
QueryProcessingStage::Enum processed_stage,
32-
size_t /*max_block_size*/,
33-
size_t /*num_streams*/) override;
32+
size_t max_block_size,
33+
size_t num_streams) override;
3434

35-
ClusterPtr getCluster(ContextPtr context) const;
35+
ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
3636
/// Query is needed for pruning by virtual columns (_file, _path)
3737
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;
3838

@@ -43,13 +43,26 @@ class IStorageCluster : public IStorage
4343
bool supportsOptimizationToSubcolumns() const override { return false; }
4444
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; }
4545

46-
const String & getClusterName() const { return cluster_name; }
46+
const String & getOriginalClusterName() const { return cluster_name; }
47+
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }
4748

4849
protected:
4950
virtual void updateBeforeRead(const ContextPtr &) {}
5051
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
5152

53+
virtual void readFallBackToPure(
54+
QueryPlan & query_plan,
55+
const Names & column_names,
56+
const StorageSnapshotPtr & storage_snapshot,
57+
SelectQueryInfo & query_info,
58+
ContextPtr context,
59+
QueryProcessingStage::Enum processed_stage,
60+
size_t max_block_size,
61+
size_t num_streams);
62+
5263
private:
64+
static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_);
65+
5366
LoggerPtr log;
5467
String cluster_name;
5568
};

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ namespace DB
2323
namespace Setting
2424
{
2525
extern const SettingsBool use_hive_partitioning;
26+
extern const SettingsString object_storage_cluster;
2627
}
2728

2829
namespace ErrorCodes
@@ -63,15 +64,24 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
6364
const String & cluster_name_,
6465
ConfigurationPtr configuration_,
6566
ObjectStoragePtr object_storage_,
67+
ContextPtr context_,
6668
const StorageID & table_id_,
6769
const ColumnsDescription & columns_,
6870
const ConstraintsDescription & constraints_,
69-
ContextPtr context_)
71+
const String & comment_,
72+
std::optional<FormatSettings> format_settings_,
73+
LoadingStrictnessLevel mode_,
74+
ASTPtr partition_by_
75+
)
7076
: IStorageCluster(
7177
cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
7278
, configuration{configuration_}
7379
, object_storage(object_storage_)
7480
, cluster_name_in_settings(false)
81+
, comment(comment_)
82+
, format_settings(format_settings_)
83+
, mode(mode_)
84+
, partition_by(partition_by_)
7585
{
7686
ColumnsDescription columns{columns_};
7787
std::string sample_path;
@@ -94,7 +104,7 @@ std::string StorageObjectStorageCluster::getName() const
94104
return configuration->getEngineName();
95105
}
96106

97-
void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query)
107+
void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context)
98108
{
99109
// Change table engine on table function for distributed request
100110
// CREATE TABLE t (...) ENGINE=IcebergS3(...)
@@ -131,6 +141,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
131141
{"S3", "s3"},
132142
{"Azure", "azureBlobStorage"},
133143
{"HDFS", "hdfs"},
144+
{"Iceberg", "icebergS3"},
134145
{"IcebergS3", "icebergS3"},
135146
{"IcebergAzure", "icebergAzure"},
136147
{"IcebergHDFS", "icebergHDFS"},
@@ -153,7 +164,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
153164
auto function_ast = std::make_shared<ASTFunction>();
154165
function_ast->name = table_function_name;
155166

156-
auto cluster_name = getClusterName();
167+
auto cluster_name = getClusterName(context);
157168

158169
if (cluster_name.empty())
159170
{
@@ -195,7 +206,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
195206
const DB::StorageSnapshotPtr & storage_snapshot,
196207
const ContextPtr & context)
197208
{
198-
updateQueryForDistributedEngineIfNeeded(query);
209+
updateQueryForDistributedEngineIfNeeded(query, context);
199210

200211
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
201212

@@ -247,4 +258,39 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
247258
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
248259
}
249260

261+
void StorageObjectStorageCluster::readFallBackToPure(
262+
QueryPlan & query_plan,
263+
const Names & column_names,
264+
const StorageSnapshotPtr & storage_snapshot,
265+
SelectQueryInfo & query_info,
266+
ContextPtr context,
267+
QueryProcessingStage::Enum processed_stage,
268+
size_t max_block_size,
269+
size_t num_streams)
270+
{
271+
if (!pure_storage)
272+
pure_storage = std::make_shared<StorageObjectStorage>(
273+
configuration,
274+
object_storage,
275+
context,
276+
getStorageID(),
277+
getInMemoryMetadata().getColumns(),
278+
getInMemoryMetadata().getConstraints(),
279+
comment,
280+
format_settings,
281+
mode,
282+
/* distributed_processing */false,
283+
partition_by);
284+
285+
pure_storage->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
286+
}
287+
288+
String StorageObjectStorageCluster::getClusterName(ContextPtr context) const
289+
{
290+
auto cluster_name_ = context->getSettingsRef()[Setting::object_storage_cluster].value;
291+
if (cluster_name_.empty())
292+
cluster_name_ = getOriginalClusterName();
293+
return cluster_name_;
294+
}
295+
250296
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@ class StorageObjectStorageCluster : public IStorageCluster
1717
const String & cluster_name_,
1818
ConfigurationPtr configuration_,
1919
ObjectStoragePtr object_storage_,
20+
ContextPtr context_,
2021
const StorageID & table_id_,
2122
const ColumnsDescription & columns_,
2223
const ConstraintsDescription & constraints_,
23-
ContextPtr context_);
24+
const String & comment_,
25+
std::optional<FormatSettings> format_settings_,
26+
LoadingStrictnessLevel mode_,
27+
ASTPtr partition_by_ = nullptr
28+
);
2429

2530
std::string getName() const override;
2631

@@ -31,12 +36,24 @@ class StorageObjectStorageCluster : public IStorageCluster
3136

3237
void setClusterNameInSettings(bool cluster_name_in_settings_) { cluster_name_in_settings = cluster_name_in_settings_; }
3338

39+
String getClusterName(ContextPtr context) const override;
40+
3441
private:
3542
void updateQueryToSendIfNeeded(
3643
ASTPtr & query,
3744
const StorageSnapshotPtr & storage_snapshot,
3845
const ContextPtr & context) override;
3946

47+
void readFallBackToPure(
48+
QueryPlan & query_plan,
49+
const Names & column_names,
50+
const StorageSnapshotPtr & storage_snapshot,
51+
SelectQueryInfo & query_info,
52+
ContextPtr context,
53+
QueryProcessingStage::Enum processed_stage,
54+
size_t max_block_size,
55+
size_t num_streams) override;
56+
4057
/*
4158
In case the table was created with `object_storage_cluster` setting,
4259
modify the AST query object so that it uses the table function implementation
@@ -49,12 +66,18 @@ class StorageObjectStorageCluster : public IStorageCluster
4966
SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster'
5067
to make distributed request over cluster 'cluster'.
5168
*/
52-
void updateQueryForDistributedEngineIfNeeded(ASTPtr & query);
69+
void updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context);
5370

5471
const String engine_name;
5572
const StorageObjectStorage::ConfigurationPtr configuration;
5673
const ObjectStoragePtr object_storage;
5774
bool cluster_name_in_settings;
75+
76+
std::shared_ptr<StorageObjectStorage> pure_storage;
77+
String comment;
78+
std::optional<FormatSettings> format_settings;
79+
LoadingStrictnessLevel mode;
80+
ASTPtr partition_by;
5881
};
5982

6083
}

src/Storages/ObjectStorage/registerStorageObjectStorage.cpp

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,6 @@ namespace ErrorCodes
2020
extern const int BAD_ARGUMENTS;
2121
}
2222

23-
namespace Setting
24-
{
25-
extern const SettingsString object_storage_cluster;
26-
}
27-
2823
namespace StorageObjectStorageSetting
2924
{
3025
extern const StorageObjectStorageSettingsString object_storage_cluster;
@@ -74,32 +69,18 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
7469
if (args.storage_def->partition_by)
7570
partition_by = args.storage_def->partition_by->clone();
7671

77-
if (cluster_name.empty())
78-
{
79-
return std::make_shared<StorageObjectStorage>(
80-
configuration,
81-
configuration->createObjectStorage(context, /* is_readonly */ false),
82-
args.getContext(),
83-
args.table_id,
84-
args.columns,
85-
args.constraints,
86-
args.comment,
87-
format_settings,
88-
args.mode,
89-
/* distributed_processing */ false,
90-
partition_by);
91-
}
92-
else
93-
{
94-
return std::make_shared<StorageObjectStorageCluster>(
95-
cluster_name,
96-
configuration,
97-
configuration->createObjectStorage(context, /* is_readonly */ false),
98-
args.table_id,
99-
args.columns,
100-
args.constraints,
101-
args.getContext());
102-
}
72+
return std::make_shared<StorageObjectStorageCluster>(
73+
cluster_name,
74+
configuration,
75+
configuration->createObjectStorage(context, /* is_readonly */ false),
76+
args.getContext(),
77+
args.table_id,
78+
args.columns,
79+
args.constraints,
80+
args.comment,
81+
format_settings,
82+
args.mode,
83+
partition_by);
10384
}
10485

10586
#endif

0 commit comments

Comments
 (0)