diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index 823afea..47df730 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -12,6 +12,7 @@ add_executable(compio_benchmarks src/defragmentation_benchmark.cpp src/lookup_benchmark.cpp src/checksum_benchmark.cpp + src/scatter.cpp ) target_link_libraries(compio_benchmarks PRIVATE compio benchmark::benchmark) target_include_directories(compio_benchmarks PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include) diff --git a/benchmarks/src/scatter.cpp b/benchmarks/src/scatter.cpp new file mode 100644 index 0000000..da238f5 --- /dev/null +++ b/benchmarks/src/scatter.cpp @@ -0,0 +1,106 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "benchmark_util.hpp" + +#include "compio.h" + +#include "sample_data.hpp" + +constexpr size_t UNCOMPRESSED_SIZE = 1 << 18; +constexpr size_t READ_SIZE = 4096; +constexpr size_t OPS_PER_BENCH = 1000; + +static constexpr size_t HTML_DATA_SIZE = sizeof(html_data); + +extern compio_config config; + +static void BM_compio_ScatterWrite(benchmark::State &state) { + const size_t block_size = state.range(0); + config.block_size = block_size; + config.block_size__minimum = block_size / 4; + config.block_size__maximum = block_size * 4; + + std::vector payload(UNCOMPRESSED_SIZE); + for (size_t i = 0; i < UNCOMPRESSED_SIZE; i += HTML_DATA_SIZE) { + size_t copy = std::min(HTML_DATA_SIZE, UNCOMPRESSED_SIZE - i); + std::copy_n(html_data, copy, payload.data() + i); + } + + std::vector write_buffer(READ_SIZE); + for (size_t i = 0; i < READ_SIZE; ++i) { + write_buffer[i] = html_data[i % HTML_DATA_SIZE]; + } + + std::vector> read_ops; + std::mt19937 rng(42); + std::uniform_int_distribution pos_dist(0, UNCOMPRESSED_SIZE - READ_SIZE); + for (size_t i = 0; i < OPS_PER_BENCH; ++i) { + read_ops.emplace_back(pos_dist(rng), READ_SIZE); + } + + std::string fn = get_temporary_filename(); + + { + compio_archive *arch = compio_open_archive(fn.c_str(), "w+", &config); + if (!arch) + throw std::runtime_error("compio_open_archive failed"); + compio_file *f = compio_open_file("A", arch); + if (!f) { + compio_close_archive(arch); + throw std::runtime_error("compio_open_file failed"); + } + size_t written = compio_write(payload.data(), payload.size(), f); + if (written != payload.size()) { + compio_close_file(f); + compio_close_archive(arch); + throw std::runtime_error("compio_write failed"); + } + compio_close_file(f); + compio_close_archive(arch); + } + + size_t total_bytes = 0; + for (auto _ : state) { + compio_archive *arch = compio_open_archive(fn.c_str(), "r+", &config); // works with 'a' + if (!arch) { + state.SkipWithError("open archive failed"); + break; + } + compio_file *f = compio_open_file("A", arch); + if (!f) { + compio_close_archive(arch); + state.SkipWithError("open file failed"); + break; + } + + for (auto [pos, len] : read_ops) { + if (compio_seek(f, pos, COMPIO_SEEK_SET) != 0) { + state.SkipWithError("seek failed"); + break; + } + size_t n = compio_write(write_buffer.data(), len, f); + if (n != len) { + state.SkipWithError("write failed"); + break; + } + total_bytes += len; + compio_flush(arch); + } + + compio_close_file(f); + compio_close_archive(arch); + } + + state.SetBytesProcessed(total_bytes); + state.counters["file_size"] = get_file_size(fn.c_str()); + remove(fn.c_str()); + remove((fn + ".wal").c_str()); +} + +BENCHMARK(BM_compio_ScatterWrite)->Arg(1 << 8)->Unit(benchmark::kMillisecond)->UseRealTime(); \ No newline at end of file diff --git a/src/compio.cpp b/src/compio.cpp index 2e8d385..ea56107 100644 --- a/src/compio.cpp +++ b/src/compio.cpp @@ -212,33 +212,33 @@ compio_archive *compio_open_archive(const char *fp, const char *mode, const comp return NULL; } - // if w+ passed as mode, we have to clear file contents (using w+) - // otherwise we open with a+ mode to read and write - char archive_open_mode[5]; - int mode_idx = 0; - - // Fix: Force update (+) mode for 'w' and 'a' to allow internal reads (e.g. reading header in 'a' mode) - // This matches previous behavior where 'w'/'a' implied 'w+'/'a+' capability for the library. - bool force_plus = (mode_b & mode_bit::w) || (mode_b & mode_bit::a); - + // The library manages file positions itself via fseek+fread/fwrite, so the + // underlying FILE* must NOT have POSIX O_APPEND semantics (which would force + // every write to EOF regardless of fseek). That rules out fopen("a"/"a+"). + // + // Mapping to fopen modes: + // 'r' -> "rb" (read-only) + // 'r+' -> "rb+" (read/write, file must exist) + // 'w'/'w+' -> "wb+" (truncate or create, read/write) + // 'a'/'a+' -> "rb+" (read/write, no truncate); create-if-missing + // fallback to "wb+". Append semantics are enforced at the + // logical level by compio_open_file (cursor = file->size). + const char *archive_open_mode; if (mode_b & mode_bit::w) { - archive_open_mode[mode_idx++] = 'w'; + archive_open_mode = "wb+"; } else if (mode_b & mode_bit::a) { - archive_open_mode[mode_idx++] = 'a'; + archive_open_mode = "rb+"; + } else if (mode_b & mode_bit::plus) { + archive_open_mode = "rb+"; } else { - archive_open_mode[mode_idx++] = 'r'; - } - - archive_open_mode[mode_idx++] = 'b'; - - if ((mode_b & mode_bit::plus) || force_plus) { - archive_open_mode[mode_idx++] = '+'; + archive_open_mode = "rb"; } - - archive_open_mode[mode_idx] = '\0'; - FILE *file; - file = fopen(fp, archive_open_mode); + FILE *file = fopen(fp, archive_open_mode); + if (file == nullptr && (mode_b & mode_bit::a)) { + // 'a'/'a+' must create the file if it does not exist. + file = fopen(fp, "wb+"); + } if (file == nullptr) { return NULL; } @@ -258,10 +258,10 @@ compio_archive *compio_open_archive(const char *fp, const char *mode, const comp // Check for recovery (only if we are not creating a new file from scratch with "w") if (!(mode_b & mode_bit::w)) { if (wal->has_pending_recovery()) { - // Need read-write access to file for recovery - // We use the derived force_plus logic or explicit plus - bool can_write = (mode_b & mode_bit::plus) || (mode_b & mode_bit::a) || force_plus; - + // Recovery needs read-write access to the archive file. 'r' alone is + // read-only; everything else (r+, a, a+, w, w+) can write. + bool can_write = !((mode_b & mode_bit::r) && !(mode_b & mode_bit::plus)); + if (!can_write) { WARNING_PRINT("error: WAL file exists but opening in read-only mode. Cannot recover pending transactions.\n"); fclose(file); @@ -419,8 +419,16 @@ compio_archive *compio_open_archive(const char *fp, const char *mode, const comp goto no_allocator; } - archive->index = new btree(c->b_tree_degree, mode_b & mode_bit::r, archive->header, + // is_readonly=true only for pure 'r' (no '+', 'w', or 'a' bits). + // 'r+' sets both r and plus bits → it must be read-write so btree nodes are persisted. + { + bool btree_readonly = (mode_b & mode_bit::r) && + !(mode_b & mode_bit::plus) && + !(mode_b & mode_bit::w) && + !(mode_b & mode_bit::a); + archive->index = new btree(c->b_tree_degree, btree_readonly, archive->header, archive->allocator, file, c->cache_size__nodes, &archive->io_mutex, archive->wal.get()); + } if (!archive->index) { WARNING_PRINT("warning: failed to allocate memory for btree\n"); goto no_index; diff --git a/src/storage_block_reader.cpp b/src/storage_block_reader.cpp index 9927f8a..72e394f 100644 --- a/src/storage_block_reader.cpp +++ b/src/storage_block_reader.cpp @@ -118,7 +118,10 @@ block::~block() { // } // though it would be better to pass this logic to allocator, and allocate memory again - context.allocator->deallocate(_addr, _c_size + STORAGE_BLOCK_METASIZE); + // Disable maintenance during flush: defrag reads the btree index, but the index + // is only partially updated while clear_cache() destructors are still running. + // Running defrag mid-flush would see stale block addresses and corrupt the archive. + context.allocator->deallocate(_addr, _c_size + STORAGE_BLOCK_METASIZE, false); } uint64_t new_addr = context.allocator->allocate(STORAGE_BLOCK_METASIZE + b.size); @@ -317,9 +320,9 @@ std::shared_ptr storage_block_reader::create_block(uint64_t size, tree_ke void storage_block_reader::clear_cache() { DEBUG_PRINT("[SBR][clear_cache]\n"); - + auto blocks = cache.extract_all(); - + // Sort blocks by address to optimize reallocation during flush. // We prioritize existing blocks (addr != 0) over new blocks (addr == 0). // Existing blocks free their old space first, creating holes.