Skip to content

Commit e28d004

Browse files
authored
feat: Add trace replay support for BroadcastWrite operator to presto_cpp (#26690)
Summary: Add trace replay support for the BroadcastWrite operator to presto_cpp. Instead of using ws as intermediate storage, the replayer lets the user specify an output path so that the sink data can be inspected. # Release Notes ``` == NO RELEASE NOTE == ``` Differential Revision: D87790828
1 parent ac85438 commit e28d004

File tree

5 files changed

+776
-1
lines changed

5 files changed

+776
-1
lines changed

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,6 +1810,24 @@ void PrestoServer::reportNodeStats(proxygen::ResponseHandler* downstream) {
18101810
}
18111811

18121812
void PrestoServer::registerTraceNodeFactories() {
1813+
// Register trace node factory for BroadcastWrite operator.
1814+
velox::exec::trace::registerTraceNodeFactory(
1815+
"BroadcastWrite",
1816+
[](const velox::core::PlanNode* traceNode,
1817+
const velox::core::PlanNodeId& nodeId) -> velox::core::PlanNodePtr {
1818+
if (const auto* broadcastWriteNode =
1819+
dynamic_cast<const operators::BroadcastWriteNode*>(traceNode)) {
1820+
return std::make_shared<operators::BroadcastWriteNode>(
1821+
nodeId,
1822+
broadcastWriteNode->basePath(),
1823+
broadcastWriteNode->maxBroadcastBytes(),
1824+
broadcastWriteNode->serdeRowType(),
1825+
std::make_shared<velox::exec::trace::DummySourceNode>(
1826+
broadcastWriteNode->sources().front()->outputType()));
1827+
}
1828+
return nullptr;
1829+
});
1830+
18131831
// Register trace node factory for PartitionAndSerialize operator
18141832
velox::exec::trace::registerTraceNodeFactory(
18151833
"PartitionAndSerialize",
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
/*
15+
* Copyright (c) Facebook, Inc. and its affiliates.
16+
*/
17+
#include "presto_cpp/main/tool/trace/BroadcastWriteReplayer.h"
18+
19+
#include "presto_cpp/main/operators/BroadcastWrite.h"
20+
21+
using namespace facebook::velox;
22+
using namespace facebook::velox::exec;
23+
24+
namespace facebook::velox::tool::trace {
25+
26+
/// Creates a replayer for a traced 'BroadcastWrite' operator.
///
/// Every argument except 'replayOutputDir' is forwarded unchanged to
/// OperatorReplayerBase. An empty string is passed for the base-class argument
/// that sits between 'nodeName' and 'driverIds'.
///
/// 'replayOutputDir' is stored and later passed to the replayed
/// BroadcastWriteNode in place of the traced node's base path. It must not be
/// empty; the check below fails otherwise.
BroadcastWriteReplayer::BroadcastWriteReplayer(
    const std::string& traceDir,
    const std::string& queryId,
    const std::string& taskId,
    const std::string& nodeId,
    const std::string& nodeName,
    const std::string& driverIds,
    uint64_t queryCapacity,
    folly::Executor* executor,
    const std::string& replayOutputDir)
    : OperatorReplayerBase(
          traceDir,
          queryId,
          taskId,
          nodeId,
          nodeName,
          "",
          driverIds,
          queryCapacity,
          executor),
      replayOutputDir_(replayOutputDir) {
  // Fail with an actionable message instead of a bare expression dump.
  VELOX_CHECK(
      !replayOutputDir_.empty(),
      "BroadcastWriteReplayer requires a non-empty replay output directory");
}
49+
50+
/// Builds the BroadcastWriteNode used for replay.
///
/// 'node' is the traced plan node and must be a BroadcastWriteNode; the check
/// below fails otherwise. The returned node uses 'nodeId' and 'source' as
/// given, copies the max broadcast bytes and serde row type from the traced
/// node, and receives 'replayOutputDir_' as its base path argument.
core::PlanNodePtr BroadcastWriteReplayer::createPlanNode(
    const core::PlanNode* node,
    const core::PlanNodeId& nodeId,
    const core::PlanNodePtr& source) const {
  using BroadcastWriteNode = facebook::presto::operators::BroadcastWriteNode;

  const auto* broadcastWriteNode =
      dynamic_cast<const BroadcastWriteNode*>(node);
  VELOX_CHECK_NOT_NULL(broadcastWriteNode);

  // Settings carried over from the traced node.
  const auto maxBroadcastBytes = broadcastWriteNode->maxBroadcastBytes();
  const auto& serdeRowType = broadcastWriteNode->serdeRowType();

  return std::make_shared<BroadcastWriteNode>(
      nodeId, replayOutputDir_, maxBroadcastBytes, serdeRowType, source);
}
66+
67+
} // namespace facebook::velox::tool::trace
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
/*
15+
* Copyright (c) Facebook, Inc. and its affiliates.
16+
*/
17+
#pragma once
18+
19+
#include "velox/core/PlanNode.h"
20+
#include "velox/tool/trace/OperatorReplayerBase.h"
21+
22+
namespace facebook::velox::tool::trace {
23+
24+
/// The replayer to replay the traced 'BroadcastWrite' operator.
25+
class BroadcastWriteReplayer final : public OperatorReplayerBase {
26+
public:
27+
BroadcastWriteReplayer(
28+
const std::string& traceDir,
29+
const std::string& queryId,
30+
const std::string& taskId,
31+
const std::string& nodeId,
32+
const std::string& nodeName,
33+
const std::string& driverIds,
34+
uint64_t queryCapacity,
35+
folly::Executor* executor,
36+
const std::string& replayOutputDir);
37+
38+
private:
39+
core::PlanNodePtr createPlanNode(
40+
const core::PlanNode* node,
41+
const core::PlanNodeId& nodeId,
42+
const core::PlanNodePtr& source) const override;
43+
44+
const std::string replayOutputDir_;
45+
};
46+
47+
} // namespace facebook::velox::tool::trace

presto-native-execution/presto_cpp/main/tool/trace/TraceReplayerMain.cpp

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,17 @@
1515
#include "velox/tool/trace/TraceReplayRunner.h"
1616

1717
#include <folly/init/Init.h>
18+
#include "presto_cpp/main/operators/BroadcastWrite.h"
1819
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
20+
#include "presto_cpp/main/tool/trace/BroadcastWriteReplayer.h"
1921
#include "presto_cpp/main/tool/trace/PartitionAndSerializeReplayer.h"
2022
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
2123

24+
// Output directory used when replaying a BroadcastWrite node. The replay
// runner rejects an empty value for that node type.
DEFINE_string(
    broadcast_write_output_dir,
    "",
    "Output directory for the replayed BroadcastWrite operator. "
    "Required when replaying a BroadcastWrite node.");
28+
2229
using namespace facebook::velox;
2330
using namespace facebook::presto;
2431

@@ -49,7 +56,21 @@ class PrestoTraceReplayRunner
4956
const auto queryCapacityBytes = (1ULL * FLAGS_query_memory_capacity_mb)
5057
<< 20;
5158

52-
if (nodeName == "PartitionAndSerialize") {
59+
if (nodeName == "BroadcastWrite") {
60+
VELOX_USER_CHECK(
61+
!FLAGS_broadcast_write_output_dir.empty(),
62+
"--broadcast_write_output_dir is required");
63+
return std::make_unique<tool::trace::BroadcastWriteReplayer>(
64+
FLAGS_root_dir,
65+
FLAGS_query_id,
66+
FLAGS_task_id,
67+
FLAGS_node_id,
68+
nodeName,
69+
FLAGS_driver_ids,
70+
queryCapacityBytes,
71+
cpuExecutor_.get(),
72+
FLAGS_broadcast_write_output_dir);
73+
} else if (nodeName == "PartitionAndSerialize") {
5374
return std::make_unique<tool::trace::PartitionAndSerializeReplayer>(
5475
FLAGS_root_dir,
5576
FLAGS_query_id,

0 commit comments

Comments
 (0)