Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[submodule "presto-native-execution/velox"]
path = presto-native-execution/velox
url = https://github.com/facebookincubator/velox.git
url = https://github.com/yingsu00/velox.git
branch = pushdownCastToSourceLocal
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class NativeWorkerSessionPropertyProvider
public static final String NATIVE_INDEX_LOOKUP_JOIN_SPLIT_OUTPUT = "native_index_lookup_join_split_output";
public static final String NATIVE_UNNEST_SPLIT_OUTPUT = "native_unnest_split_output";
public static final String NATIVE_USE_VELOX_GEOSPATIAL_JOIN = "native_use_velox_geospatial_join";
public static final String NATIVE_PUSHDOWN_INTEGER_UPCASTS_TO_SOURCE = "native_pushdown_integer_upcasts_to_source";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -427,11 +428,19 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
true,
!nativeExecution),
booleanProperty(

NATIVE_USE_VELOX_GEOSPATIAL_JOIN,
"If this is true, then the protocol::SpatialJoinNode is converted to a " +
"velox::core::SpatialJoinNode. Otherwise, it is converted to a " +
"velox::core::NestedLoopJoinNode.",
true,
!nativeExecution),

booleanProperty(
NATIVE_PUSHDOWN_INTEGER_UPCASTS_TO_SOURCE,
"Native Execution only. Pushdown integer type upcasts to scan if they are " +
"immediately after scan",
false,
!nativeExecution));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.sql;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.cost.CachingCostProvider;
Expand Down Expand Up @@ -104,9 +105,13 @@ public Plan validateAndOptimizePlan(PlanNode root, PlanStage stage)
{
validateIntermediatePlanWithRuntimeStats(root);

Logger log = Logger.get(Optimizer.class);

boolean enableVerboseRuntimeStats = SystemSessionProperties.isVerboseRuntimeStatsEnabled(session);
if (stage.ordinal() >= OPTIMIZED.ordinal()) {
int i = 0;
for (PlanOptimizer optimizer : planOptimizers) {
log.info("optimizer %s, name ", i++, optimizer.toString());
if (Thread.currentThread().isInterrupted()) {
throw new PrestoException(QUERY_PLANNING_TIMEOUT, String.format("The query optimizer exceeded the timeout of %s.", getQueryAnalyzerTimeout(session).toString()));
}
Expand Down
6 changes: 5 additions & 1 deletion presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR "Enable Arrow Flight connector" OFF)

option(PRESTO_ENABLE_SPATIAL "Enable spatial support" ON)
option(PRESTO_ENABLE_SPATIAL "Enable spatial support" OFF)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should be reverted.



# Set all Velox options below and make sure that if we include folly headers or
# other dependency headers that include folly headers we turn off the coroutines
Expand Down Expand Up @@ -110,12 +111,15 @@ if(PRESTO_ENABLE_CUDF)
cmake_policy(SET CMP0104 NEW)
endif()

set(PRESTO_ENABLE_SPATIAL OFF)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. This is switching off SPATIAL needed for Bolt.

set(
VELOX_ENABLE_GEO
${PRESTO_ENABLE_SPATIAL}
CACHE BOOL
"Enable Velox Geometry (aka spatial) support"
)
message(STATUS "PRESTO_ENABLE_SPATIAL: ${PRESTO_ENABLE_SPATIAL}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove these debug messages.

message(STATUS "VELOX_ENABLE_GEO: ${VELOX_ENABLE_GEO}")

set(VELOX_BUILD_TESTING OFF CACHE BOOL "Enable Velox tests")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,14 @@ SessionProperties::SessionProperties() {
false,
std::nullopt,
"true");

addSessionProperty(
kPushdownIntegerUpcastsToSource,
"Enable pushdown of integer upcasts to the source operators.",
BOOLEAN(),
false,
QueryConfig::kPushdownIntegerUpcastsToSource,
boolToString(c.pushdownIntegerUpcastsToSource()));
}

const std::string SessionProperties::toVeloxConfig(
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/SessionProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ class SessionProperties {
static constexpr const char* kUseVeloxGeospatialJoin =
"native_use_velox_geospatial_join";

static constexpr const char* kPushdownIntegerUpcastsToSource =
"native_pushdown_integer_upcasts_to_source";

inline bool hasVeloxConfig(const std::string& key) {
auto sessionProperty = sessionProperties_.find(key);
if (sessionProperty == sessionProperties_.end()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ class SystemConnector : public velox::connector::Connector {
const velox::RowTypePtr& outputType,
const velox::connector::ConnectorTableHandlePtr& tableHandle,
const velox::connector::ColumnHandleMap& columnHandles,
velox::connector::ConnectorQueryCtx* connectorQueryCtx) override final {
velox::connector::ConnectorQueryCtx* connectorQueryCtx,
bool pushdownCasts = false) override final {
VELOX_CHECK(taskManager_);
return std::make_unique<SystemDataSource>(
outputType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ ArrowFlightConnector::createDataSource(
const velox::RowTypePtr& outputType,
const velox::connector::ConnectorTableHandlePtr& tableHandle,
const velox::connector::ColumnHandleMap& columnHandles,
velox::connector::ConnectorQueryCtx* connectorQueryCtx) {
velox::connector::ConnectorQueryCtx* connectorQueryCtx,
bool pushdownCasts) {
return std::make_unique<ArrowFlightDataSource>(
outputType,
columnHandles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ class ArrowFlightConnector : public velox::connector::Connector {
const velox::RowTypePtr& outputType,
const velox::connector::ConnectorTableHandlePtr& tableHandle,
const velox::connector::ColumnHandleMap& columnHandles,
velox::connector::ConnectorQueryCtx* connectorQueryCtx) override;
velox::connector::ConnectorQueryCtx* connectorQueryCtx,
bool pushdownCasts = false) override;

std::unique_ptr<velox::connector::DataSink> createDataSink(
velox::RowTypePtr inputType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class PartitionAndSerializeOperator : public Operator {
serializeKeys(
nextOutputRow_, endOutputRow, keyOutputBufferSize, *keyVector);

// Extract slice from output_ and construct the output vector.
// Extract slice from outputWithoutUpcasts_ and construct the output vector.
std::vector<VectorPtr> childrenVectors;
childrenVectors.push_back(
output_->childAt(0)->slice(nextOutputRow_, batchSize));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ TEST_F(QueryContextManagerTest, nativeSessionProperties) {
{"native_expression_max_array_size_in_reduce", "99999"},
{"native_expression_max_compiled_regexes", "54321"},
{"request_data_sizes_max_wait_sec", "20"},
{"native_pushdown_integer_upcasts_to_source", "true"},
}};
protocol::TaskUpdateRequest updateRequest;
updateRequest.session = session;
Expand All @@ -89,6 +90,7 @@ TEST_F(QueryContextManagerTest, nativeSessionProperties) {
EXPECT_EQ(queryCtx->queryConfig().exprMaxArraySizeInReduce(), 99999);
EXPECT_EQ(queryCtx->queryConfig().exprMaxCompiledRegexes(), 54321);
EXPECT_EQ(queryCtx->queryConfig().requestDataSizesMaxWaitSec(), 20);
EXPECT_TRUE(queryCtx->queryConfig().pushdownIntegerUpcastsToSource());
}

TEST_F(QueryContextManagerTest, nativeConnectorSessionProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ TEST_F(SessionPropertiesTest, validateMapping) {
{SessionProperties::kUnnestSplitOutput,
core::QueryConfig::kUnnestSplitOutput},
{SessionProperties::kUseVeloxGeospatialJoin,
SessionProperties::kUseVeloxGeospatialJoin}};
SessionProperties::kUseVeloxGeospatialJoin},
{SessionProperties::kPushdownIntegerUpcastsToSource,
core::QueryConfig::kPushdownIntegerUpcastsToSource}};

const auto sessionProperties = SessionProperties::instance();
for (const auto& [sessionProperty, expectedVeloxConfig] : expectedMappings) {
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 162 files
Loading