Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ add_executable(compio_benchmarks
src/defragmentation_benchmark.cpp
src/lookup_benchmark.cpp
src/checksum_benchmark.cpp
src/scatter.cpp
)
target_link_libraries(compio_benchmarks PRIVATE compio benchmark::benchmark)
target_include_directories(compio_benchmarks PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include)
Expand Down
106 changes: 106 additions & 0 deletions benchmarks/src/scatter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include <algorithm>
#include <benchmark/benchmark.h>
#include <cstdio>
#include <exception>
#include <random>
#include <sys/stat.h>
#include <vector>

#include "benchmark_util.hpp"

#include "compio.h"

#include "sample_data.hpp"

constexpr size_t UNCOMPRESSED_SIZE = 1 << 18;
constexpr size_t READ_SIZE = 4096;
constexpr size_t OPS_PER_BENCH = 1000;

static constexpr size_t HTML_DATA_SIZE = sizeof(html_data);

extern compio_config config;

static void BM_compio_ScatterWrite(benchmark::State &state) {
const size_t block_size = state.range(0);
config.block_size = block_size;
config.block_size__minimum = block_size / 4;
config.block_size__maximum = block_size * 4;

std::vector<char> payload(UNCOMPRESSED_SIZE);
for (size_t i = 0; i < UNCOMPRESSED_SIZE; i += HTML_DATA_SIZE) {
size_t copy = std::min(HTML_DATA_SIZE, UNCOMPRESSED_SIZE - i);
std::copy_n(html_data, copy, payload.data() + i);
}

std::vector<char> write_buffer(READ_SIZE);
for (size_t i = 0; i < READ_SIZE; ++i) {
write_buffer[i] = html_data[i % HTML_DATA_SIZE];
}

std::vector<std::pair<size_t, size_t>> read_ops;
std::mt19937 rng(42);
std::uniform_int_distribution<size_t> pos_dist(0, UNCOMPRESSED_SIZE - READ_SIZE);
for (size_t i = 0; i < OPS_PER_BENCH; ++i) {
read_ops.emplace_back(pos_dist(rng), READ_SIZE);
}

std::string fn = get_temporary_filename();

{
compio_archive *arch = compio_open_archive(fn.c_str(), "w+", &config);
if (!arch)
throw std::runtime_error("compio_open_archive failed");
compio_file *f = compio_open_file("A", arch);
if (!f) {
compio_close_archive(arch);
throw std::runtime_error("compio_open_file failed");
}
size_t written = compio_write(payload.data(), payload.size(), f);
if (written != payload.size()) {
compio_close_file(f);
compio_close_archive(arch);
throw std::runtime_error("compio_write failed");
}
compio_close_file(f);
compio_close_archive(arch);
}

size_t total_bytes = 0;
for (auto _ : state) {
compio_archive *arch = compio_open_archive(fn.c_str(), "r+", &config); // works with 'a'
if (!arch) {
state.SkipWithError("open archive failed");
break;
}
compio_file *f = compio_open_file("A", arch);
if (!f) {
compio_close_archive(arch);
state.SkipWithError("open file failed");
break;
}

for (auto [pos, len] : read_ops) {
if (compio_seek(f, pos, COMPIO_SEEK_SET) != 0) {
state.SkipWithError("seek failed");
break;
}
size_t n = compio_write(write_buffer.data(), len, f);
if (n != len) {
state.SkipWithError("write failed");
break;
}
total_bytes += len;
compio_flush(arch);
}

compio_close_file(f);
compio_close_archive(arch);
}

state.SetBytesProcessed(total_bytes);
state.counters["file_size"] = get_file_size(fn.c_str());
remove(fn.c_str());
remove((fn + ".wal").c_str());
}

BENCHMARK(BM_compio_ScatterWrite)->Arg(1 << 8)->Unit(benchmark::kMillisecond)->UseRealTime();
62 changes: 35 additions & 27 deletions src/compio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,33 +212,33 @@ compio_archive *compio_open_archive(const char *fp, const char *mode, const comp
return NULL;
}

// if w+ passed as mode, we have to clear file contents (using w+)
// otherwise we open with a+ mode to read and write
char archive_open_mode[5];
int mode_idx = 0;

// Fix: Force update (+) mode for 'w' and 'a' to allow internal reads (e.g. reading header in 'a' mode)
// This matches previous behavior where 'w'/'a' implied 'w+'/'a+' capability for the library.
bool force_plus = (mode_b & mode_bit::w) || (mode_b & mode_bit::a);

// The library manages file positions itself via fseek+fread/fwrite, so the
// underlying FILE* must NOT have POSIX O_APPEND semantics (which would force
// every write to EOF regardless of fseek). That rules out fopen("a"/"a+").
//
// Mapping to fopen modes:
// 'r' -> "rb" (read-only)
// 'r+' -> "rb+" (read/write, file must exist)
// 'w'/'w+' -> "wb+" (truncate or create, read/write)
// 'a'/'a+' -> "rb+" (read/write, no truncate); create-if-missing
// fallback to "wb+". Append semantics are enforced at the
// logical level by compio_open_file (cursor = file->size).
const char *archive_open_mode;
if (mode_b & mode_bit::w) {
archive_open_mode[mode_idx++] = 'w';
archive_open_mode = "wb+";
} else if (mode_b & mode_bit::a) {
archive_open_mode[mode_idx++] = 'a';
archive_open_mode = "rb+";
} else if (mode_b & mode_bit::plus) {
archive_open_mode = "rb+";
} else {
archive_open_mode[mode_idx++] = 'r';
}

archive_open_mode[mode_idx++] = 'b';

if ((mode_b & mode_bit::plus) || force_plus) {
archive_open_mode[mode_idx++] = '+';
archive_open_mode = "rb";
}

archive_open_mode[mode_idx] = '\0';

FILE *file;
file = fopen(fp, archive_open_mode);
FILE *file = fopen(fp, archive_open_mode);
if (file == nullptr && (mode_b & mode_bit::a)) {
// 'a'/'a+' must create the file if it does not exist.
file = fopen(fp, "wb+");
}
if (file == nullptr) {
return NULL;
}
Expand All @@ -258,10 +258,10 @@ compio_archive *compio_open_archive(const char *fp, const char *mode, const comp
// Check for recovery (only if we are not creating a new file from scratch with "w")
if (!(mode_b & mode_bit::w)) {
if (wal->has_pending_recovery()) {
// Need read-write access to file for recovery
// We use the derived force_plus logic or explicit plus
bool can_write = (mode_b & mode_bit::plus) || (mode_b & mode_bit::a) || force_plus;
// Recovery needs read-write access to the archive file. 'r' alone is
// read-only; everything else (r+, a, a+, w, w+) can write.
bool can_write = !((mode_b & mode_bit::r) && !(mode_b & mode_bit::plus));

if (!can_write) {
WARNING_PRINT("error: WAL file exists but opening in read-only mode. Cannot recover pending transactions.\n");
fclose(file);
Expand Down Expand Up @@ -419,8 +419,16 @@ compio_archive *compio_open_archive(const char *fp, const char *mode, const comp
goto no_allocator;
}

archive->index = new btree(c->b_tree_degree, mode_b & mode_bit::r, archive->header,
// is_readonly=true only for pure 'r' (no '+', 'w', or 'a' bits).
// 'r+' sets both r and plus bits → it must be read-write so btree nodes are persisted.
{
bool btree_readonly = (mode_b & mode_bit::r) &&
!(mode_b & mode_bit::plus) &&
!(mode_b & mode_bit::w) &&
!(mode_b & mode_bit::a);
archive->index = new btree(c->b_tree_degree, btree_readonly, archive->header,
archive->allocator, file, c->cache_size__nodes, &archive->io_mutex, archive->wal.get());
}
if (!archive->index) {
WARNING_PRINT("warning: failed to allocate memory for btree\n");
goto no_index;
Expand Down
9 changes: 6 additions & 3 deletions src/storage_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ block::~block() {
// }

// though it would be better to pass this logic to allocator, and allocate memory again
context.allocator->deallocate(_addr, _c_size + STORAGE_BLOCK_METASIZE);
// Disable maintenance during flush: defrag reads the btree index, but the index
// is only partially updated while clear_cache() destructors are still running.
// Running defrag mid-flush would see stale block addresses and corrupt the archive.
context.allocator->deallocate(_addr, _c_size + STORAGE_BLOCK_METASIZE, false);
}

uint64_t new_addr = context.allocator->allocate(STORAGE_BLOCK_METASIZE + b.size);
Expand Down Expand Up @@ -317,9 +320,9 @@ std::shared_ptr<block> storage_block_reader::create_block(uint64_t size, tree_ke

void storage_block_reader::clear_cache() {
DEBUG_PRINT("[SBR][clear_cache]\n");

auto blocks = cache.extract_all();

// Sort blocks by address to optimize reallocation during flush.
// We prioritize existing blocks (addr != 0) over new blocks (addr == 0).
// Existing blocks free their old space first, creating holes.
Expand Down
Loading