Skip to content

Commit d5fc9df

Browse files
committed
Isthmus To/From SQL examples & enhanced APIs
Starting from two examples of using Isthmus to go from and to SQL, a few new methods make the process a lot simpler and symmetrical. We found that in use we needed helper wrapper classes, but these could be pushed into the main code. Signed-off-by: MBWhite <[email protected]>
1 parent 7b4d5cf commit d5fc9df

File tree

16 files changed

+1397
-6
lines changed

16 files changed

+1397
-6
lines changed

examples/isthmus-api/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
_apps
2+
_data
3+
**/*/bin
4+
build
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
plugins {
  // Apply the application plugin to add support for building a CLI application in Java.
  id("application")
  alias(libs.plugins.spotless)
  id("substrait.java-conventions")
}

// Single source of truth for the entry point, shared by the jar manifest and the application
// plugin so the two can never drift apart.
val exampleMainClass = "io.substrait.examples.IsthmusAppExamples"

repositories {
  // Use Maven Central for resolving dependencies.
  mavenCentral()
}

dependencies {
  implementation(project(":isthmus"))
  implementation(libs.calcite.core)
  implementation(libs.calcite.server)
  // For a real Spark application, these would not be required since they would be in the Spark
  // server classpath
  runtimeOnly(libs.spark.core)
  runtimeOnly(libs.spark.hive)
}

tasks.jar {
  dependsOn(":spark:jar", ":core:jar", ":core:shadowJar")

  isZip64 = true
  // Leave out the signature files that arrive with the dependency jars merged in below.
  exclude("META-INF/*.RSA")
  exclude("META-INF/*.SF")
  exclude("META-INF/*.DSA")

  duplicatesStrategy = DuplicatesStrategy.EXCLUDE
  manifest.attributes["Main-Class"] = exampleMainClass
  // Build a single self-contained jar: directories are added as-is, jars are unpacked.
  from(configurations.runtimeClasspath.get().map({ if (it.isDirectory) it else zipTree(it) }))
}

application { mainClass = exampleMainClass }

tasks.named<Test>("test") {
  // Use JUnit Platform for unit tests.
  useJUnitPlatform()
}

java { toolchain { languageVersion.set(JavaLanguageVersion.of(17)) } }

tasks.pmdMain { dependsOn(":core:shadowJar") }
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package io.substrait.examples;
2+
3+
import io.substrait.examples.IsthmusAppExamples.Action;
4+
import io.substrait.examples.util.SubstraitStringify;
5+
import io.substrait.isthmus.SqlToSubstrait;
6+
import io.substrait.isthmus.SubstraitTypeSystem;
7+
import io.substrait.isthmus.sql.SubstraitCreateStatementParser;
8+
import io.substrait.plan.Plan;
9+
import io.substrait.plan.PlanProtoConverter;
10+
import java.io.IOException;
11+
import java.nio.file.Files;
12+
import java.nio.file.Path;
13+
import java.nio.file.Paths;
14+
import java.util.List;
15+
import org.apache.calcite.config.CalciteConnectionConfig;
16+
import org.apache.calcite.config.CalciteConnectionProperty;
17+
import org.apache.calcite.jdbc.CalciteSchema;
18+
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
19+
import org.apache.calcite.prepare.CalciteCatalogReader;
20+
import org.apache.calcite.rel.type.RelDataTypeFactory;
21+
import org.apache.calcite.sql.SqlDialect;
22+
import org.apache.calcite.sql.parser.SqlParseException;
23+
24+
/**
25+
* Substrait from SQL conversions.
26+
*
27+
* <p>There are 4 steps in the whole process.
28+
*
29+
* <p>1) A fully typed schema is required for the 'inputs'. Within a SQL context this is the `CREATE
30+
* TABLE` commands; this needs to be converted to a Calcite Schema 2) The SQL query to convert (ion
31+
* one type of dialect) 3) Conversion of the SQL query to Calcite Relations 4) Conversion of the
32+
* Calcite Relations to Substrait relations
33+
*
34+
* <p>Note that schema could be created from other means eg Caclcite's refelect based schema.
35+
*
36+
* <p>The substrait plan can then be used as wished.
37+
*/
38+
public class FromSql implements Action {
39+
40+
@Override
41+
public void run(final String[] args) {
42+
try {
43+
final String createSql =
44+
"""
45+
CREATE TABLE "vehicles" ("vehicle_id" varchar(15), "make" varchar(40), "model" varchar(40),
46+
"colour" varchar(15), "fuel_type" varchar(15),
47+
"cylinder_capacity" int, "first_use_date" varchar(15));
48+
49+
CREATE TABLE "tests" ("test_id" varchar(15), "vehicle_id" varchar(15),
50+
"test_date" varchar(20), "test_class" varchar(20), "test_type" varchar(20),
51+
"test_result" varchar(15),"test_mileage" int, "postcode_area" varchar(15));
52+
53+
""";
54+
55+
// Create the Caclcite Schema from the CREATE TABLES statements
56+
// as this is a SQL it could be in a schema, but the Isthmus Helper classes here are assuminmg
57+
// a common SQL format
58+
final CalciteSchema calciteSchema = CalciteSchema.createRootSchema(false);
59+
SubstraitCreateStatementParser.processCreateStatements(createSql)
60+
.forEach(t -> calciteSchema.add(t.getName(), t));
61+
62+
// Type Factory based on Java Types
63+
final RelDataTypeFactory typeFactory =
64+
new JavaTypeFactoryImpl(SubstraitTypeSystem.TYPE_SYSTEM);
65+
66+
// Default configuration for calcite
67+
final CalciteConnectionConfig calciteDefaultConfig =
68+
CalciteConnectionConfig.DEFAULT.set(
69+
CalciteConnectionProperty.CASE_SENSITIVE, Boolean.FALSE.toString());
70+
71+
final CalciteCatalogReader catalogReader =
72+
new CalciteCatalogReader(calciteSchema, List.of(), typeFactory, calciteDefaultConfig);
73+
74+
// Query that needs to be converted; again this could be in a variety of SQL dialects
75+
final String query =
76+
"""
77+
SELECT vehicles.colour, count(*) as colourcount FROM vehicles INNER JOIN tests
78+
ON vehicles.vehicle_id=tests.vehicle_id WHERE tests.test_result = 'P'
79+
GROUP BY vehicles.colour ORDER BY count(*)
80+
""";
81+
final SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
82+
83+
// choose Apache Derby as an example dialect
84+
final SqlDialect dialect = SqlDialect.DatabaseProduct.DERBY.getDialect();
85+
final Plan substraitPlan = sqlToSubstrait.convert(query, catalogReader, dialect);
86+
87+
SubstraitStringify.explain(substraitPlan).forEach(System.out::println);
88+
89+
// write out to file if given a file name
90+
// convert to a protobuff byte array and write as binary file
91+
if (args.length == 1) {
92+
final PlanProtoConverter planToProto = new PlanProtoConverter();
93+
final byte[] buffer = planToProto.toProto(substraitPlan).toByteArray();
94+
95+
final Path outputFile = Paths.get(args[0]);
96+
Files.write(outputFile, buffer);
97+
System.out.println("File written to " + outputFile);
98+
}
99+
100+
} catch (SqlParseException | IOException e) {
101+
e.printStackTrace();
102+
}
103+
}
104+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.substrait.examples;
2+
3+
import java.util.Arrays;
4+
5+
/** Command line entry point that runs one of the example classes found in this package. */
public final class IsthmusAppExamples {

  /** Implemented by all examples. */
  @FunctionalInterface
  public interface Action {

    /**
     * Runs the example.
     *
     * @param args the command line arguments that followed the example name; may be empty
     */
    void run(String[] args);
  }

  private IsthmusAppExamples() {}

  /**
   * Looks up the example class named by the first argument, instantiates it through its no-arg
   * constructor and runs it with the remaining arguments.
   *
   * <p>The process exits with status -1 when no example name is given or when loading or running
   * the example throws.
   *
   * @param args the base class name of the example (e.g. {@code ToSql}) followed by any arguments
   *     for that example
   */
  public static void main(final String[] args) {
    try {

      if (args.length == 0) {
        System.err.println(
            "Please provide base classname of example to run. eg ToSql to run class io.substrait.examples.ToSql ");
        System.exit(-1);
      }

      final String qualifiedName =
          String.format("%s.%s", IsthmusAppExamples.class.getPackageName(), args[0]);

      // asSubclass checks the type before anything is instantiated, so a class that is not an
      // Action is rejected without its constructor ever running (and no unchecked cast is needed).
      final Class<? extends Action> exampleClass =
          Class.forName(qualifiedName).asSubclass(Action.class);
      final Action action = exampleClass.getDeclaredConstructor().newInstance();

      // Yields an empty array when only the example name was supplied.
      action.run(Arrays.copyOfRange(args, 1, args.length));
    } catch (Exception e) {
      e.printStackTrace();
      System.exit(-1);
    }
  }
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.substrait.examples;
2+
3+
import io.substrait.isthmus.calcite.SubstraitTable;
4+
import io.substrait.isthmus.sql.SubstraitCreateStatementParser;
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import org.apache.calcite.jdbc.CalciteSchema;
8+
import org.apache.calcite.prepare.CalciteCatalogReader;
9+
import org.apache.calcite.sql.parser.SqlParseException;
10+
11+
/** Helper functions for schemas. */
12+
public final class SchemaHelper {
13+
14+
private SchemaHelper() {}
15+
16+
/**
17+
* Parses one or more SQL strings containing only CREATE statements into a {@link
18+
* CalciteCatalogReader}
19+
*
20+
* @param createStatements a SQL string containing only CREATE statements
21+
* @return a {@link CalciteCatalogReader} generated from the CREATE statements
22+
* @throws SqlParseException
23+
*/
24+
public static CalciteSchema processCreateStatementsToSchema(final List<String> createStatements)
25+
throws SqlParseException {
26+
27+
final List<SubstraitTable> tables = new ArrayList<>();
28+
for (final String statement : createStatements) {
29+
tables.addAll(SubstraitCreateStatementParser.processCreateStatements(statement));
30+
}
31+
32+
final CalciteSchema rootSchema = CalciteSchema.createRootSchema(false);
33+
for (final SubstraitTable table : tables) {
34+
rootSchema.add(table.getName(), table);
35+
}
36+
37+
return rootSchema;
38+
}
39+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package io.substrait.examples;
2+
3+
import io.substrait.examples.IsthmusAppExamples.Action;
4+
import io.substrait.examples.util.SubstraitStringify;
5+
import io.substrait.extension.DefaultExtensionCatalog;
6+
import io.substrait.extension.SimpleExtension;
7+
import io.substrait.isthmus.SubstraitToCalcite;
8+
import io.substrait.isthmus.SubstraitTypeSystem;
9+
import io.substrait.plan.Plan;
10+
import io.substrait.plan.Plan.Root;
11+
import io.substrait.plan.ProtoPlanConverter;
12+
import java.io.IOException;
13+
import java.nio.file.Files;
14+
import java.nio.file.Paths;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
18+
import org.apache.calcite.rel.RelNode;
19+
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
20+
import org.apache.calcite.sql.SqlDialect;
21+
import org.apache.calcite.sql.SqlNode;
22+
23+
/**
24+
* Substrait to SQL conversions.
25+
*
26+
* <p>There are steps in the whole process
27+
*
28+
* <p>1) Load the plan into the protobuf object, and creatithe in POJO memory object. 2) Create a
29+
* Converter to map the Substrait to Calcite relations. This will need the type system to use and
30+
* the collection of extensions to put into the substrait plan. 3) Given configuration, convert the
31+
* Calcite relational nodes to SQL statements.
32+
*
33+
* <p>It is possible to get multiple SQL statements from a single Substrait plan.
34+
*/
35+
public class ToSql implements Action {
36+
37+
@Override
38+
public void run(String[] args) {
39+
40+
try {
41+
42+
// Load the protobuf binary file into a Substrait Plan POJO
43+
System.out.println("Reading from " + args[0]);
44+
final byte[] buffer = Files.readAllBytes(Paths.get(args[0]));
45+
46+
final io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(buffer);
47+
final ProtoPlanConverter protoToPlan = new ProtoPlanConverter();
48+
final Plan plan = protoToPlan.from(proto);
49+
50+
// output the plan for information
51+
SubstraitStringify.explain(plan).forEach(System.out::println);
52+
53+
final SimpleExtension.ExtensionCollection extensions =
54+
DefaultExtensionCatalog.DEFAULT_COLLECTION;
55+
final SubstraitToCalcite converter =
56+
new SubstraitToCalcite(
57+
extensions, new JavaTypeFactoryImpl(SubstraitTypeSystem.TYPE_SYSTEM));
58+
59+
// Determine which SQL Dialect we want the resultnat queries to be in
60+
final SqlDialect sqlDialect = SqlDialect.DatabaseProduct.MYSQL.getDialect();
61+
62+
// Create the Sql to Calcite Relation Parser
63+
final RelToSqlConverter relToSql = new RelToSqlConverter(sqlDialect);
64+
final List<String> sqlStrings = new ArrayList<>();
65+
66+
// and get each root from the calcite plan; Then deployme this plan into the sql creaton step
67+
for (final Root root : plan.getRoots()) {
68+
final RelNode calciteRelNode = converter.convert(root).project(true);
69+
final SqlNode sqlNode = relToSql.visitRoot(calciteRelNode).asStatement();
70+
71+
final String sqlString = sqlNode.toSqlString(sqlDialect).getSql();
72+
sqlStrings.add(sqlString);
73+
}
74+
sqlStrings.forEach(System.out::println);
75+
76+
} catch (IOException e) {
77+
e.printStackTrace();
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)