@han-yan01

Summary:
Add trace replay support for the BroadcastWrite operator to presto_cpp. Instead of using ws as intermediate storage, allow a user-defined path so the sink data can be inspected.

Release Notes

== NO RELEASE NOTE ==

Differential Revision: D87790828

@han-yan01 han-yan01 requested review from a team as code owners November 25, 2025 04:08
@prestodb-ci prestodb-ci added the from:Meta PR from Meta label Nov 25, 2025
@sourcery-ai

sourcery-ai bot commented Nov 25, 2025

Reviewer's Guide

Adds trace replay support for the BroadcastWrite operator in presto_cpp by introducing a dedicated BroadcastWriteReplayer, wiring it into the trace runner and server trace-node factory, and providing comprehensive tests (including multi-driver scenarios and configurable output directory).

Sequence diagram for BroadcastWrite trace replay with configurable output directory

sequenceDiagram
  actor Developer
  participant TraceReplayerMain
  participant PrestoTraceReplayRunner
  participant BroadcastWriteReplayer
  participant BroadcastWriteNode
  participant FileSystem

  Developer->>TraceReplayerMain: Start trace_replayer with node_name=BroadcastWrite and broadcast_write_output_dir
  TraceReplayerMain->>PrestoTraceReplayRunner: create Runner with FLAGS including broadcast_write_output_dir
  Developer->>PrestoTraceReplayRunner: run()
  PrestoTraceReplayRunner->>PrestoTraceReplayRunner: inspect nodeName
  PrestoTraceReplayRunner->>PrestoTraceReplayRunner: VELOX_USER_CHECK broadcast_write_output_dir not empty
  PrestoTraceReplayRunner->>BroadcastWriteReplayer: construct BroadcastWriteReplayer(traceDir, queryId, taskId, nodeId, nodeName, driverIds, queryCapacity, executor, replayOutputDir)

  PrestoTraceReplayRunner->>BroadcastWriteReplayer: createPlanNode(node, nodeId, source)
  BroadcastWriteReplayer->>BroadcastWriteReplayer: dynamic_cast to facebook::presto::operators::BroadcastWriteNode
  BroadcastWriteReplayer->>BroadcastWriteNode: construct BroadcastWriteNode(nodeId, replayOutputDir, maxBroadcastBytes, serdeRowType, source)

  PrestoTraceReplayRunner->>BroadcastWriteNode: execute replayed plan
  BroadcastWriteNode->>FileSystem: write broadcast data under replayOutputDir
  FileSystem-->>BroadcastWriteNode: data persisted for inspection
  BroadcastWriteNode-->>Developer: replayed broadcast data available in output directory
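
The dispatch step in the diagram above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code from the PR: `ReplayerBase`, `BroadcastWriteReplayerSketch`, and `makeReplayerSketch` are hypothetical stand-ins, and a plain exception takes the place of `VELOX_USER_CHECK` so the snippet compiles without Velox or gflags.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for the replayer base class.
struct ReplayerBase {
  virtual ~ReplayerBase() = default;
  virtual std::string outputDir() const { return ""; }
};

// Hypothetical stand-in for BroadcastWriteReplayer: it only remembers where
// replayed broadcast data should be written.
class BroadcastWriteReplayerSketch : public ReplayerBase {
 public:
  explicit BroadcastWriteReplayerSketch(std::string replayOutputDir)
      : replayOutputDir_(std::move(replayOutputDir)) {
    // The caller is expected to have rejected an empty directory already.
    assert(!replayOutputDir_.empty());
  }

  std::string outputDir() const override { return replayOutputDir_; }

 private:
  std::string replayOutputDir_;
};

// Inspect the node name, require the output directory for BroadcastWrite,
// then construct the dedicated replayer. Returns nullptr for node types this
// sketch does not handle.
std::unique_ptr<ReplayerBase> makeReplayerSketch(
    const std::string& nodeName,
    const std::string& broadcastWriteOutputDir) {
  if (nodeName == "BroadcastWrite") {
    if (broadcastWriteOutputDir.empty()) {
      throw std::invalid_argument(
          "broadcast_write_output_dir must be set to replay BroadcastWrite");
    }
    return std::make_unique<BroadcastWriteReplayerSketch>(
        broadcastWriteOutputDir);
  }
  return nullptr;
}
```

Failing before the replayer is constructed keeps the error tied to the missing flag rather than to a later write failure.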

Updated class diagram for BroadcastWriteReplayer and related trace replay classes

classDiagram
  namespace facebook_velox_tool_trace {
    class OperatorReplayerBase {
      <<abstract>>
      +OperatorReplayerBase(string traceDir, string queryId, string taskId, string nodeId, string nodeName, string extraInfo, string driverIds, uint64_t queryCapacity, folly_Executor* executor)
      +core_PlanNodePtr createPlan()*
      +void run()
      #core_PlanNodePtr createPlanNode(const core_PlanNode* node, const core_PlanNodeId& nodeId, const core_PlanNodePtr& source) *
    }

    class BroadcastWriteReplayer {
      +BroadcastWriteReplayer(string traceDir, string queryId, string taskId, string nodeId, string nodeName, string driverIds, uint64_t queryCapacity, folly_Executor* executor, string replayOutputDir)
      -core_PlanNodePtr createPlanNode(const core_PlanNode* node, const core_PlanNodeId& nodeId, const core_PlanNodePtr& source) const
      -string replayOutputDir_
    }
  }

  namespace facebook_presto_operators {
    class BroadcastWriteNode {
      +BroadcastWriteNode(core_PlanNodeId id, string basePath, uint64_t maxBroadcastBytes, TypePtr serdeRowType, core_PlanNodePtr source)
      +string basePath()
      +uint64_t maxBroadcastBytes()
      +TypePtr serdeRowType()
      +vector~core_PlanNodePtr~ sources()
    }
  }

  namespace facebook_velox_core {
    class PlanNode {
      <<abstract>>
      +core_PlanNodeId id()
      +RowTypePtr outputType()
      +vector~core_PlanNodePtr~ sources()
    }
  }

  namespace facebook_velox_exec_trace {
    class DummySourceNode {
      +DummySourceNode(RowTypePtr outputType)
    }
  }

  class PrestoTraceReplayRunner {
    +PrestoTraceReplayRunner()
    +unique_ptr~OperatorReplayerBase~ createOperatorReplayer(string nodeName)
  }

  class PrestoServer {
    +void registerTraceNodeFactories()
  }

  facebook_velox_tool_trace_OperatorReplayerBase <|-- facebook_velox_tool_trace_BroadcastWriteReplayer
  facebook_velox_core_PlanNode <|-- facebook_presto_operators_BroadcastWriteNode
  facebook_velox_core_PlanNode <|-- facebook_velox_exec_trace_DummySourceNode

  PrestoTraceReplayRunner ..> facebook_velox_tool_trace_BroadcastWriteReplayer : creates
  facebook_velox_tool_trace_BroadcastWriteReplayer ..> facebook_presto_operators_BroadcastWriteNode : constructs in createPlanNode
  facebook_presto_operators_BroadcastWriteNode o--> facebook_velox_core_PlanNode : source
  facebook_presto_operators_BroadcastWriteNode ..> facebook_velox_exec_trace_DummySourceNode : used in trace factory
  PrestoServer ..> facebook_presto_operators_BroadcastWriteNode : registers trace node factory
  PrestoServer ..> facebook_velox_exec_trace_DummySourceNode : uses as dummy source

File-Level Changes

Change Details Files
Wire BroadcastWrite trace replay into the trace replayer entrypoint and expose an output directory flag.
  • Include BroadcastWrite and BroadcastWriteReplayer headers in the trace replayer main.
  • Add a --broadcast_write_output_dir gflag to specify the replay output directory for BroadcastWrite traces.
  • Extend PrestoTraceReplayRunner::makeReplayer to construct a BroadcastWriteReplayer when the traced node type is BroadcastWrite and enforce that the output dir flag is set.
presto-native-execution/presto_cpp/main/tool/trace/TraceReplayerMain.cpp
Register a trace-node factory for BroadcastWrite so traces can be captured and later reconstructed for replay.
  • Register a velox::exec::trace::registerTraceNodeFactory handler for the BroadcastWrite operator in PrestoServer::registerTraceNodeFactories.
  • In the factory, downcast to operators::BroadcastWriteNode and recreate a BroadcastWriteNode that wraps its single source with a DummySourceNode while preserving basePath, maxBroadcastBytes, and serdeRowType.
presto-native-execution/presto_cpp/main/PrestoServer.cpp
Introduce BroadcastWriteReplayer that reconstructs BroadcastWrite plan nodes with a caller-specified output directory instead of the original base path.
  • Implement BroadcastWriteReplayer as a concrete OperatorReplayerBase subclass that stores a replayOutputDir and validates it is non-empty.
  • Override createPlanNode to downcast the traced node to BroadcastWriteNode and recreate it with the replayOutputDir, original maxBroadcastBytes, serdeRowType, and the provided source node.
  • Provide a header declaring BroadcastWriteReplayer with the constructor and createPlanNode override.
presto-native-execution/presto_cpp/main/tool/trace/BroadcastWriteReplayer.cpp
presto-native-execution/presto_cpp/main/tool/trace/BroadcastWriteReplayer.h
Add comprehensive tests for BroadcastWrite trace replay behavior, including output path redirection and multi-driver support, using mock operators and writers.
  • Define a MockBroadcastFileWriter and a global registry to capture writes and allow verification of file paths, row counts, and finalization state during replay.
  • Implement TestBroadcastWriteOperator that uses MockBroadcastFileWriter and MockBroadcastWriteTranslator/MockBroadcastWriteOperator to separate trace-time and replay-time behavior.
  • Set up BroadcastWriteReplayerTest fixture to register necessary serdes, plan-node factories (including a BroadcastWriteNode trace factory), and manage a CPUThreadPoolExecutor and mock-writer lifecycle.
  • Add a basic test that traces a single-driver BroadcastWrite using a mock operator, verifies trace metadata, replays with BroadcastWriteReplayer into a new directory, and asserts that captured writer paths use the replay output dir and data is written/finalized.
  • Add a multipleDrivers test that traces BroadcastWrite with multiple drivers, discovers which drivers generated traces, replays only those driver IDs, and validates both per-writer output path redirection and that total captured rows across mock writers equals the expected traced rows.
presto-native-execution/presto_cpp/main/tool/trace/tests/BroadcastWriteReplayerTest.cpp
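
The path redirection described for `createPlanNode` can be illustrated with a small sketch. It uses hypothetical stand-in types (`PlanNodeSketch`, `BroadcastWriteNodeSketch`, `recreateForReplay`) rather than the Velox classes, and it omits `serdeRowType` for brevity; the point is only that the traced node's base path is replaced while the other settings carry over.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a polymorphic plan node.
struct PlanNodeSketch {
  virtual ~PlanNodeSketch() = default;
};
using PlanNodeSketchPtr = std::shared_ptr<const PlanNodeSketch>;

// Hypothetical stand-in for BroadcastWriteNode (serdeRowType omitted).
struct BroadcastWriteNodeSketch : PlanNodeSketch {
  BroadcastWriteNodeSketch(
      std::string nodeId,
      std::string path,
      uint64_t maxBytes,
      PlanNodeSketchPtr src)
      : id(std::move(nodeId)),
        basePath(std::move(path)),
        maxBroadcastBytes(maxBytes),
        source(std::move(src)) {}

  std::string id;
  std::string basePath;
  uint64_t maxBroadcastBytes;
  PlanNodeSketchPtr source;
};

// Downcast the traced node and rebuild it so that it writes under
// replayOutputDir instead of the base path recorded in the trace.
PlanNodeSketchPtr recreateForReplay(
    const PlanNodeSketch* traced,
    const std::string& nodeId,
    const std::string& replayOutputDir,
    PlanNodeSketchPtr source) {
  const auto* original = dynamic_cast<const BroadcastWriteNodeSketch*>(traced);
  if (original == nullptr) {
    throw std::invalid_argument("traced node is not a BroadcastWrite node");
  }
  return std::make_shared<const BroadcastWriteNodeSketch>(
      nodeId, replayOutputDir, original->maxBroadcastBytes, std::move(source));
}
```

Because the original base path is never reused, a replay cannot overwrite the data the traced query produced.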

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • The BroadcastWrite trace-node registration logic is duplicated between PrestoServer::registerTraceNodeFactories and BroadcastWriteReplayerTest::SetUpTestCase; consider factoring this into a shared helper to avoid future drift.
  • In TraceReplayerMain, the broadcast_write_output_dir flag is only checked for non-emptiness; it may be more robust to also validate that the directory exists (or can be created) before running the replay.
  • The tests modify the global operator registry (registerOperator/unregisterAllOperators); consider using an RAII-style guard or restoring previous state after each test to avoid surprising interactions with other tests that rely on operator registration.
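
The second suggestion, validating the output directory up front, might look like the sketch below. It assumes the replay output lives on a local filesystem reachable through `std::filesystem` (C++17); `ensureReplayOutputDir` is a hypothetical helper name, and a deployment that writes to remote storage would need the equivalent check through its own filesystem layer.

```cpp
#include <cassert>
#include <filesystem>
#include <stdexcept>
#include <string>
#include <system_error>

namespace fs = std::filesystem;

// Hypothetical helper: reject an empty path, create the directory if it is
// missing, and fail with a clear message if the path cannot be used.
void ensureReplayOutputDir(const std::string& dir) {
  if (dir.empty()) {
    throw std::invalid_argument("replay output directory must not be empty");
  }
  std::error_code ec;
  // Creates missing parent directories too; an existing directory is not an
  // error.
  fs::create_directories(dir, ec);
  if (ec) {
    throw std::runtime_error(
        "cannot create replay output directory " + dir + ": " + ec.message());
  }
  // Covers the case where the path exists but is a regular file.
  if (!fs::is_directory(dir, ec)) {
    throw std::runtime_error(
        "replay output path is not a directory: " + dir);
  }
}
```

Running such a check before the replay starts would surface a bad path immediately rather than partway through writing broadcast files.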
## Individual Comments

### Comment 1
<location> `presto-native-execution/presto_cpp/main/PrestoServer.cpp:1818-1825` </location>
<code_context>
+      "BroadcastWrite",
+      [](const velox::core::PlanNode* traceNode,
+         const velox::core::PlanNodeId& nodeId) -> velox::core::PlanNodePtr {
+        if (const auto* broadcastWriteNode =
+                dynamic_cast<const operators::BroadcastWriteNode*>(traceNode)) {
+          return std::make_shared<operators::BroadcastWriteNode>(
+              nodeId,
+              broadcastWriteNode->basePath(),
+              broadcastWriteNode->maxBroadcastBytes(),
+              broadcastWriteNode->serdeRowType(),
+              std::make_shared<velox::exec::trace::DummySourceNode>(
+                  broadcastWriteNode->sources().front()->outputType()));
+        }
</code_context>

<issue_to_address>
**issue (bug_risk):** Guard against unexpected empty sources when constructing DummySourceNode.

This assumes `broadcastWriteNode->sources()` is non-empty and calls `.front()` directly. In malformed or partially recorded traces this could be empty, leading to undefined behavior or a crash. Please add a defensive check (e.g., `VELOX_CHECK(!broadcastWriteNode->sources().empty())`) or otherwise handle the empty case so trace replay fails with a clear error.
</issue_to_address>
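
A minimal illustration of the guard requested above, using a hypothetical stand-in type (`TracedNodeSketch`) so it compiles without Velox. In the actual factory the check would presumably be the `VELOX_CHECK` the comment suggests; here a plain exception plays that role.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a traced plan node: an output type plus sources.
struct TracedNodeSketch {
  std::string outputType;
  std::vector<std::shared_ptr<const TracedNodeSketch>> sources;
};

// Return the first source's output type, failing with a clear error instead
// of calling front() on an empty vector, which is undefined behavior.
const std::string& firstSourceOutputType(const TracedNodeSketch& node) {
  if (node.sources.empty()) {
    throw std::runtime_error("BroadcastWrite trace node has no source");
  }
  // A null source would indicate a malformed trace as well.
  assert(node.sources.front() != nullptr);
  return node.sources.front()->outputType;
}
```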

xiaoxmeng previously approved these changes Nov 25, 2025
@xiaoxmeng xiaoxmeng left a comment

@han-yan01 please make sure this works end to end before landing.

…cpp (prestodb#26690)

Summary:

Add trace replay support for the BroadcastWrite operator to presto_cpp. Instead of using ws as intermediate storage, allow a user-defined path so the sink data can be inspected.

# Release Notes
```
== NO RELEASE NOTE ==
```

Reviewed By: xiaoxmeng

Differential Revision: D87790828
@singcha singcha self-requested a review December 1, 2025 17:28
@singcha singcha merged commit e28d004 into prestodb:master Dec 1, 2025
83 checks passed
@prestodb-ci prestodb-ci mentioned this pull request Dec 1, 2025
19 tasks