
Commit 343277e

committed
feat: isthmus To/From SQL examples & enhanced APIs
Starting from two examples of using Isthmus to go from and to SQL, a few new methods make the process a lot simpler and more symmetrical. We found that in use we needed helper wrapper classes, but these could be pushed into the main code. Signed-off-by: MBWhite <[email protected]>
1 parent 5d51349 commit 343277e

File tree

11 files changed

+484
-4
lines changed

examples/isthmus-api/.gitignore

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
_apps
_data
**/*/bin
build

examples/isthmus-api/README.md

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
# Isthmus API Examples

The Isthmus library converts Substrait plans to and from SQL. There are two examples showing conversion in each direction.

## How does this work in theory?

In both cases, the Calcite library is used to parse and generate the SQL string. Calcite has its own relational object model, so there are classes within Isthmus to convert Substrait to and from Calcite's object model.

Converting to Substrait from SQL uses Calcite to parse the SQL into its object model, which is then converted to Substrait.

Converting from Substrait to SQL involves converting Substrait to Calcite's object model, then asking Calcite to generate SQL strings.

## Running the examples

There are 2 example classes:

- [FromSql](./src/main/java/io/substrait/examples/FromSql.java) that creates a plan starting from SQL
- [ToSql](./src/main/java/io/substrait/examples/ToSql.java) that reads a plan and creates the SQL


### Requirements

To run these you will need:

- Java 17 or greater
- [Two datafiles](./app/src/main/resources/) are provided for the sample data


## Creating a Substrait Plan from SQL

Run [`FromSql.java`](./src/main/java/io/substrait/examples/FromSql.java) from the root of this repository. `substrait.plan` is the name of the file written.

```bash
./gradlew examples:isthmus-api:run --args "FromSql substrait.plan"
> Task :examples:isthmus-api:run
Plan{version=Version{major=0, minor=77, patch=0, producer=isthmus}, roots=[Root{input=Sort{input=Aggregate{input=Project{remap=Remap{indices=[15]}, input=Filter{input=Join{left=NamedScan{initialSchema=NamedStruct{struct=Struct{nullable=false, fields=[VarChar{nullable=true, length=15}, VarChar{nullable=true, length=40}, VarChar{nullable=true, length=40}, VarChar{nullable=true, length=15}, VarChar{nullable=true, length=15}, I32{nullable=true}, VarChar{nullable=true, length=15}]}, names=[vehicle_id, make, model, colour, fuel_type, cylinder_capacity, first_use_date]}, names=[vehicles]}, right=NamedScan{initialSchema=NamedStruct{struct=Struct{nullable=false, fields=[VarChar{nullable=true, length=15}, VarChar{nullable=true, length=15}, VarChar{nullable=true, length=20}, VarChar{nullable=true, length=20}, VarChar{nullable=true, length=20}, VarChar{nullable=true, length=15}, I32{nullable=true}, VarChar{nullable=true, length=15}]}, names=[test_id, vehicle_id, test_date, test_class, test_type, test_result, test_mileage, postcode_area]}, names=[tests]}, condition=ScalarFunctionInvocation{declaration=equal:any_any, arguments=[FieldReference{segments=[StructField{offset=0}], type=VarChar{nullable=true, length=15}}, FieldReference{segments=[StructField{offset=8}], type=VarChar{nullable=true, length=15}}], options=[], outputType=Bool{nullable=true}}, joinType=INNER}, condition=ScalarFunctionInvocation{declaration=equal:any_any, arguments=[FieldReference{segments=[StructField{offset=12}], type=VarChar{nullable=true, length=15}}, VarCharLiteral{nullable=false, value=P, length=15}], options=[], outputType=Bool{nullable=true}}}, expressions=[FieldReference{segments=[StructField{offset=3}], type=VarChar{nullable=true, length=15}}]}, groupings=[Grouping{expressions=[FieldReference{segments=[StructField{offset=0}], type=VarChar{nullable=true, length=15}}]}], measures=[Measure{function=AggregateFunctionInvocation{declaration=count:, arguments=[], options=[], 
aggregationPhase=INITIAL_TO_RESULT, sort=[], outputType=I64{nullable=false}, invocation=ALL}}]}, sortFields=[SortField{expr=FieldReference{segments=[StructField{offset=1}], type=Struct{nullable=false, fields=[VarChar{nullable=true, length=15}, I64{nullable=false}]}}, direction=ASC_NULLS_LAST}]}, names=[COLOUR, COLOURCOUNT]}], expectedTypeUrls=[]}
File written to substrait.plan
```

The output is a binary file, so to check that it was written:
```bash
ls -l examples/isthmus-api/substrait.plan
-rw-r--r-- 1 matthew matthew 808 Dec 1 12:05 examples/isthmus-api/substrait.plan
```
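
Because the plan file is binary, a byte-level check can be more informative than `ls`. The following is a minimal sketch using only the JDK: it writes a byte array with `Files.write` and reads it back with `Files.readAllBytes`, the same calls the examples use. `PlanFileCheck`, `roundTrips`, and `hexPreview` are hypothetical names introduced for illustration and are not part of Isthmus.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/** Hypothetical helper (not part of Isthmus): byte-level checks for a binary plan file. */
final class PlanFileCheck {

  private PlanFileCheck() {}

  /** Writes the bytes to a temporary file, reads them back, and reports whether they match. */
  static boolean roundTrips(final byte[] planBytes) {
    try {
      final Path file = Files.createTempFile("substrait", ".plan");
      try {
        Files.write(file, planBytes);
        return Arrays.equals(planBytes, Files.readAllBytes(file));
      } finally {
        Files.deleteIfExists(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Renders at most {@code limit} leading bytes as lowercase hex. */
  static String hexPreview(final byte[] bytes, final int limit) {
    final StringBuilder sb = new StringBuilder();
    for (int i = 0; i < Math.min(limit, bytes.length); i++) {
      // Mask to 0..255 so negative bytes print as two hex digits
      sb.append(String.format("%02x", bytes[i] & 0xff));
    }
    return sb.toString();
  }
}
```

In the real examples the byte array would come from `PlanProtoConverter.toProto(...).toByteArray()`, as in `FromSql`.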

Please see the code comments for details of how the conversion is done.

## Creating SQL from a Substrait Plan

Run [`ToSql.java`](./src/main/java/io/substrait/examples/ToSql.java) from the root of this repository.
`substrait.plan` is the name of the file to be read; it will probably be the one first created with `FromSql`.

```bash
./gradlew examples:isthmus-api:run --args "ToSql substrait.plan"

> Task :examples:isthmus-api:run
Reading from substrait.plan
Plan{version=Version{major=0, minor=77, patch=0, producer=isthmus}, roots=[Root{input=Sort{input=Aggregate{input=Project{remap=Remap{indices=[15]}, input=Filter{input=Join{left=NamedScan{initialSchema=NamedStruct{struct=Struct{nullable=false, fields=[VarChar{nullable=true, length=15}, VarChar{nullable=true, length=40}, VarChar{nullable=true, length=40}, VarChar{nullable=true, length=15}, VarChar{nullable=true, length=15}, I32{nullable=true}, VarChar{nullable=true, length=15}]}, names=[vehicle_id, make, model, colour, fuel_type, cylinder_capacity, first_use_date]}, names=[vehicles]}, right=NamedScan{initialSchema=NamedStruct{struct=Struct{nullable=false, fields=[VarChar{nullable=true, length=15}, VarChar{nullable=true, length=15}, VarChar{nullable=true, length=20}, VarChar{nullable=true, length=20}, VarChar{nullable=true, length=20}, VarChar{nullable=true, length=15}, I32{nullable=true}, VarChar{nullable=true, length=15}]}, names=[test_id, vehicle_id, test_date, test_class, test_type, test_result, test_mileage, postcode_area]}, names=[tests]}, condition=ScalarFunctionInvocation{declaration=equal:any_any, arguments=[FieldReference{segments=[StructField{offset=0}], type=VarChar{nullable=true, length=15}}, FieldReference{segments=[StructField{offset=8}], type=VarChar{nullable=true, length=15}}], options=[], outputType=Bool{nullable=true}}, joinType=INNER}, condition=ScalarFunctionInvocation{declaration=equal:any_any, arguments=[FieldReference{segments=[StructField{offset=12}], type=VarChar{nullable=true, length=15}}, VarCharLiteral{nullable=false, value=P, length=15}], options=[], outputType=Bool{nullable=true}}}, expressions=[FieldReference{segments=[StructField{offset=3}], type=VarChar{nullable=true, length=15}}]}, groupings=[Grouping{expressions=[FieldReference{segments=[StructField{offset=0}], type=VarChar{nullable=true, length=15}}]}], measures=[Measure{function=AggregateFunctionInvocation{declaration=count:, arguments=[], options=[], 
aggregationPhase=INITIAL_TO_RESULT, sort=[], outputType=I64{nullable=false}, invocation=ALL}}]}, sortFields=[SortField{expr=FieldReference{segments=[StructField{offset=1}], type=I64{nullable=false}}, direction=ASC_NULLS_LAST}]}, names=[COLOUR, COLOURCOUNT]}], expectedTypeUrls=[]}

SELECT `t2`.`colour0` AS `COLOUR`, `t2`.`$f1` AS `COLOURCOUNT`
FROM (SELECT `vehicles`.`colour` AS `colour0`, COUNT(*) AS `$f1`
FROM `vehicles`
INNER JOIN `tests` ON `vehicles`.`vehicle_id` = `tests`.`vehicle_id`
WHERE `tests`.`test_result` = 'P'
GROUP BY `vehicles`.`colour`
ORDER BY COUNT(*) IS NULL, 2) AS `t2`

```

The SQL statement is generated in the selected dialect (MySQL is used in the example).
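
The `ORDER BY COUNT(*) IS NULL, 2` clause in the generated SQL appears to be how the `ASC_NULLS_LAST` sort direction from the plan is expressed for MySQL: sort first on an "is null" flag, then on the value itself. This reading is an assumption based on the output shown above, not on the Isthmus source. The two-key idea can be sketched in plain Java; `NullsLastSketch` is a hypothetical name used only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration: ascending, nulls-last ordering written as a two-key sort. */
final class NullsLastSketch {

  private NullsLastSketch() {}

  /** Returns a sorted copy: non-null values ascending, followed by any nulls. */
  static List<Long> sortTwoKeys(final List<Long> values) {
    final List<Long> sorted = new ArrayList<>(values);
    sorted.sort(
        // Key 1 mirrors "x IS NULL": false (not null) sorts before true (null)
        Comparator.comparing((Long v) -> v == null)
            // Key 2 is the value; nullsLast keeps null-vs-null comparisons safe
            .thenComparing(Comparator.nullsLast(Comparator.<Long>naturalOrder())));
    return sorted;
  }
}
```

The first key alone decides where nulls go, which is why a dialect without a `NULLS LAST` keyword can still produce the same ordering.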
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
plugins {
  // Apply the application plugin to add support for building a CLI application in Java.
  id("application")
  alias(libs.plugins.spotless)
  id("substrait.java-conventions")
}

repositories {
  // Use Maven Central for resolving dependencies.
  mavenCentral()
}

dependencies {
  implementation(project(":isthmus"))
  implementation(libs.calcite.core)
  implementation(libs.calcite.server)
  // For a real Spark application, these would not be required since they would be in the Spark
  // server classpath
  runtimeOnly(libs.spark.core)
  runtimeOnly(libs.spark.hive)
}

application { mainClass = "io.substrait.examples.IsthmusAppExamples" }

tasks.named<Test>("test") {
  // Use JUnit Platform for unit tests.
  useJUnitPlatform()
}

java { toolchain { languageVersion.set(JavaLanguageVersion.of(17)) } }

tasks.pmdMain { dependsOn(":core:shadowJar") }
Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
package io.substrait.examples;

import io.substrait.examples.IsthmusAppExamples.Action;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.isthmus.SubstraitTypeSystem;
import io.substrait.isthmus.sql.SubstraitCreateStatementParser;
import io.substrait.plan.Plan;
import io.substrait.plan.PlanProtoConverter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.parser.SqlParseException;

/**
 * Substrait from SQL conversions.
 *
 * <p>There are 4 steps in the whole process:
 *
 * <ol>
 *   <li>A fully typed schema is required for the 'inputs'. Within a SQL context this is the
 *       {@code CREATE TABLE} commands; these need to be converted to a Calcite schema.
 *   <li>The SQL query to convert (in one type of dialect).
 *   <li>Conversion of the SQL query to Calcite relations.
 *   <li>Conversion of the Calcite relations to Substrait relations.
 * </ol>
 *
 * <p>Note that the schema could be created by other means, e.g. Calcite's reflection-based schema.
 *
 * <p>The Substrait plan can then be used as wished.
 */
public class FromSql implements Action {

  @Override
  public void run(final String[] args) {
    try {
      final String createSql =
          """
          CREATE TABLE "vehicles" ("vehicle_id" varchar(15), "make" varchar(40), "model" varchar(40),
            "colour" varchar(15), "fuel_type" varchar(15),
            "cylinder_capacity" int, "first_use_date" varchar(15));

          CREATE TABLE "tests" ("test_id" varchar(15), "vehicle_id" varchar(15),
            "test_date" varchar(20), "test_class" varchar(20), "test_type" varchar(20),
            "test_result" varchar(15),"test_mileage" int, "postcode_area" varchar(15));

          """;

      // Create the Calcite schema from the CREATE TABLE statements.
      // As this is SQL it could be in a variety of dialects, but the Isthmus helper classes used
      // here assume a common SQL format
      final CalciteSchema calciteSchema = CalciteSchema.createRootSchema(false);
      SubstraitCreateStatementParser.processCreateStatements(createSql)
          .forEach(t -> calciteSchema.add(t.getName(), t));

      // Type factory based on Java types
      final RelDataTypeFactory typeFactory =
          new JavaTypeFactoryImpl(SubstraitTypeSystem.TYPE_SYSTEM);

      // Default configuration for Calcite, with case-insensitive identifier matching
      final CalciteConnectionConfig calciteDefaultConfig =
          CalciteConnectionConfig.DEFAULT.set(
              CalciteConnectionProperty.CASE_SENSITIVE, Boolean.FALSE.toString());

      final CalciteCatalogReader catalogReader =
          new CalciteCatalogReader(calciteSchema, List.of(), typeFactory, calciteDefaultConfig);

      // Query that needs to be converted; again this could be in a variety of SQL dialects
      final String query =
          """
          SELECT vehicles.colour, count(*) as colourcount FROM vehicles INNER JOIN tests
          ON vehicles.vehicle_id=tests.vehicle_id WHERE tests.test_result = 'P'
          GROUP BY vehicles.colour ORDER BY count(*)
          """;
      final SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();

      // Choose Apache Derby as an example dialect
      final SqlDialect dialect = SqlDialect.DatabaseProduct.DERBY.getDialect();
      final Plan substraitPlan = sqlToSubstrait.convert(query, catalogReader, dialect);

      System.out.println(substraitPlan);

      // Write out to a file if given a file name:
      // convert to a protobuf byte array and write it as a binary file
      if (args.length == 1) {
        final PlanProtoConverter planToProto = new PlanProtoConverter();
        final byte[] buffer = planToProto.toProto(substraitPlan).toByteArray();

        final Path outputFile = Paths.get(args[0]);
        Files.write(outputFile, buffer);
        System.out.println("File written to " + outputFile);
      }

    } catch (SqlParseException | IOException e) {
      e.printStackTrace();
    }
  }
}
Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
package io.substrait.examples;

import java.util.Arrays;

/** Main class; runs the example named by the first command-line argument. */
public final class IsthmusAppExamples {

  /** Implemented by all examples. */
  @FunctionalInterface
  public interface Action {

    /**
     * Runs the example.
     *
     * @param args the arguments remaining after the example name
     */
    void run(String[] args);
  }

  private IsthmusAppExamples() {}

  /**
   * Traditional main method.
   *
   * @param args the example class name, followed by any arguments for that example
   */
  public static void main(final String[] args) {
    try {

      if (args.length == 0) {
        System.err.println(
            "Please provide base classname of example to run. eg ToSql to run class io.substrait.examples.ToSql ");
        System.exit(-1);
      }
      final String exampleClass = args[0];

      // asSubclass fails with a ClassCastException if the named class is not an Action
      final Class<? extends Action> clz =
          Class.forName(
                  String.format("%s.%s", IsthmusAppExamples.class.getPackageName(), exampleClass))
              .asSubclass(Action.class);
      final Action action = clz.getDeclaredConstructor().newInstance();
      // copyOfRange yields an empty array when only the class name was given
      action.run(Arrays.copyOfRange(args, 1, args.length));
    } catch (Exception e) {
      e.printStackTrace();
      System.exit(-1);
    }
  }
}
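
The main class above looks up the example by reflection from its class name. An alternative, shown here only as a sketch and not as what the commit does, is an explicit registry that maps names to factories, which avoids reflection and makes the set of runnable examples visible in one place. `ActionRegistrySketch`, `register`, and `dispatch` are hypothetical names; the nested `Action` interface simply mirrors the shape of `IsthmusAppExamples.Action` so that the block is self-contained.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical alternative to reflection: look up examples in an explicit registry. */
final class ActionRegistrySketch {

  /** Same shape as IsthmusAppExamples.Action, repeated so this sketch stands alone. */
  @FunctionalInterface
  interface Action {
    void run(String[] args);
  }

  private final Map<String, Supplier<Action>> actions = new LinkedHashMap<>();

  /** Registers a factory for the example with the given name. */
  void register(final String name, final Supplier<Action> factory) {
    actions.put(name, factory);
  }

  /**
   * Runs the example named by args[0], passing it the remaining arguments.
   *
   * @return false if no name was given or the name is not registered
   */
  boolean dispatch(final String[] args) {
    if (args.length == 0) {
      return false;
    }
    final Supplier<Action> factory = actions.get(args[0]);
    if (factory == null) {
      return false;
    }
    factory.get().run(Arrays.copyOfRange(args, 1, args.length));
    return true;
  }
}
```

With this shape, registering the two examples would look like `registry.register("FromSql", FromSql::new)`, at the cost of having to update the registry whenever a new example class is added.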
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
package io.substrait.examples;

import io.substrait.isthmus.calcite.SubstraitTable;
import io.substrait.isthmus.sql.SubstraitCreateStatementParser;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.sql.parser.SqlParseException;

/** Helper functions for schemas. */
public final class SchemaHelper {

  private SchemaHelper() {}

  /**
   * Parses one or more SQL strings containing only CREATE statements into a {@link
   * CalciteSchema}.
   *
   * @param createStatements SQL strings, each containing only CREATE statements
   * @return a root {@link CalciteSchema} holding the tables from the CREATE statements
   * @throws SqlParseException if any of the statements cannot be parsed
   */
  public static CalciteSchema processCreateStatementsToSchema(final List<String> createStatements)
      throws SqlParseException {

    final List<SubstraitTable> tables = new ArrayList<>();
    for (final String statement : createStatements) {
      tables.addAll(SubstraitCreateStatementParser.processCreateStatements(statement));
    }

    final CalciteSchema rootSchema = CalciteSchema.createRootSchema(false);
    for (final SubstraitTable table : tables) {
      rootSchema.add(table.getName(), table);
    }

    return rootSchema;
  }
}
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
package io.substrait.examples;

import io.substrait.examples.IsthmusAppExamples.Action;
import io.substrait.extension.DefaultExtensionCatalog;
import io.substrait.extension.SimpleExtension;
import io.substrait.isthmus.SubstraitToCalcite;
import io.substrait.isthmus.SubstraitTypeSystem;
import io.substrait.plan.Plan;
import io.substrait.plan.Plan.Root;
import io.substrait.plan.ProtoPlanConverter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlNode;

/**
 * Substrait to SQL conversions.
 *
 * <p>There are 3 steps in the whole process:
 *
 * <ol>
 *   <li>Load the plan into the protobuf object, and create the in-memory POJO from it.
 *   <li>Create a converter to map the Substrait to Calcite relations. This needs the type system
 *       to use and the collection of extensions to use with the Substrait plan.
 *   <li>Given the configuration, convert the Calcite relational nodes to SQL statements.
 * </ol>
 *
 * <p>It is possible to get multiple SQL statements from a single Substrait plan.
 */
public class ToSql implements Action {

  @Override
  public void run(String[] args) {

    // The plan file name is required; without it args[0] below would throw
    if (args.length == 0) {
      System.err.println("Please provide the name of the Substrait plan file to read");
      return;
    }

    try {

      // Load the protobuf binary file into a Substrait Plan POJO
      System.out.println("Reading from " + args[0]);
      final byte[] buffer = Files.readAllBytes(Paths.get(args[0]));

      final io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(buffer);
      final ProtoPlanConverter protoToPlan = new ProtoPlanConverter();
      final Plan substraitPlan = protoToPlan.from(proto);

      // Output the plan for information
      System.out.println(substraitPlan);

      final SimpleExtension.ExtensionCollection extensions =
          DefaultExtensionCatalog.DEFAULT_COLLECTION;
      final SubstraitToCalcite converter =
          new SubstraitToCalcite(
              extensions, new JavaTypeFactoryImpl(SubstraitTypeSystem.TYPE_SYSTEM));

      // Determine which SQL dialect we want the resultant queries to be in
      final SqlDialect sqlDialect = SqlDialect.DatabaseProduct.MYSQL.getDialect();

      // Create the Calcite relation to SQL converter
      final RelToSqlConverter relToSql = new RelToSqlConverter(sqlDialect);
      final List<String> sqlStrings = new ArrayList<>();

      // Get each root from the Substrait plan, convert it to a Calcite relation, then pass that
      // relation into the SQL creation step
      for (final Root root : substraitPlan.getRoots()) {
        final RelNode calciteRelNode = converter.convert(root).project(true);
        final SqlNode sqlNode = relToSql.visitRoot(calciteRelNode).asStatement();

        final String sqlString = sqlNode.toSqlString(sqlDialect).getSql();
        sqlStrings.add(sqlString);
      }
      sqlStrings.forEach(System.out::println);

    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
