dl/translator: preparatory change for translator porting
bharathv committed Feb 12, 2025
1 parent 90c713a commit 1c70c57
Showing 27 changed files with 468 additions and 167 deletions.
27 changes: 25 additions & 2 deletions src/v/datalake/data_writer_interface.h
@@ -42,6 +42,23 @@ inline std::error_code make_error_code(writer_error e) noexcept {
return {static_cast<int>(e), data_writer_error_category::error_category()};
}

class writer_mem_tracker {
public:
writer_mem_tracker() = default;
writer_mem_tracker(const writer_mem_tracker&) = delete;
writer_mem_tracker(writer_mem_tracker&&) = default;
writer_mem_tracker& operator=(const writer_mem_tracker&) = delete;
writer_mem_tracker& operator=(writer_mem_tracker&&) = delete;

virtual ~writer_mem_tracker() = default;

virtual ss::future<> maybe_reserve_memory(size_t bytes) = 0;

virtual void update_current_memory_usage(size_t) = 0;

virtual void release() = 0;
};

/**
* Parquet writer interface. The writer should write parquet serialized data to
* the output stream provided during its creation.
@@ -58,6 +75,10 @@ class parquet_ostream {
virtual ss::future<writer_error>
add_data_struct(iceberg::struct_value, size_t) = 0;

virtual size_t buffered_bytes() const = 0;
virtual size_t flushed_bytes() const = 0;

virtual ss::future<> flush() = 0;
virtual ss::future<writer_error> finish() = 0;
};

@@ -72,8 +93,9 @@ class parquet_ostream_factory {

virtual ~parquet_ostream_factory() = default;

virtual ss::future<std::unique_ptr<parquet_ostream>>
create_writer(const iceberg::struct_type&, ss::output_stream<char>) = 0;
virtual ss::future<std::unique_ptr<parquet_ostream>> create_writer(
const iceberg::struct_type&, ss::output_stream<char>, writer_mem_tracker&)
= 0;
};

/**
@@ -96,6 +118,7 @@ class parquet_file_writer {
iceberg::struct_value /* data */, int64_t /* approx_size */)
= 0;

virtual ss::future<> flush() = 0;
virtual ss::future<result<local_file_metadata, writer_error>> finish() = 0;
};

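The writer_mem_tracker interface added above lets a parquet writer reserve memory before buffering more row data, and report or release usage as data is flushed. A minimal sketch of one possible implementation backed by a Seastar semaphore follows; the class name semaphore_mem_tracker, the fixed byte budget, and the reservation bookkeeping are illustrative assumptions, not part of this commit.

#include "datalake/data_writer_interface.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/semaphore.hh>

// Hypothetical tracker: writers reserve from a fixed byte budget before
// buffering more data and hand the budget back as row groups are flushed.
class semaphore_mem_tracker final : public datalake::writer_mem_tracker {
public:
    explicit semaphore_mem_tracker(size_t limit_bytes)
      : _sem(limit_bytes) {}

    ss::future<> maybe_reserve_memory(size_t bytes) override {
        // Suspend the writer until `bytes` of budget becomes available.
        co_await _sem.wait(bytes);
        _reserved += bytes;
    }

    void update_current_memory_usage(size_t buffered_bytes) override {
        // Return any reservation beyond what the writer actually buffers.
        if (_reserved > buffered_bytes) {
            _sem.signal(_reserved - buffered_bytes);
            _reserved = buffered_bytes;
        }
    }

    void release() override {
        _sem.signal(_reserved);
        _reserved = 0;
    }

private:
    ss::semaphore _sem;
    size_t _reserved{0};
};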
1 change: 1 addition & 0 deletions src/v/datalake/fwd.h
@@ -12,6 +12,7 @@

namespace datalake {
struct data_writer_result;
class record_multiplexer;
class record_translator;
class schema_manager;
class type_resolver;
25 changes: 18 additions & 7 deletions src/v/datalake/local_parquet_file_writer.cc
@@ -21,13 +21,15 @@ namespace datalake {

local_parquet_file_writer::local_parquet_file_writer(
local_path output_file_path,
ss::shared_ptr<parquet_ostream_factory> writer_factory)
ss::shared_ptr<parquet_ostream_factory> writer_factory,
writer_mem_tracker& reservations)
: _output_file_path(std::move(output_file_path))
, _writer_factory(std::move(writer_factory)) {}
, _writer_factory(std::move(writer_factory))
, _mem_tracker(reservations) {}

ss::future<checked<std::nullopt_t, writer_error>>
local_parquet_file_writer::initialize(const iceberg::struct_type& schema) {
vlog(datalake_log.info, "Writing Parquet file to {}", _output_file_path);
vlog(datalake_log.debug, "Writing Parquet file to {}", _output_file_path);
try {
_output_file = co_await ss::open_file_dma(
_output_file_path().string(),
@@ -56,7 +58,7 @@ local_parquet_file_writer::initialize(const iceberg::struct_type& schema) {
}

_writer = co_await _writer_factory->create_writer(
schema, std::move(fut.get()));
schema, std::move(fut.get()), _mem_tracker);
_initialized = true;
co_return std::nullopt;
}
@@ -83,6 +85,13 @@ ss::future<writer_error> local_parquet_file_writer::add_data_struct(
co_return writer_error::ok;
}

ss::future<> local_parquet_file_writer::flush() {
if (!_initialized) {
return ss::make_ready_future();
}
return _writer->flush();
}

ss::future<result<local_file_metadata, writer_error>>
local_parquet_file_writer::finish() {
if (!_initialized) {
@@ -133,16 +142,18 @@ local_path local_parquet_file_writer_factory::create_filename() const {
local_parquet_file_writer_factory::local_parquet_file_writer_factory(
local_path base_directory,
ss::sstring file_name_prefix,
ss::shared_ptr<parquet_ostream_factory> writer_factory)
ss::shared_ptr<parquet_ostream_factory> writer_factory,
std::unique_ptr<writer_mem_tracker> mem_tracker)
: _base_directory(std::move(base_directory))
, _file_name_prefix(std::move(file_name_prefix))
, _writer_factory(std::move(writer_factory)) {}
, _writer_factory(std::move(writer_factory))
, _mem_tracker(std::move(mem_tracker)) {}

ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
local_parquet_file_writer_factory::create_writer(
const iceberg::struct_type& schema) {
auto writer = std::make_unique<local_parquet_file_writer>(
create_filename(), _writer_factory);
create_filename(), _writer_factory, *_mem_tracker);

auto res = co_await writer->initialize(schema);
if (res.has_error()) {
9 changes: 7 additions & 2 deletions src/v/datalake/local_parquet_file_writer.h
@@ -24,14 +24,16 @@ namespace datalake {
class local_parquet_file_writer : public parquet_file_writer {
public:
local_parquet_file_writer(
local_path, ss::shared_ptr<parquet_ostream_factory>);
local_path, ss::shared_ptr<parquet_ostream_factory>, writer_mem_tracker&);

ss::future<checked<std::nullopt_t, writer_error>>
initialize(const iceberg::struct_type&);

ss::future<writer_error> add_data_struct(
iceberg::struct_value /* data */, int64_t /* approx_size */) final;

ss::future<> flush() final;

ss::future<result<local_file_metadata, writer_error>> finish() final;

private:
@@ -44,6 +46,7 @@ class local_parquet_file_writer : public parquet_file_writer {

std::unique_ptr<parquet_ostream> _writer;
ss::shared_ptr<parquet_ostream_factory> _writer_factory;
writer_mem_tracker& _mem_tracker;
bool _initialized{false};
};

@@ -52,7 +55,8 @@ class local_parquet_file_writer_factory : public parquet_file_writer_factory {
local_parquet_file_writer_factory(
local_path base_directory,
ss::sstring file_name_prefix,
ss::shared_ptr<parquet_ostream_factory>);
ss::shared_ptr<parquet_ostream_factory>,
std::unique_ptr<writer_mem_tracker> reservations);

ss::future<result<std::unique_ptr<parquet_file_writer>, writer_error>>
create_writer(const iceberg::struct_type& schema) final;
@@ -63,6 +67,7 @@ class local_parquet_file_writer_factory : public parquet_file_writer_factory {
local_path _base_directory;
ss::sstring _file_name_prefix;
ss::shared_ptr<parquet_ostream_factory> _writer_factory;
std::unique_ptr<writer_mem_tracker> _mem_tracker;
};

} // namespace datalake
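Since the factory now owns the writer_mem_tracker and threads it into every writer it creates, wiring things together might look roughly like the sketch below. The semaphore_mem_tracker type refers to the illustrative sketch earlier; the directory, file prefix, 64 MiB budget, header names, and default-constructed serde_parquet_writer_factory are placeholder assumptions rather than values taken from this commit.

#include "datalake/local_parquet_file_writer.h"
#include "datalake/serde_parquet_writer.h"

#include <seastar/core/coroutine.hh>

#include <filesystem>
#include <memory>

// Hypothetical wiring of the factory with a memory tracker; all concrete
// values below are placeholders.
ss::future<> write_one_file(const iceberg::struct_type& schema) {
    auto factory = std::make_unique<datalake::local_parquet_file_writer_factory>(
      datalake::local_path{std::filesystem::path{"/tmp/datalake"}},
      "translated",
      ss::make_shared<datalake::serde_parquet_writer_factory>(),
      std::make_unique<semaphore_mem_tracker>(64 * 1024 * 1024));

    auto writer_res = co_await factory->create_writer(schema);
    if (writer_res.has_error()) {
        co_return;
    }
    auto writer = std::move(writer_res.value());
    // ... feed rows via writer->add_data_struct(...), periodically flush() ...
    auto file_res = co_await writer->finish();
    (void)file_res; // local_file_metadata on success, writer_error otherwise
}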
60 changes: 42 additions & 18 deletions src/v/datalake/record_multiplexer.cc
@@ -26,6 +26,27 @@

namespace datalake {

namespace {

template<typename Func>
requires requires(Func f, model::record_batch batch) {
{ f(std::move(batch)) } -> std::same_as<ss::future<ss::stop_iteration>>;
}
class relaying_consumer {
public:
explicit relaying_consumer(Func f)
: _func(std::move(f)) {}

ss::future<ss::stop_iteration> operator()(model::record_batch b) {
return _func(std::move(b));
}
void end_of_stream() {}

private:
Func _func;
};
} // namespace

record_multiplexer::record_multiplexer(
const model::ntp& ntp,
model::revision_id topic_revision,
@@ -35,8 +56,7 @@ record_multiplexer::record_multiplexer(
record_translator& record_translator,
table_creator& table_creator,
model::iceberg_invalid_record_action invalid_record_action,
location_provider location_provider,
lazy_abort_source& as)
location_provider location_provider)
: _log(datalake_log, fmt::format("{}", ntp))
, _ntp(ntp)
, _topic_revision(topic_revision)
@@ -46,16 +66,23 @@ record_multiplexer::record_multiplexer(
, _record_translator(record_translator)
, _table_creator(table_creator)
, _invalid_record_action(invalid_record_action)
, _location_provider(std::move(location_provider))
, _as(as) {}

ss::future<ss::stop_iteration>
record_multiplexer::operator()(model::record_batch batch) {
if (_as.abort_requested()) {
vlog(
_log.debug,
"Abort requested, stopping translation, reason: {}",
_as.abort_reason());
, _location_provider(std::move(location_provider)) {}

ss::future<> record_multiplexer::multiplex(
model::record_batch_reader reader,
model::timeout_clock::time_point deadline,
ss::abort_source& as) {
co_await std::move(reader).consume(
relaying_consumer{[this, &as](model::record_batch b) mutable {
return do_multiplex(std::move(b), as);
}},
deadline);
}

ss::future<ss::stop_iteration> record_multiplexer::do_multiplex(
model::record_batch batch, ss::abort_source& as) {
if (as.abort_requested()) {
vlog(_log.debug, "Abort requested, stopping translation");
co_return ss::stop_iteration::yes;
}
if (batch.compressed()) {
@@ -66,11 +93,8 @@ record_multiplexer::operator()(model::record_batch batch) {
auto it = model::record_batch_iterator::create(batch);

while (it.has_next()) {
if (_as.abort_requested()) {
vlog(
_log.debug,
"Abort requested, stopping translation, reason: {}",
_as.abort_reason());
if (as.abort_requested()) {
vlog(_log.debug, "Abort requested, stopping translation");
co_return ss::stop_iteration::yes;
}
auto record = it.next();
@@ -260,7 +284,7 @@
}

ss::future<result<record_multiplexer::write_result, writer_error>>
record_multiplexer::end_of_stream() {
record_multiplexer::finish() && {
if (_error) {
co_return *_error;
}
17 changes: 11 additions & 6 deletions src/v/datalake/record_multiplexer.h
@@ -16,7 +16,7 @@
#include "datalake/partitioning_writer.h"
#include "datalake/schema_identifier.h"
#include "model/record.h"
#include "utils/lazy_abort_source.h"
#include "model/record_batch_reader.h"
#include "utils/prefix_logger.h"

#include <seastar/core/future.hh>
@@ -53,11 +53,17 @@ class record_multiplexer {
record_translator& record_translator,
table_creator&,
model::iceberg_invalid_record_action,
location_provider,
lazy_abort_source& as);
location_provider);

ss::future<ss::stop_iteration> operator()(model::record_batch batch);
ss::future<result<write_result, writer_error>> end_of_stream();
ss::future<> multiplex(
model::record_batch_reader reader,
model::timeout_clock::time_point deadline,
ss::abort_source& as);

ss::future<ss::stop_iteration>
do_multiplex(model::record_batch batch, ss::abort_source&);

ss::future<result<write_result, writer_error>> finish() &&;

private:
// Handles the given record components of a record that is invalid for the
@@ -80,7 +86,6 @@
table_creator& _table_creator;
model::iceberg_invalid_record_action _invalid_record_action;
location_provider _location_provider;
lazy_abort_source& _as;
chunked_hash_map<
record_schema_components,
std::unique_ptr<partitioning_writer>>
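With this change the multiplexer is no longer driven as a record_batch_reader consumer guarded by an external lazy_abort_source; the caller hands the reader, a deadline, and an ss::abort_source to multiplex() and then collects the accumulated results from the rvalue-qualified finish(). A rough caller-side sketch follows; the wrapper function, the 5s deadline, and the error handling are illustrative assumptions, not code from this commit.

#include "datalake/record_multiplexer.h"
#include "model/record_batch_reader.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>

// Hypothetical caller of the new multiplex()/finish() API.
ss::future<result<datalake::record_multiplexer::write_result, datalake::writer_error>>
translate_reader(
  datalake::record_multiplexer& mux,
  model::record_batch_reader reader,
  ss::abort_source& as) {
    using namespace std::chrono_literals;
    auto deadline = model::timeout_clock::now() + 5s;
    // Feeds every batch through do_multiplex(), stopping early if the abort
    // source fires.
    co_await mux.multiplex(std::move(reader), deadline, as);
    // finish() is rvalue-qualified: the multiplexer is consumed once the
    // accumulated per-partition write results are returned.
    co_return co_await std::move(mux).finish();
}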
34 changes: 29 additions & 5 deletions src/v/datalake/serde_parquet_writer.cc
@@ -8,8 +8,8 @@

namespace datalake {

ss::future<writer_error>
serde_parquet_writer::add_data_struct(iceberg::struct_value value, size_t) {
ss::future<writer_error> serde_parquet_writer::add_data_struct(
iceberg::struct_value value, size_t approx_size) {
auto conversion_result = co_await to_parquet_value(
std::make_unique<iceberg::struct_value>(std::move(value)));
if (conversion_result.has_error()) {
@@ -19,7 +19,11 @@ serde_parquet_writer::add_data_struct(iceberg::struct_value value, size_t) {
auto group = std::get<serde::parquet::group_value>(
std::move(conversion_result.value()));
try {
co_await _writer.write_row(std::move(group));
co_await _mem_tracker.maybe_reserve_memory(approx_size);
auto stats = co_await _writer.write_row(std::move(group));
_buffered_bytes = stats.buffered_size;
_flushed_bytes = stats.flushed_size;
_mem_tracker.update_current_memory_usage(_buffered_bytes);
} catch (...) {
vlog(
datalake_log.warn,
@@ -30,14 +34,33 @@ serde_parquet_writer::add_data_struct(iceberg::struct_value value, size_t) {
co_return writer_error::ok;
}

size_t serde_parquet_writer::buffered_bytes() const { return _buffered_bytes; }
size_t serde_parquet_writer::flushed_bytes() const { return _flushed_bytes; }

ss::future<> serde_parquet_writer::flush() {
co_await _writer.flush_row_group();
auto stats = _writer.stats();
_buffered_bytes = stats.buffered_size;
_flushed_bytes = stats.flushed_size;
vassert(
_buffered_bytes == 0,
"Memory buffered in the writer after flush: {}",
_buffered_bytes);
_mem_tracker.release();
}

ss::future<writer_error> serde_parquet_writer::finish() {
co_await _writer.close();
_mem_tracker.release();
_buffered_bytes = _flushed_bytes = 0;
co_return writer_error::ok;
}

ss::future<std::unique_ptr<parquet_ostream>>
serde_parquet_writer_factory::create_writer(
const iceberg::struct_type& schema, ss::output_stream<char> out) {
const iceberg::struct_type& schema,
ss::output_stream<char> out,
writer_mem_tracker& mem_tracker) {
serde::parquet::writer::options opts{
.schema = schema_to_parquet(schema),
.version = ss::sstring(redpanda_git_version()),
@@ -46,7 +69,8 @@ serde_parquet_writer_factory::create_writer(
};
serde::parquet::writer writer(std::move(opts), std::move(out));
co_await writer.init();
co_return std::make_unique<serde_parquet_writer>(std::move(writer));
co_return std::make_unique<serde_parquet_writer>(
std::move(writer), mem_tracker);
}

} // namespace datalake
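The new buffered_bytes()/flushed_bytes() accessors and flush() let a caller bound how much row-group data a serde_parquet_writer accumulates before forcing it out. A minimal usage sketch under that assumption is below; the 16 MiB threshold, the approx_size constant, and the surrounding loop are illustrative and not part of this commit.

#include "datalake/data_writer_interface.h"

#include <seastar/core/coroutine.hh>

#include <vector>

// Hypothetical caller that flushes the current row group whenever buffered
// data crosses a threshold; threshold and approx_size are placeholders.
ss::future<datalake::writer_error> write_rows(
  datalake::parquet_ostream& out, std::vector<iceberg::struct_value> rows) {
    constexpr size_t max_buffered = 16 * 1024 * 1024;
    for (auto& row : rows) {
        auto err = co_await out.add_data_struct(std::move(row), /*approx_size=*/512);
        if (err != datalake::writer_error::ok) {
            co_return err;
        }
        if (out.buffered_bytes() > max_buffered) {
            // Closes the current row group and releases the tracked memory.
            co_await out.flush();
        }
    }
    co_return co_await out.finish();
}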