Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: introduce inverted index file format & writer & reader #9844

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,18 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_download, {{"type", "download"}}, ExpBuckets{0.001, 2, 20}), \
F(type_view, {{"type", "view"}}, ExpBuckets{0.001, 2, 20}), \
F(type_search, {{"type", "search"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_inverted_index_active_instances, \
"Active Inverted index instances", \
Gauge, \
F(type_build, {"type", "build"}), \
F(type_view, {"type", "view"})) \
M(tiflash_inverted_index_duration, \
"Inverted index operation duration", \
Histogram, \
F(type_build, {{"type", "build"}}, ExpBuckets{0.001, 2, 20}), \
F(type_download, {{"type", "download"}}, ExpBuckets{0.001, 2, 20}), \
F(type_view, {{"type", "view"}}, ExpBuckets{0.001, 2, 20}), \
F(type_search, {{"type", "search"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_io_limiter_pending_count, \
"I/O limiter pending count", \
Counter, \
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class BitmapFilter
void set(BlockInputStreamPtr & stream);
// f[start, start+limit) = value
void set(UInt32 start, UInt32 limit, bool value = true);
void set(std::span<const UInt32> row_ids, const FilterPtr & f);
// If this returns true, all data matches and the filter `f` is not filled.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;
// Caller should ensure n in [0, size).
Expand All @@ -48,8 +49,6 @@ class BitmapFilter
friend class BitmapFilterView;

private:
void set(std::span<const UInt32> row_ids, const FilterPtr & f);

IColumn::Filter filter;
bool all_match;
};
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_headers_and_sources(delta_merge .)
add_headers_and_sources(delta_merge ./BitmapFilter)
add_headers_and_sources(delta_merge ./Index)
add_headers_and_sources(delta_merge ./Index/VectorIndex)
add_headers_and_sources(delta_merge ./Index/InvertedIndex)
add_headers_and_sources(delta_merge ./Filter)
add_headers_and_sources(delta_merge ./FilterParser)
add_headers_and_sources(delta_merge ./File)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyLocalIndexWriter.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Writer.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>
#include <Storages/DeltaMerge/Index/LocalIndexWriter.h>
#include <Storages/DeltaMerge/dtpb/column_file.pb.h>


namespace DB::ErrorCodes
Expand Down Expand Up @@ -94,7 +94,7 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
struct IndexToBuild
{
LocalIndexInfo info;
VectorIndexWriterInMemoryPtr builder_vector;
LocalIndexWriterInMemoryPtr index_writer;
};

std::unordered_map<ColId, std::vector<IndexToBuild>> index_builders;
Expand All @@ -107,7 +107,7 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
RUNTIME_CHECK(index_info.def_vector_index != nullptr);
index_builders[index_info.column_id].emplace_back(IndexToBuild{
.info = index_info,
.builder_vector = {},
.index_writer = {},
});
}

Expand All @@ -125,17 +125,8 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
file->getDataPageId());

for (auto & index : indexes)
{
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
index.builder_vector = VectorIndexWriterInMemory::create(index.info.def_vector_index);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
}
index.index_writer = LocalIndexWriter::createInMemory(index.info);

read_columns->push_back(*cd_iter);
}

Expand Down Expand Up @@ -171,16 +162,8 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
const auto & col = col_with_type_and_name.column;
for (const auto & index : index_builders[read_columns->at(col_idx).id])
{
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
RUNTIME_CHECK(index.builder_vector);
index.builder_vector->addBlock(*col, del_mark, should_proceed);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
RUNTIME_CHECK(index.index_writer);
index.index_writer->addBlock(*col, del_mark, should_proceed);
}
}
}
Expand All @@ -192,40 +175,19 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
const auto & cd = read_columns->at(col_idx);
for (const auto & index : index_builders[cd.id])
{
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
{
RUNTIME_CHECK(index.builder_vector);
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
index.builder_vector->finalize(compressed);
compressed.next();
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
// ColumnFileDataProviderRNLocalPageCache currently does not support read data with fields
options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size});

auto idx_info = dtpb::ColumnFileIndexInfo{};
idx_info.set_index_page_id(index_page_id);
auto * idx_props = idx_info.mutable_index_props();
idx_props->set_kind(dtpb::IndexFileKind::VECTOR_INDEX);
idx_props->set_index_id(index.info.index_id);
idx_props->set_file_size(data_size);
auto * vector_index = idx_props->mutable_vector_index();
vector_index->set_format_version(0);
vector_index->set_dimensions(index.info.def_vector_index->dimension);
vector_index->set_distance_metric(
tipb::VectorDistanceMetric_Name(index.info.def_vector_index->distance_metric));
index_infos->emplace_back(std::move(idx_info));

break;
}
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
RUNTIME_CHECK(index.index_writer);
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
dtpb::ColumnFileIndexInfo pb_cf_idx;
pb_cf_idx.set_index_page_id(index_page_id);
auto idx_info = index.index_writer->finalize(compressed, [&write_buf] { return write_buf.count(); });
pb_cf_idx.mutable_index_props()->Swap(&idx_info);
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
// ColumnFileDataProviderRNLocalPageCache currently does not support reading data without fields
options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size});
Copy link
Member

@CalvinNeo CalvinNeo Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What order of magnitude will the page size for the inverted index be? Is it a single BlockSize, or a multiple of BlockSize?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AfterCompressed(MetaSize + BlockCount * BlockSize)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to set data_sizes if we always read the whole page from disk?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ColumnFileDataProviderRNLocalPageCache currently does not support reading data without fields

index_infos->emplace_back(std::move(pb_cf_idx));
}
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ class DMFile : private boost::noncopyable
{
case TiDB::ColumnarIndexKind::Vector:
return fmt::format("idx_{}.vector", index_id);
case TiDB::ColumnarIndexKind::Inverted:
return fmt::format("idx_{}.inverted", index_id);
default:
throw Exception(fmt::format("Unsupported index kind: {}", magic_enum::enum_name(kind)));
}
Expand Down
65 changes: 12 additions & 53 deletions dbms/src/Storages/DeltaMerge/File/DMFileLocalIndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@
#include <Storages/DeltaMerge/File/DMFileLocalIndexWriter.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Writer.h>
#include <Storages/DeltaMerge/Index/LocalIndexWriter.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>
#include <Storages/PathPool.h>
#include <tipb/executor.pb.h>

#include <unordered_map>


// Declarations of error codes referenced by this translation unit.
// `extern` means the constant is only declared here; its definition (and its
// numeric value) lives in another translation unit and is resolved at link time.
namespace DB::ErrorCodes
{
// Error code named ABORTED. NOTE(review): presumably used when an index build
// is cancelled part-way (see `should_proceed`) — confirm against the usage site.
extern const int ABORTED;
}

namespace DB::FailPoints
{
extern const char exception_build_local_index_for_file[];
Expand Down Expand Up @@ -116,7 +116,7 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
LocalIndexInfo info;
String index_file_path; // For write out
String index_file_name; // For meta include
VectorIndexWriterOnDiskPtr builder_vector;
LocalIndexWriterOnDiskPtr index_writer;
};

std::unordered_map<ColId, std::vector<IndexToBuild>> index_builders;
Expand All @@ -127,7 +127,7 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
.info = index_info,
.index_file_path = "",
.index_file_name = "",
.builder_vector = {},
.index_writer = {},
});
}

Expand Down Expand Up @@ -156,17 +156,7 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
index.info.column_id,
index.info.index_id);

switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
index.builder_vector = VectorIndexWriterOnDisk::create( //
index.index_file_path,
index.info.def_vector_index);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
index.index_writer = LocalIndexWriter::createOnDisk(index.index_file_path, index.info);
}
read_columns.push_back(*cd_iter);
}
Expand Down Expand Up @@ -213,16 +203,8 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
const auto & col = col_with_type_and_name.column;
for (const auto & index : index_builders[read_columns[col_idx].id])
{
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
RUNTIME_CHECK(index.builder_vector);
index.builder_vector->addBlock(*col, del_mark, should_proceed);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
RUNTIME_CHECK(index.index_writer);
index.index_writer->addBlock(*col, del_mark, should_proceed);
}
}
}
Expand All @@ -240,33 +222,10 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab

for (const auto & index : index_builders[cd.id])
{
dtpb::DMFileIndexInfo pb_dmfile_idx{};
auto * pb_idx = pb_dmfile_idx.mutable_index_props();

switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
{
index.builder_vector->finalize();
auto * pb_vec_idx = pb_idx->mutable_vector_index();
pb_vec_idx->set_format_version(0);
pb_vec_idx->set_dimensions(index.info.def_vector_index->dimension);
pb_vec_idx->set_distance_metric(
tipb::VectorDistanceMetric_Name(index.info.def_vector_index->distance_metric));
break;
}
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}

auto index_file = Poco::File(index.index_file_path);
RUNTIME_CHECK(index_file.exists());
pb_idx->set_kind(index.info.getKindAsDtpb());
pb_idx->set_index_id(index.info.index_id);
pb_idx->set_file_size(index_file.getSize());

total_built_index_bytes += pb_idx->file_size();
dtpb::DMFileIndexInfo pb_dmfile_idx;
auto idx_info = index.index_writer->finalize();
pb_dmfile_idx.mutable_index_props()->Swap(&idx_info);
total_built_index_bytes += pb_dmfile_idx.index_props().file_size();
new_indexes.emplace_back(std::move(pb_dmfile_idx));
iw->include(index.index_file_name);
}
Expand Down
Loading