Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,24 @@ void PrestoServer::reportNodeStats(proxygen::ResponseHandler* downstream) {
}

void PrestoServer::registerTraceNodeFactories() {
// Register trace node factory for BroadcastWrite operator.
velox::exec::trace::registerTraceNodeFactory(
"BroadcastWrite",
[](const velox::core::PlanNode* traceNode,
const velox::core::PlanNodeId& nodeId) -> velox::core::PlanNodePtr {
if (const auto* broadcastWriteNode =
dynamic_cast<const operators::BroadcastWriteNode*>(traceNode)) {
return std::make_shared<operators::BroadcastWriteNode>(
nodeId,
broadcastWriteNode->basePath(),
broadcastWriteNode->maxBroadcastBytes(),
broadcastWriteNode->serdeRowType(),
std::make_shared<velox::exec::trace::DummySourceNode>(
Comment on lines +1818 to +1825
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Guard against unexpected empty sources when constructing DummySourceNode.

This assumes broadcastWriteNode->sources() is non-empty and calls .front() directly. In malformed or partially recorded traces this could be empty, leading to undefined behavior or a crash. Please add a defensive check (e.g., VELOX_CHECK(!broadcastWriteNode->sources().empty())) or otherwise handle the empty case so trace replay fails with a clear error.

broadcastWriteNode->sources().front()->outputType()));
}
return nullptr;
});

// Register trace node factory for PartitionAndSerialize operator
velox::exec::trace::registerTraceNodeFactory(
"PartitionAndSerialize",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*/
#include "presto_cpp/main/tool/trace/BroadcastWriteReplayer.h"

#include "presto_cpp/main/operators/BroadcastWrite.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace facebook::velox::tool::trace {

BroadcastWriteReplayer::BroadcastWriteReplayer(
const std::string& traceDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const std::string& nodeName,
const std::string& driverIds,
uint64_t queryCapacity,
folly::Executor* executor,
const std::string& replayOutputDir)
: OperatorReplayerBase(
traceDir,
queryId,
taskId,
nodeId,
nodeName,
"",
driverIds,
queryCapacity,
executor),
replayOutputDir_(replayOutputDir) {
VELOX_CHECK(!replayOutputDir_.empty());
}

core::PlanNodePtr BroadcastWriteReplayer::createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const {
const auto* broadcastWriteNode =
dynamic_cast<const facebook::presto::operators::BroadcastWriteNode*>(
node);
VELOX_CHECK_NOT_NULL(broadcastWriteNode);

return std::make_shared<facebook::presto::operators::BroadcastWriteNode>(
nodeId,
replayOutputDir_,
broadcastWriteNode->maxBroadcastBytes(),
broadcastWriteNode->serdeRowType(),
source);
}

} // namespace facebook::velox::tool::trace
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*/
#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

namespace facebook::velox::tool::trace {

/// The replayer to replay the traced 'BroadcastWrite' operator.
class BroadcastWriteReplayer final : public OperatorReplayerBase {
public:
BroadcastWriteReplayer(
const std::string& traceDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const std::string& nodeName,
const std::string& driverIds,
uint64_t queryCapacity,
folly::Executor* executor,
const std::string& replayOutputDir);

private:
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;

const std::string replayOutputDir_;
};

} // namespace facebook::velox::tool::trace
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
#include "velox/tool/trace/TraceReplayRunner.h"

#include <folly/init/Init.h>
#include "presto_cpp/main/operators/BroadcastWrite.h"
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/tool/trace/BroadcastWriteReplayer.h"
#include "presto_cpp/main/tool/trace/PartitionAndSerializeReplayer.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"

DEFINE_string(
broadcast_write_output_dir,
"",
"Specify output directory of BroadcastWrite.");

using namespace facebook::velox;
using namespace facebook::presto;

Expand Down Expand Up @@ -49,7 +56,21 @@ class PrestoTraceReplayRunner
const auto queryCapacityBytes = (1ULL * FLAGS_query_memory_capacity_mb)
<< 20;

if (nodeName == "PartitionAndSerialize") {
if (nodeName == "BroadcastWrite") {
VELOX_USER_CHECK(
!FLAGS_broadcast_write_output_dir.empty(),
"--broadcast_write_output_dir is required");
return std::make_unique<tool::trace::BroadcastWriteReplayer>(
FLAGS_root_dir,
FLAGS_query_id,
FLAGS_task_id,
FLAGS_node_id,
nodeName,
FLAGS_driver_ids,
queryCapacityBytes,
cpuExecutor_.get(),
FLAGS_broadcast_write_output_dir);
} else if (nodeName == "PartitionAndSerialize") {
return std::make_unique<tool::trace::PartitionAndSerializeReplayer>(
FLAGS_root_dir,
FLAGS_query_id,
Expand Down
Loading
Loading