Skip to content

Commit 40e6b5b

Browse files
committed
fix: Implement chunking for large SBF filters
This commit adds chunking functionality for the load/save operations of Bloom filters. Additional information is added to the serialization of each filter: when a filter is saved, its total size is written first, followed by the filter data in chunks (at most 64 MB per chunk). Signed-off-by: Eric <[email protected]>
1 parent 5bbaa0d commit 40e6b5b

File tree

3 files changed

+29
-5
lines changed

3 files changed

+29
-5
lines changed

src/server/rdb_load.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1874,7 +1874,23 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
18741874
unsigned hash_cnt;
18751875
string filter_data;
18761876
SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
1877-
SET_OR_UNEXPECT(FetchGenericString(), filter_data);
1877+
1878+
unsigned total_size = 0;
1879+
SET_OR_UNEXPECT(LoadLen(nullptr), total_size);
1880+
1881+
filter_data.resize(total_size);
1882+
size_t offset = 0;
1883+
while (offset < total_size) {
1884+
unsigned chunk_size = 0;
1885+
SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size);
1886+
error_code ec = FetchBuf(chunk_size, filter_data.data() + offset);
1887+
if (ec) {
1888+
return make_unexpected(ec);
1889+
}
1890+
1891+
offset += chunk_size;
1892+
}
1893+
18781894
size_t bit_len = filter_data.size() * 8;
18791895
if (!is_power2(bit_len)) { // must be power of two
18801896
return Unexpected(errc::rdb_file_corrupted);

src/server/rdb_save.cc

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -623,11 +623,17 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
623623
RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i)));
624624

625625
string_view blob = sbf->data(i);
626-
RETURN_ON_ERR(SaveString(blob));
627-
FlushState flush_state = FlushState::kFlushMidEntry;
628-
if ((i + 1) == sbf->num_filters())
629-
flush_state = FlushState::kFlushEndEntry;
626+
size_t num_chunks = (blob.size() + kFilterChunkSize - 1) / kFilterChunkSize;
627+
RETURN_ON_ERR(SaveLen(blob.size()));
630628

629+
for (size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) {
630+
size_t offset = chunk_idx * kFilterChunkSize;
631+
size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset);
632+
RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len)));
633+
}
634+
635+
FlushState flush_state =
636+
(i + 1 == sbf->num_filters()) ? FlushState::kFlushEndEntry : FlushState::kFlushMidEntry;
631637
FlushIfNeeded(flush_state);
632638
}
633639

src/server/rdb_save.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ extern "C" {
2020
#include "server/journal/types.h"
2121
#include "server/table.h"
2222

23+
// Maximum size, in bytes, of a single chunk when a bloom filter blob is saved in pieces
// (1 << 26 bytes = 64 MiB). Declared `inline` so that this header-defined constant has
// one definition shared by every translation unit that includes the header.
inline constexpr size_t kFilterChunkSize = 1ULL << 26;
24+
2325
typedef struct rax rax;
2426
typedef struct streamCG streamCG;
2527
typedef struct quicklistNode quicklistNode;

0 commit comments

Comments
 (0)