Commit 4d3862d

Merge pull request #615 from Altinity/project-antalya-24.12.2-distributed-table-engine
Distributed request to tables with Object Storage Engines
1 parent 1b8a417 · commit 4d3862d
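
The feature being merged: an object-storage table engine (S3, Iceberg, ...) gains an object_storage_cluster setting that distributes reads over a named cluster, without rewriting queries to the s3Cluster() table function. A minimal sketch of the usage the tests below exercise, assuming a 'node' handle from the integration-test harness and the test cluster 'cluster_simple' (schema abbreviated):

    # Reads run on the initiator node only.
    node.query(
        """
        CREATE TABLE single_node (name String, value UInt32)
        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
        """
    )

    # Same engine; SELECTs now fan out to every node of 'cluster_simple'.
    node.query(
        """
        CREATE TABLE distributed (name String, value UInt32)
        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
        SETTINGS object_storage_cluster='cluster_simple'
        """
    )

The setting can also be applied per query, as the Iceberg test does: SELECT * FROM distributed SETTINGS object_storage_cluster='cluster_simple'.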

2 files changed: +143 -42 lines

tests/integration/test_s3_cluster/test.py

Lines changed: 69 additions & 0 deletions
@@ -630,3 +630,72 @@ def test_cluster_default_expression(started_cluster):
     )
 
     assert result == expected_result
+
+
+def test_distributed_s3_table_engine(started_cluster):
+    node = started_cluster.instances["s0_0_0"]
+
+    resp_def = node.query(
+        """
+        SELECT * FROM s3Cluster(
+            'cluster_simple',
+            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+        """
+    )
+
+    node.query("DROP TABLE IF EXISTS single_node")
+    node.query(
+        """
+        CREATE TABLE single_node
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+        """
+    )
+    query_id_engine_single_node = str(uuid.uuid4())
+    resp_engine_single_node = node.query(
+        """
+        SELECT * FROM single_node ORDER BY (name, value, polygon)
+        """,
+        query_id=query_id_engine_single_node,
+    )
+    assert resp_def == resp_engine_single_node
+
+    node.query("DROP TABLE IF EXISTS distributed")
+    node.query(
+        """
+        CREATE TABLE distributed
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+        SETTINGS object_storage_cluster='cluster_simple'
+        """
+    )
+    query_id_engine_distributed = str(uuid.uuid4())
+    resp_engine_distributed = node.query(
+        """
+        SELECT * FROM distributed ORDER BY (name, value, polygon)
+        """,
+        query_id=query_id_engine_distributed,
+    )
+    assert resp_def == resp_engine_distributed
+
+    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
+
+    hosts_engine_single_node = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
+        """
+    )
+    assert int(hosts_engine_single_node) == 1
+    hosts_engine_distributed = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
+        """
+    )
+    assert int(hosts_engine_distributed) == 3
+
+

tests/integration/test_storage_iceberg/test.py

Lines changed: 74 additions & 42 deletions
@@ -202,6 +202,7 @@ def get_creation_expression(
     allow_dynamic_metadata_for_data_lakes=False,
     use_version_hint=False,
     run_on_cluster=False,
+    object_storage_cluster=False,
     explicit_metadata_path="",
     **kwargs,
 ):
@@ -215,6 +216,9 @@ def get_creation_expression(
     if use_version_hint:
         settings_array.append("iceberg_use_version_hint = true")
 
+    if object_storage_cluster:
+        settings_array.append(f"object_storage_cluster = '{object_storage_cluster}'")
+
     if settings_array:
         settings_expression = " SETTINGS " + ",".join(settings_array)
     else:
@@ -313,10 +317,18 @@ def create_iceberg_table(
     table_name,
     cluster,
     format="Parquet",
+    object_storage_cluster=False,
     **kwargs,
 ):
     node.query(
-        get_creation_expression(storage_type, table_name, cluster, format, **kwargs)
+        get_creation_expression(
+            storage_type,
+            table_name,
+            cluster,
+            format,
+            object_storage_cluster=object_storage_cluster,
+            **kwargs,
+        )
     )
 
 
@@ -657,61 +669,81 @@ def add_df(mode):
         .split()
     )
 
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, object_storage_cluster='cluster_simple')
+    query_id_cluster_table_engine = str(uuid.uuid4())
+    select_cluster_table_engine = (
+        instance.query(
+            f"""
+            SELECT * FROM {TABLE_NAME}
+            """,
+            query_id=query_id_cluster_table_engine,
+        )
+        .strip()
+        .split()
+    )
+
+    # write 3 times
+    assert int(instance.query(f"SELECT count() FROM {table_function_expr_cluster}")) == 100 * 3
+
+    select_remote_cluster = (
+        instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})")
+        .strip()
+        .split()
+    )
+
+    instance.query(f"DROP TABLE IF EXISTS `{TABLE_NAME}` SYNC")
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
+    query_id_pure_table_engine = str(uuid.uuid4())
+    select_pure_table_engine = (
+        instance.query(
+            f"""
+            SELECT * FROM {TABLE_NAME}
+            """,
+            query_id=query_id_pure_table_engine,
+        )
+        .strip()
+        .split()
+    )
+    query_id_pure_table_engine_cluster = str(uuid.uuid4())
+    select_pure_table_engine_cluster = (
+        instance.query(
+            f"""
+            SELECT * FROM {TABLE_NAME}
+            SETTINGS object_storage_cluster='cluster_simple'
+            """,
+            query_id=query_id_pure_table_engine_cluster,
+        )
+        .strip()
+        .split()
+    )
+
     # Simple size check
     assert len(select_regular) == 600
     assert len(select_cluster) == 600
     assert len(select_cluster_alt_syntax) == 600
+    assert len(select_cluster_table_engine) == 600
+    assert len(select_remote_cluster) == 600
+    assert len(select_pure_table_engine) == 600
+    assert len(select_pure_table_engine_cluster) == 600
 
     # Actual check
     assert select_cluster == select_regular
     assert select_cluster_alt_syntax == select_regular
+    assert select_cluster_table_engine == select_regular
+    assert select_remote_cluster == select_regular
+    assert select_pure_table_engine == select_regular
+    assert select_pure_table_engine_cluster == select_regular
 
     # Check query_log
     for replica in started_cluster.instances.values():
         replica.query("SYSTEM FLUSH LOGS")
 
-    for node_name, replica in started_cluster.instances.items():
-        cluster_secondary_queries = (
-            replica.query(
-                f"""
-                SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
-                WHERE
-                    type = 'QueryStart'
-                    AND NOT is_initial_query
-                    AND initial_query_id='{query_id_cluster}'
-                """
-            )
-            .strip()
-            .split("\n")
-        )
-
-        logging.info(
-            f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
-        )
-        assert len(cluster_secondary_queries) == 1
-
-    for node_name, replica in started_cluster.instances.items():
-        cluster_secondary_queries = (
-            replica.query(
-                f"""
-                SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
-                WHERE
-                    type = 'QueryStart'
-                    AND NOT is_initial_query
-                    AND initial_query_id='{query_id_cluster_alt_syntax}'
-                """
-            )
-            .strip()
-            .split("\n")
-        )
-
-        logging.info(
-            f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
-        )
-        assert len(cluster_secondary_queries) == 1
+    count_secondary_subqueries(started_cluster, query_id_cluster, 1, "table function")
+    count_secondary_subqueries(started_cluster, query_id_cluster_alt_syntax, 1, "table function alt syntax")
+    count_secondary_subqueries(started_cluster, query_id_cluster_table_engine, 1, "cluster table engine")
+    count_secondary_subqueries(started_cluster, query_id_pure_table_engine, 0, "table engine")
+    count_secondary_subqueries(started_cluster, query_id_pure_table_engine_cluster, 1, "table engine with cluster setting")
 
-    # write 3 times
-    assert int(instance.query(f"SELECT count() FROM {table_function_expr_cluster}")) == 100 * 3
 
 @pytest.mark.parametrize("format_version", ["1", "2"])
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
