Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add expression evaluation in Presto sidecar #24126

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tests</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-benchmark</artifactId>
Expand Down Expand Up @@ -1024,6 +1031,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-native-execution</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.facebook.hive</groupId>
<artifactId>hive-dwrf</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
public class HandleJsonModule
implements Module
{
private final HandleResolver handleResolver;

public HandleJsonModule()
{
this(null);
}

public HandleJsonModule(HandleResolver handleResolver)
{
this.handleResolver = handleResolver;
}

@Override
public void configure(Binder binder)
{
Expand All @@ -38,6 +50,11 @@ public void configure(Binder binder)
jsonBinder(binder).addModuleBinding().to(FunctionHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(MetadataUpdateJacksonModule.class);

binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
if (handleResolver == null) {
binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
}
else {
binder.bind(HandleResolver.class).toInstance(handleResolver);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PageIndexerFactory;
import com.facebook.presto.spi.PageSorter;
import com.facebook.presto.spi.RowExpressionSerde;
import com.facebook.presto.spi.analyzer.ViewDefinition;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.spi.plan.SimplePlanFragment;
import com.facebook.presto.spi.plan.SimplePlanFragmentSerde;
import com.facebook.presto.spi.relation.DeterminismEvaluator;
import com.facebook.presto.spi.relation.DomainTranslator;
import com.facebook.presto.spi.relation.PredicateCompiler;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.session.WorkerSessionPropertyProvider;
import com.facebook.presto.spiller.FileSingleStreamSpillerFactory;
Expand Down Expand Up @@ -195,6 +197,7 @@
import com.facebook.presto.sql.analyzer.QueryExplainer;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.expressions.JsonCodecRowExpressionSerde;
import com.facebook.presto.sql.gen.ExpressionCompiler;
import com.facebook.presto.sql.gen.JoinCompiler;
import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler;
Expand Down Expand Up @@ -362,6 +365,7 @@ else if (serverConfig.isCoordinator()) {

// expression manager
binder.bind(ExpressionOptimizerManager.class).in(Scopes.SINGLETON);
binder.bind(RowExpressionSerde.class).to(JsonCodecRowExpressionSerde.class).in(Scopes.SINGLETON);

// schema properties
binder.bind(SchemaPropertyManager.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -555,6 +559,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
jsonCodecBinder(binder).bindJsonCodec(SqlInvokedFunction.class);
jsonCodecBinder(binder).bindJsonCodec(TaskSource.class);
jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class);
jsonCodecBinder(binder).bindJsonCodec(RowExpression.class);
smileCodecBinder(binder).bindSmileCodec(TaskStatus.class);
smileCodecBinder(binder).bindSmileCodec(TaskInfo.class);
thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.nodeManager.PluginNodeManager;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.RowExpressionSerde;
import com.facebook.presto.spi.relation.ExpressionOptimizer;
import com.facebook.presto.spi.relation.ExpressionOptimizerProvider;
import com.facebook.presto.spi.sql.planner.ExpressionOptimizerContext;
Expand Down Expand Up @@ -53,23 +54,25 @@ public class ExpressionOptimizerManager

private final NodeManager nodeManager;
private final FunctionAndTypeManager functionAndTypeManager;
private final RowExpressionSerde rowExpressionSerde;
private final FunctionResolution functionResolution;
private final File configurationDirectory;

private final Map<String, ExpressionOptimizerFactory> expressionOptimizerFactories = new ConcurrentHashMap<>();
private final Map<String, ExpressionOptimizer> expressionOptimizers = new ConcurrentHashMap<>();

@Inject
public ExpressionOptimizerManager(PluginNodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager)
public ExpressionOptimizerManager(PluginNodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager, RowExpressionSerde rowExpressionSerde)
{
this(nodeManager, functionAndTypeManager, EXPRESSION_MANAGER_CONFIGURATION_DIRECTORY);
this(nodeManager, functionAndTypeManager, rowExpressionSerde, EXPRESSION_MANAGER_CONFIGURATION_DIRECTORY);
}

public ExpressionOptimizerManager(PluginNodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager, File configurationDirectory)
public ExpressionOptimizerManager(PluginNodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager, RowExpressionSerde rowExpressionSerde, File configurationDirectory)
{
requireNonNull(nodeManager, "nodeManager is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
this.rowExpressionSerde = requireNonNull(rowExpressionSerde, "rowExpressionSerde is null");
this.functionResolution = new FunctionResolution(functionAndTypeManager.getFunctionAndTypeResolver());
this.configurationDirectory = requireNonNull(configurationDirectory, "configurationDirectory is null");
expressionOptimizers.put(DEFAULT_EXPRESSION_OPTIMIZER_NAME, new RowExpressionOptimizer(functionAndTypeManager));
Expand All @@ -89,7 +92,7 @@ public void loadExpressionOptimizerFactories()
}
}

private void loadExpressionOptimizerFactory(File configurationFile)
public void loadExpressionOptimizerFactory(File configurationFile)
throws IOException
{
String name = getNameWithoutExtension(configurationFile.getName());
Expand All @@ -99,13 +102,19 @@ private void loadExpressionOptimizerFactory(File configurationFile)
Map<String, String> properties = new HashMap<>(loadProperties(configurationFile));
String factoryName = properties.remove(EXPRESSION_MANAGER_FACTORY_NAME);
checkArgument(!isNullOrEmpty(factoryName), "%s does not contain %s", configurationFile, EXPRESSION_MANAGER_FACTORY_NAME);
loadExpressionOptimizerFactory(factoryName, name, properties);
}

public void loadExpressionOptimizerFactory(String factoryName, String expressionOptimizerName, Map<String, String> properties)
{
requireNonNull(factoryName, "factoryName is null");
checkArgument(expressionOptimizerFactories.containsKey(factoryName),
"ExpressionOptimizerFactory %s is not registered, registered factories: ", factoryName, expressionOptimizerFactories.keySet());

ExpressionOptimizer optimizer = expressionOptimizerFactories.get(factoryName).createOptimizer(
properties,
new ExpressionOptimizerContext(nodeManager, functionAndTypeManager, functionResolution));
expressionOptimizers.put(name, optimizer);
new ExpressionOptimizerContext(nodeManager, rowExpressionSerde, functionAndTypeManager, functionResolution));
expressionOptimizers.put(expressionOptimizerName, optimizer);
}

public void addExpressionOptimizerFactory(ExpressionOptimizerFactory expressionOptimizerFactory)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.expressions;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.spi.RowExpressionSerde;
import com.facebook.presto.spi.relation.RowExpression;

import javax.inject.Inject;

import java.nio.charset.StandardCharsets;

import static java.util.Objects.requireNonNull;

public class JsonCodecRowExpressionSerde
implements RowExpressionSerde
{
private final JsonCodec<RowExpression> codec;

@Inject
public JsonCodecRowExpressionSerde(JsonCodec<RowExpression> codec)
{
this.codec = requireNonNull(codec, "codec is null");
}

@Override
public String serialize(RowExpression expression)
{
return new String(codec.toBytes(expression), StandardCharsets.UTF_8);
}

@Override
public RowExpression deserialize(String data)
{
return codec.fromBytes(data.getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import com.facebook.presto.spi.plan.SimplePlanFragment;
import com.facebook.presto.spi.plan.StageExecutionDescriptor;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spiller.FileSingleStreamSpillerFactory;
import com.facebook.presto.spiller.GenericPartitioningSpillerFactory;
import com.facebook.presto.spiller.GenericSpillerFactory;
Expand All @@ -175,6 +176,7 @@
import com.facebook.presto.sql.analyzer.QueryExplainer;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.expressions.JsonCodecRowExpressionSerde;
import com.facebook.presto.sql.gen.ExpressionCompiler;
import com.facebook.presto.sql.gen.JoinCompiler;
import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler;
Expand Down Expand Up @@ -462,7 +464,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
this.pageIndexerFactory = new GroupByHashPageIndexerFactory(joinCompiler);

NodeInfo nodeInfo = new NodeInfo("test");
expressionOptimizerManager = new ExpressionOptimizerManager(new PluginNodeManager(nodeManager, nodeInfo.getEnvironment()), getFunctionAndTypeManager());
expressionOptimizerManager = new ExpressionOptimizerManager(new PluginNodeManager(nodeManager, nodeInfo.getEnvironment()), getFunctionAndTypeManager(), new JsonCodecRowExpressionSerde(jsonCodec(RowExpression.class)));

this.statsNormalizer = new StatsNormalizer();
this.scalarStatsCalculator = new ScalarStatsCalculator(metadata, expressionOptimizerManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Properties;

import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED;
import static com.facebook.presto.sql.relational.Expressions.constant;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
Expand Down Expand Up @@ -65,6 +66,7 @@ public void setUp()
manager = new ExpressionOptimizerManager(
pluginNodeManager,
METADATA.getFunctionAndTypeManager(),
new JsonCodecRowExpressionSerde(jsonCodec(RowExpression.class)),
directory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.sql.Optimizer;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.expressions.JsonCodecRowExpressionSerde;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.RuleStatsRecorder;
import com.facebook.presto.sql.planner.TypeProvider;
Expand All @@ -49,6 +51,7 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.sql.planner.assertions.PlanAssert.assertPlan;
import static com.facebook.presto.sql.planner.assertions.PlanAssert.assertPlanDoesNotMatch;
import static com.facebook.presto.transaction.TransactionBuilder.transaction;
Expand Down Expand Up @@ -177,7 +180,8 @@ private List<PlanOptimizer> getMinimalOptimizers()
metadata,
new ExpressionOptimizerManager(
new PluginNodeManager(new InMemoryNodeManager()),
queryRunner.getFunctionAndTypeManager())).rules()));
queryRunner.getFunctionAndTypeManager(),
new JsonCodecRowExpressionSerde(jsonCodec(RowExpression.class)))).rules()));
}

private <T> void inTransaction(Function<Session, T> transactionSessionConsumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.spi.relation.SpecialFormExpression;
import com.facebook.presto.sql.TestingRowExpressionTranslator;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.expressions.JsonCodecRowExpressionSerde;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.tree.Expression;
Expand All @@ -38,6 +39,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager;
Expand Down Expand Up @@ -185,7 +187,7 @@ private static void assertSimplifies(String expression, String rowExpressionExpe
Expression actualExpression = rewriteIdentifiersToSymbolReferences(SQL_PARSER.createExpression(expression));

InMemoryNodeManager nodeManager = new InMemoryNodeManager();
ExpressionOptimizerManager expressionOptimizerManager = new ExpressionOptimizerManager(new PluginNodeManager(nodeManager), METADATA.getFunctionAndTypeManager());
ExpressionOptimizerManager expressionOptimizerManager = new ExpressionOptimizerManager(new PluginNodeManager(nodeManager), METADATA.getFunctionAndTypeManager(), new JsonCodecRowExpressionSerde(jsonCodec(RowExpression.class)));

TestingRowExpressionTranslator translator = new TestingRowExpressionTranslator(METADATA);
RowExpression actualRowExpression = translator.translate(actualExpression, TypeProvider.viewOf(TYPES));
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ target_link_libraries(
presto_http
presto_operators
presto_velox_conversion
presto_expression_optimizer
velox_abfs
velox_aggregates
velox_caching
Expand Down
28 changes: 28 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,13 @@ void PrestoServer::registerSidecarEndpoints() {
proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, getFunctionsMetadata());
});
httpServer_->registerPost(
"/v1/expressions",
[&](proxygen::HTTPMessage* message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
optimizeExpressions(*message, body, downstream);
});
httpServer_->registerPost(
"/v1/velox/plan",
[server = this](
Expand Down Expand Up @@ -1599,4 +1606,25 @@ protocol::NodeStatus PrestoServer::fetchNodeStatus() {
return nodeStatus;
}

void PrestoServer::optimizeExpressions(
const proxygen::HTTPMessage& message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
json::array_t inputRowExpressions =
json::parse(util::extractMessageBody(body));
auto rowExpressionOptimizer =
std::make_unique<expression::RowExpressionOptimizer>(
nativeWorkerPool_.get());
auto result = rowExpressionOptimizer->optimize(
message.getHeaders(), inputRowExpressions);
if (result.second) {
VELOX_CHECK(
result.first.is_array(),
"The output json is not an array of RowExpressions");
http::sendOkResponse(downstream, result.first);
} else {
http::sendErrorResponse(downstream, result.first);
}
}

} // namespace facebook::presto
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "presto_cpp/main/PeriodicHeartbeatManager.h"
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServerOperations.h"
#include "presto_cpp/main/types/RowExpressionOptimizer.h"
#include "presto_cpp/main/types/VeloxPlanValidator.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/memory/MemoryAllocator.h"
Expand Down Expand Up @@ -217,6 +218,11 @@ class PrestoServer {

protocol::NodeStatus fetchNodeStatus();

void optimizeExpressions(
const proxygen::HTTPMessage& message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream);

void populateMemAndCPUInfo();

// Periodically yield tasks if there are tasks queued.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ add_library(presto_velox_conversion OBJECT VeloxPlanConversion.cpp)

target_link_libraries(presto_velox_conversion velox_type)

add_library(presto_expression_optimizer RowExpressionConverter.cpp
RowExpressionOptimizer.cpp)

target_link_libraries(presto_expression_optimizer presto_type_converter
presto_types presto_protocol)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
Loading