Skip to content

Commit 942c5e1

Browse files
committed
More simple
1 parent 293ac83 commit 942c5e1

File tree

2 files changed

+25
-43
lines changed

2 files changed

+25
-43
lines changed

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ namespace ErrorCodes
3030
{
3131
extern const int LOGICAL_ERROR;
3232
extern const int UNKNOWN_FUNCTION;
33+
extern const int NOT_IMPLEMENTED;
3334
}
3435

3536

@@ -78,10 +79,6 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
7879
, configuration{configuration_}
7980
, object_storage(object_storage_)
8081
, cluster_name_in_settings(false)
81-
, comment(comment_)
82-
, format_settings(format_settings_)
83-
, mode(mode_)
84-
, partition_by(partition_by_)
8582
{
8683
ColumnsDescription columns{columns_};
8784
std::string sample_path;
@@ -98,7 +95,18 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
9895
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path));
9996
setInMemoryMetadata(metadata);
10097

101-
getPureStorage(context_);
98+
pure_storage = std::make_shared<StorageObjectStorage>(
99+
configuration,
100+
object_storage,
101+
context_,
102+
getStorageID(),
103+
getInMemoryMetadata().getColumns(),
104+
getInMemoryMetadata().getConstraints(),
105+
comment_,
106+
format_settings_,
107+
mode_,
108+
/* distributed_processing */false,
109+
partition_by_);
102110
}
103111

104112
std::string StorageObjectStorageCluster::getName() const
@@ -260,34 +268,6 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
260268
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
261269
}
262270

263-
std::shared_ptr<StorageObjectStorage> StorageObjectStorageCluster::getPureStorage(ContextPtr context)
264-
{
265-
std::lock_guard lock(mutex);
266-
if (!pure_storage)
267-
{
268-
pure_storage = std::make_shared<StorageObjectStorage>(
269-
configuration,
270-
object_storage,
271-
context,
272-
getStorageID(),
273-
getInMemoryMetadata().getColumns(),
274-
getInMemoryMetadata().getConstraints(),
275-
comment,
276-
format_settings,
277-
mode,
278-
/* distributed_processing */false,
279-
partition_by);
280-
281-
auto virtuals_ = getVirtualsPtr();
282-
if (virtuals_)
283-
pure_storage->setVirtuals(*virtuals_);
284-
285-
pure_storage->setInMemoryMetadata(getInMemoryMetadata());
286-
}
287-
288-
return pure_storage;
289-
}
290-
291271
void StorageObjectStorageCluster::readFallBackToPure(
292272
QueryPlan & query_plan,
293273
const Names & column_names,
@@ -298,7 +278,7 @@ void StorageObjectStorageCluster::readFallBackToPure(
298278
size_t max_block_size,
299279
size_t num_streams)
300280
{
301-
getPureStorage(context)->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
281+
pure_storage->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
302282
}
303283

304284
SinkToStoragePtr StorageObjectStorageCluster::writeFallBackToPure(
@@ -307,11 +287,15 @@ SinkToStoragePtr StorageObjectStorageCluster::writeFallBackToPure(
307287
ContextPtr context,
308288
bool async_insert)
309289
{
310-
return getPureStorage(context)->write(query, metadata_snapshot, context, async_insert);
290+
return pure_storage->write(query, metadata_snapshot, context, async_insert);
311291
}
312292

313293
String StorageObjectStorageCluster::getClusterName(ContextPtr context) const
314294
{
295+
/// We try to get cluster name from query settings.
296+
/// If it emtpy, we take default cluster name from table settings.
297+
/// When it is not empty, we use this cluster to distibuted requests.
298+
/// When both are empty, we must fall back to pure implementatiuon.
315299
auto cluster_name_ = context->getSettingsRef()[Setting::object_storage_cluster].value;
316300
if (cluster_name_.empty())
317301
cluster_name_ = getOriginalClusterName();
@@ -335,7 +319,11 @@ void StorageObjectStorageCluster::truncate(
335319
ContextPtr local_context,
336320
TableExclusiveLockHolder & lock_holder)
337321
{
338-
return getPureStorage(local_context)->truncate(query, metadata_snapshot, local_context, lock_holder);
322+
/// Full query if fall back to pure storage.
323+
if (getClusterName(local_context).empty())
324+
return pure_storage->truncate(query, metadata_snapshot, local_context, lock_holder);
325+
326+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
339327
}
340328

341329
void StorageObjectStorageCluster::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@ class StorageObjectStorageCluster : public IStorageCluster
8484
ContextPtr context,
8585
bool async_insert) override;
8686

87-
std::shared_ptr<StorageObjectStorage> getPureStorage(ContextPtr context);
88-
8987
/*
9088
In case the table was created with `object_storage_cluster` setting,
9189
modify the AST query object so that it uses the table function implementation
@@ -105,12 +103,8 @@ class StorageObjectStorageCluster : public IStorageCluster
105103
const ObjectStoragePtr object_storage;
106104
bool cluster_name_in_settings;
107105

108-
std::mutex mutex;
106+
/// non-clustered storage to fall back on pure realisation if needed
109107
std::shared_ptr<StorageObjectStorage> pure_storage;
110-
String comment;
111-
std::optional<FormatSettings> format_settings;
112-
LoadingStrictnessLevel mode;
113-
ASTPtr partition_by;
114108
};
115109

116110
}

0 commit comments

Comments
 (0)