Skip to content

25.3.5 backport 78926 Add _time virtual column in S3Queue engine #895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/en/engines/table-engines/integrations/s3queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ Example:

- `_path` — Path to the file.
- `_file` — Name of the file.
- `_size` — Size of the file.
- `_time` — Last modified time of the file.

For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns).

Expand Down
5 changes: 4 additions & 1 deletion src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -958,11 +958,14 @@ Chunk ObjectStorageQueueSource::generateImpl()
ProfileEvents::increment(ProfileEvents::ObjectStorageQueueReadRows, chunk.getNumRows());
ProfileEvents::increment(ProfileEvents::ObjectStorageQueueReadBytes, chunk.bytes());

const auto & object_metadata = reader.getObjectInfo()->metadata;

VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, read_from_format_info.requested_virtual_columns,
{
.path = path,
.size = reader.getObjectInfo()->metadata->size_bytes
.size = object_metadata->size_bytes,
.last_modified = object_metadata->last_modified
}, getContext());

return chunk;
Expand Down
17 changes: 13 additions & 4 deletions tests/integration/helpers/s3_queue_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ def create_mv(
mv_name=None,
create_dst_table_first=True,
format="column1 UInt32, column2 UInt32, column3 UInt32",
virtual_columns="_path String",
):
if mv_name is None:
mv_name = f"{src_table_name}_mv"
Expand All @@ -292,21 +293,29 @@ def create_mv(
DROP TABLE IF EXISTS {mv_name};
""")

virtual_format = ""
virtual_names = ""
virtual_columns_list = virtual_columns.split(",")
for column in virtual_columns_list:
virtual_format += f", {column}"
name, _ = column.strip().rsplit(" ", 1)
virtual_names += f", {name}"

if create_dst_table_first:
node.query(
f"""
CREATE TABLE {dst_table_name} ({format}, _path String)
CREATE TABLE {dst_table_name} ({format} {virtual_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT * {virtual_names} FROM {src_table_name};
"""
)
else:
node.query(
f"""
SET allow_materialized_view_with_bad_select=1;
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};
CREATE TABLE {dst_table_name} ({format}, _path String)
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT * {virtual_names} FROM {src_table_name};
CREATE TABLE {dst_table_name} ({format} {virtual_format})
ENGINE = MergeTree()
ORDER BY column1;
"""
Expand Down
44 changes: 44 additions & 0 deletions tests/integration/test_storage_s3_queue/test_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import string
import time
import uuid
from datetime import datetime
from multiprocessing.dummy import Pool

import pytest
Expand Down Expand Up @@ -595,3 +596,46 @@ def test_multiple_tables_meta_mismatch(started_cluster):
"keeper_path": keeper_path,
},
)


def test_virtual_columns(started_cluster):
    """Verify that the S3Queue engine exposes the _path, _file, _size and
    _time virtual columns through a materialized view into a MergeTree table.
    """
    # Timestamp taken before any file is uploaded: the file's _time must not
    # precede it. String comparison is valid for the "%Y-%m-%d %H:%M:%S"
    # format because it sorts lexicographically.
    # NOTE(review): assumes the server and the test runner agree on timezone
    # — confirm against the cluster configuration.
    lower_bound = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    node = started_cluster.instances["instance"]
    table_name = f"test_s3queue_virtual_columns_{generate_random_string()}"
    # A unique path is necessary for repeatable tests
    keeper_path = f"/clickhouse/test_{table_name}"
    dst_table_name = f"{table_name}_dst"
    files_path = f"{table_name}_data"

    total_values = generate_random_files(started_cluster, files_path, 1)
    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings={"keeper_path": keeper_path},
    )
    create_mv(
        node,
        table_name,
        dst_table_name,
        virtual_columns="_path String, _file String, _size UInt64, _time DateTime",
    )

    expected = {tuple(row) for row in total_values}
    observed = set()
    # Poll until the materialized view has consumed the file (up to ~20s).
    for _ in range(20):
        observed = {
            tuple(int(field) for field in line.split())
            for line in node.query(
                f"SELECT column1, column2, column3 FROM {dst_table_name}"
            ).splitlines()
        }
        if observed == expected:
            break
        time.sleep(1)
    assert observed == expected

    # One file was uploaded, so grouping by the virtual columns should yield
    # at least one row; inspect the first one.
    rows = node.query(
        f"SELECT count(), _path, _file, _size, _time FROM {dst_table_name} GROUP BY _path, _file, _size, _time"
    ).splitlines()
    assert len(rows) > 0
    _, res_path, res_file, res_size, res_time = rows[0].split("\t")
    upper_bound = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # _path must be the file name prefixed with the data directory.
    assert f"{files_path}/{res_file}" == res_path
    assert int(res_size) > 0
    # _time must fall between the start of the test and now.
    assert lower_bound <= res_time
    assert res_time <= upper_bound
Loading