diff --git a/neo4j-plugin/docs/Neo4jSink-batchsink.md b/neo4j-plugin/docs/Neo4jSink-batchsink.md new file mode 100644 index 000000000..647000986 --- /dev/null +++ b/neo4j-plugin/docs/Neo4jSink-batchsink.md @@ -0,0 +1,52 @@ +# Neo4j Batch Sink + + +Description +----------- +Writes data to a Neo4j instance using a configurable CQL query. +The number of nodes and relationships created depends on the Output Query. + + +Use Case +-------- +The sink is used whenever you need to write to a Neo4j instance. + + +Properties +---------- +**Reference Name:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc. + +**Driver Name:** Name of the JDBC driver to use. + +**Neo4j Host:** Neo4j database host. + +**Neo4j Port:** Neo4j database port. + +**Output Query:** The query to use to export data to the Neo4j database. Query example: 'CREATE (n: $(*))' +or 'CREATE (n: $(property_1, property_2))'. Additional information can be found at +https://wiki.cask.co/display/CE/Neo4j+database+plugin + +**Username:** User to use to connect to the Neo4j database. + +**Password:** Password to use to connect to the Neo4j database. 
+ + +Data Types Mapping +---------- + + | CDAP Schema Data Types | Neo4j Data Types | Comment | + | ---------------------- | ------------------------------------- | -------------------------------------------- | + | null | null | | + | array | List | | + | boolean | Boolean | | + | long, int | Integer | | + | double | Float | | + | string | String | | + | bytes | ByteArray | | + | date | Date | | + | time-micros | Time, LocalTime | | + | timestamp-micros | DateTime, LocalDateTime | | + | record | Duration, Point | Depending on record fields | + + + \ No newline at end of file diff --git a/neo4j-plugin/docs/Neo4jSource-batchsource.md b/neo4j-plugin/docs/Neo4jSource-batchsource.md new file mode 100644 index 000000000..a31b11c20 --- /dev/null +++ b/neo4j-plugin/docs/Neo4jSource-batchsource.md @@ -0,0 +1,61 @@ +# Neo4j Batch Source + + +Description +----------- +Reads from a Neo4j instance using a configurable CQL query. +Outputs one record for each row returned by the query. + + +Use Case +-------- +The source is used whenever you need to read from a Neo4j instance. + + +Properties +---------- +**Reference Name:** This will be used to uniquely identify this source for lineage, annotating metadata, etc. + +**Driver Name:** Name of the JDBC driver to use. + +**Neo4j Host:** Neo4j database host. + +**Neo4j Port:** Neo4j database port. + +**Input Query:** The query to use to import data from the Neo4j database. +Query example: 'MATCH (n:Label) RETURN n.property_1, n.property_2'. + +**Username:** User to use to connect to the Neo4j database. + +**Password:** Password to use to connect to the Neo4j database. + +**Splits Number:** The number of splits to generate. If set to one, the orderBy is not needed. + +**Order By:** Name of the field that will be used for ordering during splits generation. This is required unless numSplits +is set to one and the 'ORDER BY' keyword does not exist in the Input Query. 
+ + +Data Types Mapping +---------- + + | Neo4j Data Types | CDAP Schema Data Types | Comment | + | ------------------------------- | ---------------------- | -------------------------------------------------- | + | null | null | | + | List | array | | + | Map | record | | + | Boolean | boolean | | + | Integer | long | | + | Float | double | | + | String | string | | + | ByteArray | bytes | | + | Date | date | | + | Time | time-micros | | + | LocalTime | time-micros | | + | DateTime | timestamp-micros | | + | LocalDateTime | timestamp-micros | | + | Node | record | | + | Relationship | record | | + | Duration | record | | + | Point | record | | + | Path | | | + \ No newline at end of file diff --git a/neo4j-plugin/icons/Neo4jSink-batchsink.png b/neo4j-plugin/icons/Neo4jSink-batchsink.png new file mode 100644 index 000000000..e7d5c3836 Binary files /dev/null and b/neo4j-plugin/icons/Neo4jSink-batchsink.png differ diff --git a/neo4j-plugin/icons/Neo4jSource-batchsource.png b/neo4j-plugin/icons/Neo4jSource-batchsource.png new file mode 100644 index 000000000..e7d5c3836 Binary files /dev/null and b/neo4j-plugin/icons/Neo4jSource-batchsource.png differ diff --git a/neo4j-plugin/pom.xml b/neo4j-plugin/pom.xml new file mode 100644 index 000000000..909d6a31c --- /dev/null +++ b/neo4j-plugin/pom.xml @@ -0,0 +1,122 @@ + + + + + database-plugins + io.cdap.plugin + 1.3.0-SNAPSHOT + + + Neo4j plugin + neo4j-plugin + 1.3.0-SNAPSHOT + 4.0.0 + + + + io.cdap.cdap + cdap-etl-api + + + io.cdap.plugin + database-commons + ${project.version} + + + io.cdap.plugin + hydrator-common + + + org.apache.hadoop + hadoop-common + + + org.neo4j + neo4j-jdbc-driver + 3.4.0 + + + + io.cdap.plugin + database-commons + ${project.version} + test-jar + test + + + io.cdap.cdap + hydrator-test + + + io.cdap.cdap + cdap-data-pipeline + + + junit + junit + + + io.cdap.cdap + cdap-api + provided + + + org.jetbrains + annotations + RELEASE + compile + + + + + + + org.apache.felix + maven-bundle-plugin + 
3.3.0 + true + + + <_exportcontents> + io.cdap.plugin.neo4j.*; + org.apache.commons.logging.*; + org.codehaus.jackson.* + + *;inline=false;scope=compile + true + lib + + + + + package + + bundle + + + + + + io.cdap + cdap-maven-plugin + + + + + \ No newline at end of file diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/Neo4jConstants.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/Neo4jConstants.java new file mode 100644 index 000000000..7c656b1e2 --- /dev/null +++ b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/Neo4jConstants.java @@ -0,0 +1,67 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j; + +import io.cdap.cdap.api.data.schema.Schema; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Neo4j constants. 
+ */ +public final class Neo4jConstants { + public static final String NEO4J_CONNECTION_STRING_FORMAT = "jdbc:neo4j:bolt://%s:%s/?username=%s,password=%s"; + public static final String NAME_REFERENCE_NAME = "referenceName"; + public static final String NAME_DRIVER_NAME = "jdbcPluginName"; + public static final String NAME_HOST_STRING = "neo4jHost"; + public static final String NAME_PORT_STRING = "neo4jPort"; + public static final String NAME_USERNAME = "username"; + public static final String NAME_PASSWORD = "password"; + + public static final String OUTPUT_QUERY = "cdap.neo4j.output.query"; + + public static final List DURATION_RECORD_FIELDS = Collections.unmodifiableList(Arrays.asList( + Schema.Field.of("duration", Schema.of(Schema.Type.STRING)), + Schema.Field.of("seconds", Schema.of(Schema.Type.LONG)), + Schema.Field.of("months", Schema.of(Schema.Type.LONG)), + Schema.Field.of("days", Schema.of(Schema.Type.LONG)), + Schema.Field.of("nanoseconds", Schema.of(Schema.Type.INT)) + )); + + public static final List POINT_2D_RECORD_FIELDS = Collections.unmodifiableList(Arrays.asList( + Schema.Field.of("srid", Schema.of(Schema.Type.INT)), + Schema.Field.of("x", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("y", Schema.of(Schema.Type.DOUBLE)) + )); + + public static final List POINT_3D_RECORD_FIELDS = Collections.unmodifiableList(Arrays.asList( + Schema.Field.of("srid", Schema.of(Schema.Type.INT)), + Schema.Field.of("x", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("y", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("z", Schema.of(Schema.Type.DOUBLE)) + )); + + public static final List NEO4J_SYS_FIELDS = Collections.unmodifiableList(Arrays.asList( + "_id", + "_labels", + "_type", + "_startId", + "_endId" + )); +} diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/Neo4jRecord.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/Neo4jRecord.java new file mode 100644 index 000000000..a00dc34ef --- /dev/null +++ 
b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/Neo4jRecord.java @@ -0,0 +1,225 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j; + +import io.cdap.cdap.api.common.Bytes; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.db.ColumnType; +import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.SchemaReader; +import io.cdap.plugin.neo4j.source.Neo4jSchemaReader; +import org.neo4j.driver.internal.InternalIsoDuration; +import org.neo4j.driver.internal.InternalPoint2D; +import org.neo4j.driver.internal.InternalPoint3D; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +/** + * Writable class for Neo4j Source/Sink. 
+ */ +public class Neo4jRecord extends DBRecord { + private static final Logger LOG = LoggerFactory.getLogger(Neo4jRecord.class); + + private Map positions; + + public Neo4jRecord(StructuredRecord record, List columnTypes) { + super(record, columnTypes); + } + + public Neo4jRecord() { + } + + @Override + protected SchemaReader getSchemaReader() { + return new Neo4jSchemaReader(); + } + + @Override + protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, + int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException { + if (Types.JAVA_OBJECT == sqlType) { + handleSpecificType(resultSet, recordBuilder, field, columnIndex); + } else { + setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); + } + } + + private void handleSpecificType(ResultSet resultSet, + StructuredRecord.Builder recordBuilder, + Schema.Field field, int columnIndex) throws SQLException { + if (resultSet.getObject(columnIndex) instanceof byte[]) { + recordBuilder.set(field.getName(), resultSet.getObject(columnIndex)); + } else { + recordBuilder.set(field.getName(), buildRecord(field.getSchema(), resultSet.getObject(columnIndex, Map.class))); + } + } + + private StructuredRecord buildRecord(Schema schema, Map values) { + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + for (int i = 0; i < schema.getFields().size(); i++) { + Schema.Field field = schema.getFields().get(i); + Object o = values.get(field.getName()); + if (o instanceof Date) { + builder.setDate(field.getName(), ((Date) o).toLocalDate()); + } else if (o instanceof Time) { + builder.setTime(field.getName(), ((Time) o).toLocalTime()); + } else if (o instanceof Timestamp) { + Instant instant = ((Timestamp) o).toInstant(); + builder.setTimestamp(field.getName(), instant.atZone(ZoneId.ofOffset("UTC", ZoneOffset.UTC))); + } else if (o instanceof BigDecimal) { + builder.setDecimal(field.getName(), (BigDecimal) o); + } 
else if (o instanceof Map) { + builder.set(field.getName(), buildRecord(field.getSchema(), (Map) o)); + } else { + builder.set(field.getName(), o); + } + } + + return builder.build(); + } + + @Override + public void write(PreparedStatement stmt) throws SQLException { + for (int i = 0; i < positions.size(); i++) { + Map results = new TreeMap<>(); + String fields = positions.get(i + 1); + List fieldsList = Arrays.stream(fields.split(",")).map(String::trim).collect(Collectors.toList()); + if (fieldsList.size() == 1 && fieldsList.get(0).equals("*")) { + fieldsList = record.getSchema().getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()); + } + for (String v : fieldsList) { + Schema.Field field = record.getSchema().getField(v); + results.put(v, processData(record, field, true)); + } + stmt.setObject(i + 1, results); + } + } + + private Object processData(StructuredRecord record, @Nullable Schema.Field field, boolean baseRecord) + throws SQLException { + if (field == null) { + return null; + } + String fieldName = field.getName(); + Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); + Schema.Type fieldType = fieldSchema.getType(); + Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType(); + Object fieldValue = record.get(fieldName); + + if (fieldValue == null) { + return null; + } + + if (fieldLogicalType != null) { + switch (fieldLogicalType) { + case DATE: + return record.getDate(fieldName); + case TIME_MILLIS: + case TIME_MICROS: + return record.getTime(fieldName); + case TIMESTAMP_MILLIS: + case TIMESTAMP_MICROS: + return record.getTimestamp(fieldName); + case DECIMAL: + return record.getDecimal(fieldName); + } + return null; + } + + switch (fieldType) { + case NULL: + return null; + case BYTES: + return fieldValue instanceof ByteBuffer ? 
Bytes.toBytes((ByteBuffer) fieldValue) : (byte[]) fieldValue; + case RECORD: + return processRecord((StructuredRecord) fieldValue, baseRecord); + case STRING: + case BOOLEAN: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case ARRAY: + return fieldValue; + default: + throw new SQLException(String.format("Unsupported datatype: %s with value: %s.", fieldType, fieldValue)); + } + } + + private Object processRecord(StructuredRecord record, boolean baseRecord) throws SQLException { + Schema schema = record.getSchema(); + + if (schema.getFields().stream().map(Schema.Field::toString).collect(Collectors.toList()) + .containsAll(Neo4jConstants.DURATION_RECORD_FIELDS.stream().map(Schema.Field::toString) + .collect(Collectors.toList()))) { + return new InternalIsoDuration(record.get("months"), record.get("days"), record.get("seconds"), + record.get("nanoseconds")); + } else if (schema.getFields().stream().map(Schema.Field::toString).collect(Collectors.toList()) + .containsAll(Neo4jConstants.POINT_3D_RECORD_FIELDS.stream().map(Schema.Field::toString) + .collect(Collectors.toList()))) { + return new InternalPoint3D(record.get("srid"), record.get("x"), record.get("y"), record.get("z")); + } else if (schema.getFields().stream().map(Schema.Field::toString).collect(Collectors.toList()) + .containsAll(Neo4jConstants.POINT_2D_RECORD_FIELDS.stream().map(Schema.Field::toString) + .collect(Collectors.toList()))) { + return new InternalPoint2D(record.get("srid"), record.get("x"), record.get("y")); + } else { + Map resultMap = new TreeMap<>(); + for (Schema.Field field : schema.getFields()) { + if (Neo4jConstants.NEO4J_SYS_FIELDS.contains(field.getName())) { + continue; + } + if (field.getSchema().getType().equals(Schema.Type.RECORD)) { + if (!baseRecord) { + throw new SQLException(String.format("Unsupported datatype: %s with value: %s.", + field.getSchema().getType(), + record.get(field.getName()))); + } else { + resultMap.put(field.getName(), 
processRecord(record.get(field.getName()), false)); + } + } + resultMap.put(field.getName(), processData(record, field, false)); + } + return resultMap; + } + } + + public void setPositions(Map positions) { + if (this.positions == null) { + this.positions = positions; + } + } +} diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/sink/Neo4jDataDriveDBOutputFormat.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/sink/Neo4jDataDriveDBOutputFormat.java new file mode 100644 index 000000000..faa008b86 --- /dev/null +++ b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/sink/Neo4jDataDriveDBOutputFormat.java @@ -0,0 +1,183 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.neo4j.sink; + +import com.google.common.base.Throwables; +import io.cdap.plugin.db.ConnectionConfigAccessor; +import io.cdap.plugin.db.JDBCDriverShim; +import io.cdap.plugin.neo4j.Neo4jConstants; +import io.cdap.plugin.neo4j.Neo4jRecord; +import io.cdap.plugin.util.DBUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A OutputFormat that write data to Neo4j. + * + * @param - Key passed to this class to be written + * @param - Value passed to this class to be written. The value is ignored. 
+ */ +public class Neo4jDataDriveDBOutputFormat extends DBOutputFormat { + + private static final Logger LOG = LoggerFactory.getLogger(Neo4jDataDriveDBOutputFormat.class); + + private Configuration conf; + private Driver driver; + private JDBCDriverShim driverShim; + + private Map positions = new HashMap<>(); + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException { + conf = context.getConfiguration(); + + try { + Connection connection = getConnection(conf); + PreparedStatement statement = connection.prepareStatement(constructQuery()); + return new DBRecordWriter(connection, statement) { + + private boolean emptyData = true; + + //Implementation of the close method below is the exact implementation in DBOutputFormat except that + //we check if there is any data to be written and if not, we skip executeBatch call. + //There might be reducers that don't receive any data and thus this check is necessary to prevent + //empty data to be committed (since some Databases doesn't support that). 
+ @Override + public void close(TaskAttemptContext context) throws IOException { + try { + if (!emptyData) { + getStatement().executeBatch(); + getConnection().commit(); + } + } catch (SQLException e) { + try { + getConnection().rollback(); + } catch (SQLException ex) { + LOG.warn(StringUtils.stringifyException(ex)); + } + throw new IOException(e); + } finally { + try { + getStatement().close(); + getConnection().close(); + } catch (SQLException ex) { + throw new IOException(ex); + } + } + + try { + DriverManager.deregisterDriver(driverShim); + } catch (SQLException e) { + throw new IOException(e); + } + } + + @Override + public void write(K key, V value) { + emptyData = false; + //We need to make correct logging to avoid losing information about error + try { + ((Neo4jRecord) key).setPositions(positions); + key.write(getStatement()); + getStatement().addBatch(); + } catch (SQLException e) { + LOG.warn("Failed to write value to database", e); + } + } + }; + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + + private Connection getConnection(Configuration conf) { + Connection connection; + try { + String url = conf.get(DBConfiguration.URL_PROPERTY); + try { + // throws SQLException if no suitable driver is found + DriverManager.getDriver(url); + } catch (SQLException e) { + if (driverShim == null) { + if (driver == null) { + ClassLoader classLoader = conf.getClassLoader(); + @SuppressWarnings("unchecked") + Class driverClass = + (Class) classLoader.loadClass(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY)); + driver = driverClass.newInstance(); + + // De-register the default driver that gets registered when driver class is loaded. + DBUtils.deregisterAllDrivers(driverClass); + } + + driverShim = new JDBCDriverShim(driver); + DriverManager.registerDriver(driverShim); + LOG.debug("Registered JDBC driver via shim {}. 
Actual Driver {}.", driverShim, driver); + } + } + + ConnectionConfigAccessor connectionConfigAccessor = new ConnectionConfigAccessor(conf); + Map connectionArgs = connectionConfigAccessor.getConnectionArguments(); + Properties properties = new Properties(); + properties.putAll(connectionArgs); + connection = DriverManager.getConnection(url, properties); + connection.setReadOnly(false); + } catch (Exception e) { + throw Throwables.propagate(e); + } + return connection; + } + + public String constructQuery() { + return processQuery(conf.get(Neo4jConstants.OUTPUT_QUERY)); + } + + public String processQuery(String query) { + positions = new HashMap<>(); + String regex = "\\$\\([^()]+\\)"; + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(query); + int counter = 1; + while (matcher.find()) { + String group = matcher.group(); + positions.put(counter, group.substring(group.indexOf("(") + 1, group.indexOf(")"))); + query = query.replace(group, "{" + counter + "}"); + counter++; + } + return query; + } +} diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/sink/Neo4jSink.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/sink/Neo4jSink.java new file mode 100644 index 000000000..2fac08c97 --- /dev/null +++ b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/sink/Neo4jSink.java @@ -0,0 +1,170 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.neo4j.sink; + +import com.google.common.base.Preconditions; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Output; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.plugin.PluginProperties; +import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.common.ReferenceBatchSink; +import io.cdap.plugin.common.ReferencePluginConfig; +import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider; +import io.cdap.plugin.db.ColumnType; +import io.cdap.plugin.db.ConnectionConfig; +import io.cdap.plugin.db.ConnectionConfigAccessor; +import io.cdap.plugin.neo4j.Neo4jConstants; +import io.cdap.plugin.neo4j.Neo4jRecord; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.lib.db.DBConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Driver; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Batch sink to write to Neo4j. 
+ */ +@Plugin(type = BatchSink.PLUGIN_TYPE) +@Name(Neo4jSink.NAME) +@Description("Writes records to a Neo4j database.") +public class Neo4jSink extends ReferenceBatchSink { + public static final String NAME = "Neo4jSink"; + private static final Logger LOG = LoggerFactory.getLogger(Neo4jSink.class); + + private Neo4jSinkConfig config; + + private Class driverClass; + private List columnTypes; + protected List columns; + protected String dbColumns; + + public Neo4jSink(Neo4jSinkConfig config) { + super(new ReferencePluginConfig(config.getReferenceName())); + this.config = config; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema(); + FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + config.validate(collector, inputSchema); + + Class driverClass = pipelineConfigurer.usePluginClass( + ConnectionConfig.JDBC_PLUGIN_TYPE, + config.getJdbcPluginName(), + getJDBCPluginId(), PluginProperties.builder().build()); + Preconditions.checkArgument( + driverClass != null, "Unable to load JDBC Driver class for plugin name '%s'. 
Please make sure " + + "that the plugin '%s' of type '%s' containing the driver has been installed correctly.", + config.getJdbcPluginName(), + config.getJdbcPluginName(), ConnectionConfig.JDBC_PLUGIN_TYPE); + try { + Class.forName(driverClass.getName()); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void initialize(BatchRuntimeContext context) throws Exception { + super.initialize(context); + + driverClass = context.loadPluginClass(getJDBCPluginId()); + + Schema outputSchema = Optional.ofNullable(context.getInputSchema()).orElse(null); + + if (outputSchema != null) { + setColumnsInfo(outputSchema.getFields()); + setColumnsType(outputSchema.getFields()); + } + } + + @Override + public void prepareRun(BatchSinkContext context) throws Exception { + FailureCollector collector = context.getFailureCollector(); + config.validate(collector, context.getInputSchema()); + collector.getOrThrowException(); + + Schema outputSchema = context.getInputSchema(); + setColumnsInfo(outputSchema.getFields()); + emitLineage(context, outputSchema.getFields()); + driverClass = context.loadPluginClass(getJDBCPluginId()); + + ConnectionConfigAccessor configAccessor = new ConnectionConfigAccessor(); + configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName()); + configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, config.getConnectionString()); + configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, dbColumns); + configAccessor.getConfiguration().set(DBConfiguration.USERNAME_PROPERTY, config.getUsername()); + configAccessor.getConfiguration().set(DBConfiguration.PASSWORD_PROPERTY, config.getPassword()); + configAccessor.getConfiguration().set(Neo4jConstants.OUTPUT_QUERY, config.getOutputQuery()); + + context.addOutput(Output.of(config.getReferenceName(), + new SinkOutputFormatProvider(Neo4jDataDriveDBOutputFormat.class, + 
configAccessor.getConfiguration()))); + } + + @Override + public void transform(StructuredRecord input, Emitter> emitter) { + emitter.emit(new KeyValue<>(getNeo4jRecord(input), null)); + } + + private Neo4jRecord getNeo4jRecord(StructuredRecord output) { + return new Neo4jRecord(output, columnTypes); + } + + private String getJDBCPluginId() { + return String.format("%s.%s.%s", "sink", ConnectionConfig.JDBC_PLUGIN_TYPE, "neo4j"); + } + + private void setColumnsInfo(List fields) { + columns = fields.stream() + .map(Schema.Field::getName) + .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList)); + + dbColumns = String.join(",", columns); + } + + private void setColumnsType(List fields) { + columnTypes = fields.stream().map(v -> new ColumnType(v.getName(), null, 0)).collect(Collectors.toList()); + } + + private void emitLineage(BatchSinkContext context, List fields) { + LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName()); + + if (!fields.isEmpty()) { + lineageRecorder.recordWrite("Write", "Wrote to DB table.", + fields.stream().map(Schema.Field::getName).collect(Collectors.toList())); + } + } +} diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/sink/Neo4jSinkConfig.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/sink/Neo4jSinkConfig.java new file mode 100644 index 000000000..180af2919 --- /dev/null +++ b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/sink/Neo4jSinkConfig.java @@ -0,0 +1,255 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j.sink; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.plugin.PluginConfig; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.plugin.neo4j.Neo4jConstants; + +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Batch source to read from Neo4j. + */ +public class Neo4jSinkConfig extends PluginConfig { + + private static final List UNAVAILABLE_QUERY_KEYWORDS = + Arrays.asList("UNWIND", "DELETE", "SET", "REMOVE"); + + public static final String NAME_OUTPUT_QUERY = "outputQuery"; + + @Name(Neo4jConstants.NAME_REFERENCE_NAME) + @Description("This will be used to uniquely identify this source for lineage, annotating metadata, etc.") + private String referenceName; + + @Macro + @Name(Neo4jConstants.NAME_DRIVER_NAME) + @Description("Name of the JDBC driver to use. 
This is the value of the 'jdbcPluginName' key defined in the JSON " + + "file for the JDBC plugin.") + private String jdbcPluginName; + + @Macro + @Name(Neo4jConstants.NAME_HOST_STRING) + @Description("Neo4j database host.") + private String neo4jHost; + + @Macro + @Name(Neo4jConstants.NAME_PORT_STRING) + @Description("Neo4j database port.") + private Integer neo4jPort; + + @Macro + @Name(Neo4jConstants.NAME_USERNAME) + @Description("User to use to connect to the Neo4j database.") + private String username; + + @Macro + @Name(Neo4jConstants.NAME_PASSWORD) + @Description("Password to use to connect to the Neo4j database.") + private String password; + + @Name(NAME_OUTPUT_QUERY) + @Description("The query to use to export data to the Neo4j database. Query example: " + + "'CREATE (n: $(*))' or 'CREATE (n: $(property_1, property_2))'") + private String outputQuery; + + public Neo4jSinkConfig(String referenceName, String jdbcPluginName, String neo4jHost, Integer neo4jPort, + String username, String password, String outputQuery) { + this.referenceName = referenceName; + this.jdbcPluginName = jdbcPluginName; + this.neo4jHost = neo4jHost; + this.neo4jPort = neo4jPort; + this.username = username; + this.password = password; + this.outputQuery = outputQuery; + } + + private Neo4jSinkConfig(Builder builder) { + referenceName = builder.referenceName; + jdbcPluginName = builder.jdbcPluginName; + neo4jHost = builder.neo4jHost; + neo4jPort = builder.neo4jPort; + username = builder.username; + password = builder.password; + outputQuery = builder.outputQuery; + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(Neo4jSinkConfig copy) { + return builder() + .setReferenceName(copy.referenceName) + .setJdbcPluginName(copy.jdbcPluginName) + .setNeo4jHost(copy.neo4jHost) + .setNeo4jPort(copy.neo4jPort) + .setUsername(copy.username) + .setPassword(copy.password) + .setOutputQuery(copy.outputQuery); + } + + public String getReferenceName() 
{ + return referenceName; + } + + public String getJdbcPluginName() { + return jdbcPluginName; + } + + public String getNeo4jHost() { + return neo4jHost; + } + + public int getNeo4jPort() { + return neo4jPort == null ? 0 : neo4jPort; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getOutputQuery() { + return outputQuery; + } + + public String getConnectionString() { + return String.format(Neo4jConstants.NEO4J_CONNECTION_STRING_FORMAT, getNeo4jHost(), getNeo4jPort(), + getUsername(), getPassword()); + } + + public void validate(FailureCollector collector, Schema inputSchema) { + + if (UNAVAILABLE_QUERY_KEYWORDS.stream().parallel().anyMatch(outputQuery.toUpperCase()::contains)) { + collector.addFailure( + String.format("The input request must not contain any of the following keywords: '%s'", + UNAVAILABLE_QUERY_KEYWORDS.toString()), + "Proved correct Input query.") + .withConfigProperty(NAME_OUTPUT_QUERY); + } + + List schemaFields = inputSchema.getFields().stream().map(Schema.Field::getName) + .collect(Collectors.toList()); + + if (!outputQuery.toUpperCase().startsWith("CREATE")) { + collector.addFailure("Output query must start with 'CREATE' keyword.", null) + .withConfigProperty(NAME_OUTPUT_QUERY); + } + + String regex = "\\$\\([^()]+\\)"; + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(outputQuery); + if (!matcher.find()) { + collector.addFailure("Output query must contained at least one block of properties '$(...)'.", + "Provide correct output query.") + .withConfigProperty(NAME_OUTPUT_QUERY); + return; + } + + matcher = pattern.matcher(outputQuery); + + while (matcher.find()) { + String group = matcher.group(); + if (group.equals("$(*)")) { + continue; + } + group = group.substring(group.indexOf("(") + 1, group.indexOf(")")); + List values = Arrays.stream(group.split(",")).map(String::trim).collect(Collectors.toList()); + if (values.isEmpty()) 
{ + collector.addFailure("The block of properties can not be empty.", + "Provide block of properties in format '$(*)' or '$(property_1, property_2)'.") + .withConfigProperty(NAME_OUTPUT_QUERY); + } + for (String value : values) { + if (!schemaFields.contains(value)) { + collector.addFailure("Property '%s' not exists in input schema.", + "Provide property that present in input schema.") + .withConfigProperty(NAME_OUTPUT_QUERY) + .withInputSchemaField(value); + break; + } + } + } + } + + /** + * Builder for Neo4jSinkConfig + */ + public static final class Builder { + private String referenceName; + private String jdbcPluginName; + private String neo4jHost; + private Integer neo4jPort; + private String username; + private String password; + private String outputQuery; + + private Builder() { + } + + public Builder setReferenceName(String referenceName) { + this.referenceName = referenceName; + return this; + } + + public Builder setJdbcPluginName(String jdbcPluginName) { + this.jdbcPluginName = jdbcPluginName; + return this; + } + + public Builder setNeo4jHost(String neo4jHost) { + this.neo4jHost = neo4jHost; + return this; + } + + public Builder setNeo4jPort(Integer neo4jPort) { + this.neo4jPort = neo4jPort; + return this; + } + + public Builder setUsername(String username) { + this.username = username; + return this; + } + + public Builder setPassword(String password) { + this.password = password; + return this; + } + + public Builder setOutputQuery(String outputQuery) { + this.outputQuery = outputQuery; + return this; + } + + public Neo4jSinkConfig build() { + return new Neo4jSinkConfig(this); + } + } +} diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jDBRecordReader.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jDBRecordReader.java new file mode 100644 index 000000000..c3d88877a --- /dev/null +++ b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jDBRecordReader.java @@ -0,0 +1,68 @@ +/* + * Copyright © 2019 
Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j.source; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBRecordReader; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; + +/** + * A RecordReader that reads records from a Neo4j. 
+ * + * @param + */ +public class Neo4jDBRecordReader extends DBRecordReader { + + private static final Logger LOG = LoggerFactory.getLogger(Neo4jDBRecordReader.class); + + public Neo4jDBRecordReader(DBInputFormat.DBInputSplit split, Class inputClass, Configuration conf, + Connection conn, DBConfiguration dbConfig, String cond, String[] fields, + String table) throws SQLException { + super(split, inputClass, conf, conn, dbConfig, cond, fields, table); + } + + @Override + protected String getSelectQuery() { + StringBuilder query = new StringBuilder(); + + //PREBUILT QUERY + String inputQuery = getDBConf().getInputQuery(); + boolean existOrderBy = inputQuery.toUpperCase().contains(" ORDER BY "); + query.append(getDBConf().getInputQuery()); + String orderBy = getDBConf().getInputOrderBy(); + if (!existOrderBy && orderBy != null && orderBy.length() > 0) { + query.append(" ORDER BY ").append(orderBy); + } + + try { + query.append(" SKIP ").append(getSplit().getStart()); + query.append(" LIMIT ").append(getSplit().getLength()); + } catch (IOException ex) { + // should never happen + throw new IllegalStateException(ex); + } + return query.toString(); + } +} diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jDataDriveDBInputFormat.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jDataDriveDBInputFormat.java new file mode 100644 index 000000000..4c153288c --- /dev/null +++ b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jDataDriveDBInputFormat.java @@ -0,0 +1,137 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j.source; + +import com.google.common.base.Throwables; +import io.cdap.plugin.db.ConnectionConfigAccessor; +import io.cdap.plugin.db.JDBCDriverShim; +import io.cdap.plugin.util.DBUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; + +/** + * A InputFormat that reads input data from an Neo4j. 
+ * + * @param + */ +public class Neo4jDataDriveDBInputFormat extends DBInputFormat { + + private static final Logger LOG = LoggerFactory.getLogger(Neo4jDataDriveDBInputFormat.class); + + private Driver driver; + private JDBCDriverShim driverShim; + + public static void setInput(Configuration conf, Class inputClass, String inputQuery, + String orderBy, boolean enableAutoCommit) { + DBConfiguration dbConf = new DBConfiguration(conf); + dbConf.setInputClass(inputClass); + dbConf.setInputQuery(inputQuery); + dbConf.setInputOrderBy(orderBy); + new ConnectionConfigAccessor(conf).setAutoCommitEnabled(enableAutoCommit); + } + + @Override + public void setConf(Configuration conf) { + dbConf = new DBConfiguration(conf); + connection = getConnection(); + tableName = dbConf.getInputTableName(); + fieldNames = dbConf.getInputFieldNames(); + conditions = dbConf.getInputConditions(); + } + + @Override + public Connection getConnection() { + if (this.connection == null) { + ConnectionConfigAccessor connectionConfigAccessor = new ConnectionConfigAccessor(getConf()); + try { + String url = connectionConfigAccessor.getConfiguration().get(DBConfiguration.URL_PROPERTY); + try { + // throws SQLException if no suitable driver is found + DriverManager.getDriver(url); + } catch (SQLException e) { + if (driverShim == null) { + if (driver == null) { + ClassLoader classLoader = connectionConfigAccessor.getConfiguration().getClassLoader(); + String driverClassName = connectionConfigAccessor.getConfiguration() + .get(DBConfiguration.DRIVER_CLASS_PROPERTY); + @SuppressWarnings("unchecked") + Class driverClass = + (Class) classLoader.loadClass(driverClassName); + driver = driverClass.newInstance(); + + // De-register the default driver that gets registered when driver class is loaded. + DBUtils.deregisterAllDrivers(driverClass); + } + driverShim = new JDBCDriverShim(driver); + DriverManager.registerDriver(driverShim); + LOG.debug("Registered JDBC driver via shim {}. 
Actual Driver {}.", driverShim, driver); + } + } + + Properties properties = new Properties(); + properties.putAll(connectionConfigAccessor.getConnectionArguments()); + connection = DriverManager.getConnection(url, properties); + connection.setReadOnly(true); + connection.setAutoCommit(connectionConfigAccessor.isAutoCommitEnabled()); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + return this.connection; + } + + @Override + protected RecordReader createDBRecordReader(DBInputFormat.DBInputSplit split, + Configuration conf) throws IOException { + DBConfiguration dbConf = getDBConf(); + @SuppressWarnings("unchecked") + Class inputClass = (Class) (dbConf.getInputClass()); + try { + // Use Neo4j-specific db reader + return new Neo4jDBRecordReader<>(split, inputClass, conf, getConnection(), dbConf, + dbConf.getInputConditions(), dbConf.getInputFieldNames(), + dbConf.getInputTableName()); + } catch (SQLException ex) { + throw new IOException(ex.getMessage()); + } + } + + @Override + protected String getCountQuery() { + String query = dbConf.getInputQuery(); + StringBuilder sb = new StringBuilder(); + for (String s : query.split(" ")) { + if (s.toUpperCase().equals("RETURN")) { + sb.append(query, 0, query.indexOf(s)).append("RETURN COUNT(*)"); + break; + } + } + return sb.toString(); + } +} diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jSchemaReader.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jSchemaReader.java new file mode 100644 index 000000000..72943d48d --- /dev/null +++ b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jSchemaReader.java @@ -0,0 +1,185 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j.source; + +import com.google.common.collect.Lists; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.data.schema.UnsupportedTypeException; +import io.cdap.plugin.db.CommonSchemaReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Neo4j schema reader for mapping Neo4j DB types. 
+ */ +public class Neo4jSchemaReader extends CommonSchemaReader { + + private static final Logger LOG = LoggerFactory.getLogger(Neo4jSchemaReader.class); + + @Override + public List getSchemaFields(ResultSet resultSet) throws SQLException { + List schemaFields = Lists.newArrayList(); + ResultSetMetaData metadata = resultSet.getMetaData(); + resultSet.next(); + // ResultSetMetadata columns are numbered starting with 1 + for (int i = 1; i <= metadata.getColumnCount(); i++) { + if (shouldIgnoreColumn(metadata, i)) { + continue; + } + String columnName = metadata.getColumnName(i).replace("}", "").replace(".", "_") + .replace("(", "_"); + Schema columnSchema = getSchema(resultSet, metadata, i); + if (ResultSetMetaData.columnNullable == metadata.isNullable(i)) { + columnSchema = Schema.nullableOf(columnSchema); + } + Schema.Field field = Schema.Field.of(columnName, columnSchema); + schemaFields.add(field); + } + return schemaFields; + } + + public Schema getSchema(ResultSet rs, ResultSetMetaData metadata, int index) throws SQLException { + int sqlType = metadata.getColumnType(index); + String columnTypeName = metadata.getColumnTypeName(index); + if ("PATH".equals(columnTypeName)) { + throw new IllegalArgumentException("Unsupported type 'PATH'"); + } + + Schema.Type type = Schema.Type.STRING; + switch (sqlType) { + case Types.NULL: + type = Schema.Type.NULL; + break; + + case Types.BOOLEAN: + type = Schema.Type.BOOLEAN; + break; + case Types.INTEGER: + type = Schema.Type.LONG; + break; + case Types.FLOAT: + type = Schema.Type.DOUBLE; + break; + case Types.NUMERIC: + case Types.DECIMAL: + int precision = metadata.getPrecision(index); // total number of digits + int scale = metadata.getScale(index); // digits after the decimal point + return Schema.decimalOf(precision, scale); + case Types.DATE: + return Schema.of(Schema.LogicalType.DATE); + case Types.TIME: + return Schema.of(Schema.LogicalType.TIME_MICROS); + case Types.TIMESTAMP: + case Types.TIMESTAMP_WITH_TIMEZONE: + 
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + case Types.ARRAY: + List list = rs.getObject(index, List.class); + Class objClass = list.get(0).getClass(); + return Schema.arrayOf(getSubSchema(objClass)); + case Types.JAVA_OBJECT: + if (rs.getObject(index) instanceof Map) { + Map map = rs.getObject(index, Map.class); + String columnName = metadata.getColumnName(index).replace("}", "") + .replace(" ", "_") + .replace(".", "_") + .replace("(", "_"); + return processMapObject(columnName, map); + } + if (rs.getObject(index) instanceof byte[]) { + type = Schema.Type.BYTES; + } + break; + case Types.BIT: + case Types.TINYINT: + case Types.SMALLINT: + case Types.BIGINT: + case Types.REAL: + case Types.DOUBLE: + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + case Types.BLOB: + case Types.DATALINK: + case Types.DISTINCT: + case Types.OTHER: + case Types.REF: + case Types.SQLXML: + case Types.STRUCT: + throw new SQLException(new UnsupportedTypeException("Unsupported SQL Type: " + sqlType)); + } + + return Schema.of(type); + } + + private Schema processMapObject(String fieldName, Map mapObject) { + List fields = new ArrayList<>(); + mapObject.keySet().forEach(k -> { + Object o = mapObject.get(k); + if (o instanceof List) { + List l = ((List) mapObject.get(k)); + Class subClass = !l.isEmpty() ? 
l.get(0).getClass() : String.class; + fields.add(Schema.Field.of((String) k, Schema.arrayOf(getSubSchema(subClass)))); + } else if (o instanceof Map) { + fields.add(Schema.Field.of((String) k, processMapObject((String) k, (Map) o))); + } else { + fields.add(Schema.Field.of((String) k, getSubSchema(o.getClass()))); + } + }); + return Schema.recordOf(fieldName, fields); + } + + private Schema getSubSchema(Class objClass) { + Schema schema = Schema.of(Schema.Type.STRING); + if (Long.class.equals(objClass)) { + return Schema.of(Schema.Type.LONG); + } + if (Integer.class.equals(objClass)) { + return Schema.of(Schema.Type.INT); + } + if (Double.class.equals(objClass)) { + return Schema.of(Schema.Type.DOUBLE); + } + if (Boolean.class.equals(objClass)) { + return Schema.of(Schema.Type.BOOLEAN); + } + if (Date.class.equals(objClass)) { + return Schema.of(Schema.LogicalType.DATE); + } + if (Time.class.equals(objClass)) { + return Schema.of(Schema.LogicalType.TIME_MICROS); + } + if (Timestamp.class.equals(objClass)) { + return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + } + if (byte[].class.equals(objClass)) { + return Schema.of(Schema.Type.BYTES); + } + + return schema; + } +} diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jSource.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jSource.java new file mode 100644 index 000000000..bed570cc9 --- /dev/null +++ b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jSource.java @@ -0,0 +1,191 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j.source; + +import com.google.common.base.Preconditions; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Input; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.plugin.PluginProperties; +import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.cdap.internal.io.SchemaTypeAdapter; +import io.cdap.plugin.common.ReferenceBatchSource; +import io.cdap.plugin.common.ReferencePluginConfig; +import io.cdap.plugin.common.SourceInputFormatProvider; +import io.cdap.plugin.db.ConnectionConfig; +import io.cdap.plugin.db.ConnectionConfigAccessor; +import io.cdap.plugin.db.SchemaReader; +import io.cdap.plugin.neo4j.Neo4jRecord; +import io.cdap.plugin.util.DBUtils; +import io.cdap.plugin.util.DriverCleanup; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.Driver; +import 
java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; + +/** + * Batch source to read from Neo4j. + */ +@Plugin(type = BatchSource.PLUGIN_TYPE) +@Name(Neo4jSource.NAME) +@Description("Reads from a Neo4j instance using a configurable CQL query. " + + "Outputs one record for each row returned by the query.") +public class Neo4jSource extends ReferenceBatchSource { + public static final String NAME = "Neo4jSource"; + private static final Logger LOG = LoggerFactory.getLogger(Neo4jSource.class); + + private static final SchemaTypeAdapter SCHEMA_TYPE_ADAPTER = new SchemaTypeAdapter(); + + protected Class driverClass; + + private final Neo4jSourceConfig config; + + public Neo4jSource(Neo4jSourceConfig config) { + super(new ReferencePluginConfig(config.getReferenceName())); + this.config = config; + } + + @Override + public void initialize(BatchRuntimeContext context) throws Exception { + super.initialize(context); + driverClass = context.loadPluginClass(getJDBCPluginId()); + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + config.validate(collector); + + Class driverClass = pipelineConfigurer.usePluginClass( + ConnectionConfig.JDBC_PLUGIN_TYPE, + config.getJdbcPluginName(), + getJDBCPluginId(), PluginProperties.builder().build()); + Preconditions.checkArgument( + driverClass != null, "Unable to load JDBC Driver class for plugin name '%s'. 
Please make sure " + + "that the plugin '%s' of type '%s' containing the driver has been installed correctly.", + config.getJdbcPluginName(), + config.getJdbcPluginName(), ConnectionConfig.JDBC_PLUGIN_TYPE); + try { + Class.forName(driverClass.getName()); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + + pipelineConfigurer.getStageConfigurer().setOutputSchema(getSchema()); + } + + @Override + public void prepareRun(BatchSourceContext context) throws Exception { + FailureCollector collector = context.getFailureCollector(); + config.validate(collector); + collector.getOrThrowException(); + + ConnectionConfigAccessor connectionConfigAccessor = new ConnectionConfigAccessor(); + Class driverClass = context.loadPluginClass(getJDBCPluginId()); + + DBConfiguration.configureDB(connectionConfigAccessor.getConfiguration(), driverClass.getName(), + config.getConnectionString(), config.getUsername(), config.getPassword()); + Neo4jDataDriveDBInputFormat.setInput(connectionConfigAccessor.getConfiguration(), getDBRecordType(), + config.getInputQuery(), config.getOrderBy(), + false); + + connectionConfigAccessor.getConfiguration().setInt(MRJobConfig.NUM_MAPS, config.getSplitNum()); + Schema schemaFromDB = loadSchemaFromDB(driverClass); + String schemaStr = SCHEMA_TYPE_ADAPTER.toJson(schemaFromDB); + connectionConfigAccessor.setSchema(schemaStr); + + context.setInput(Input.of(config.getReferenceName(), new SourceInputFormatProvider( + Neo4jDataDriveDBInputFormat.class, connectionConfigAccessor.getConfiguration()))); + } + + @Override + public void transform(KeyValue input, Emitter emitter) { + emitter.emit(input.getValue().getRecord()); + } + + private String getJDBCPluginId() { + return String.format("%s.%s.%s", "source", ConnectionConfig.JDBC_PLUGIN_TYPE, "neo4j"); + } + + public Schema getSchema() { + try (Connection connection = this.getConnection()) { + String query = config.getInputQuery(); + return loadSchemaFromDB(connection, query); + } 
catch (Exception ex) { + throw new IllegalStateException("Exception while performing getSchema", ex); + } + } + + private Schema loadSchemaFromDB(Connection connection, String query) throws SQLException { + Statement statement = connection.createStatement(); + statement.setMaxRows(1); + ResultSet resultSet = statement.executeQuery(query); + return Schema.recordOf("outputSchema", this.getSchemaReader().getSchemaFields(resultSet)); + } + + private Schema loadSchemaFromDB(Class driverClass) + throws SQLException, IllegalAccessException, InstantiationException { + String connectionString = config.getConnectionString(); + DriverCleanup driverCleanup + = DBUtils.ensureJDBCDriverIsAvailable(driverClass, connectionString, "neo4j"); + + Properties connectionProperties = new Properties(); + try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) { + return loadSchemaFromDB(connection, config.getInputQuery()); + + } catch (SQLException e) { + // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath + throw new SQLException(e.getMessage(), e.getSQLState(), e.getErrorCode()); + } finally { + driverCleanup.destroy(); + } + } + + private Connection getConnection() throws SQLException { + String connectionString = config.getConnectionString(); + return DriverManager.getConnection(connectionString); + } + + protected SchemaReader getSchemaReader() { + return new Neo4jSchemaReader(); + } + + protected Class getDBRecordType() { + return Neo4jRecord.class; + } + +} diff --git a/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jSourceConfig.java b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jSourceConfig.java new file mode 100644 index 000000000..58fbd1450 --- /dev/null +++ b/neo4j-plugin/src/main/java/io/cdap/plugin/neo4j/source/Neo4jSourceConfig.java @@ -0,0 +1,276 @@ +/* + * Copyright © 2019 Cask Data, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j.source; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.plugin.PluginConfig; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.plugin.neo4j.Neo4jConstants; + +import java.util.Arrays; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Config for Neo4j Source plugin + */ +public class Neo4jSourceConfig extends PluginConfig { + + private static final List UNAVAILABLE_QUERY_KEYWORDS = + Arrays.asList("UNWIND", "CREATE", "DELETE", "SET", "REMOVE", "MERGE"); + private static final List REQUIRED_QUERY_KEYWORDS = Arrays.asList("MATCH", "RETURN"); + + public static final String NAME_INPUT_QUERY = "inputQuery"; + public static final String NAME_SPLIT_NUM = "splitNum"; + public static final String NAME_ORDER_BY = "orderBy"; + + @Name(Neo4jConstants.NAME_REFERENCE_NAME) + @Description("This will be used to uniquely identify this source for lineage, annotating metadata, etc.") + private String referenceName; + + @Macro + @Name(Neo4jConstants.NAME_DRIVER_NAME) + @Description("Name of the JDBC driver to use. 
This is the value of the 'jdbcPluginName' key defined in the JSON " + + "file for the JDBC plugin.") + private String jdbcPluginName; + + @Macro + @Name(Neo4jConstants.NAME_HOST_STRING) + @Description("Neo4j database host.") + private String neo4jHost; + + @Macro + @Name(Neo4jConstants.NAME_PORT_STRING) + @Description("Neo4j database port.") + private Integer neo4jPort; + + @Macro + @Name(Neo4jConstants.NAME_USERNAME) + @Description("User to use to connect to the Neo4j database.") + private String username; + + @Macro + @Name(Neo4jConstants.NAME_PASSWORD) + @Description("Password to use to connect to the Neo4j database.") + private String password; + + @Name(NAME_INPUT_QUERY) + @Description("The query to use to import data from the Neo4j database. " + + "Query example: 'MATCH (n:Label) RETURN n.property_1, n.property_2'.") + private String inputQuery; + + @Macro + @Nullable + @Name(NAME_SPLIT_NUM) + @Description("The number of splits to generate. If set to one, the orderBy is not needed.") + private Integer splitNum; + + @Macro + @Nullable + @Name(NAME_ORDER_BY) + @Description("Field Name which will be used for ordering during splits generation. 
" + + "This is required unless numSplits is set to one and 'ORDER BY' keyword not exist in Input Query.") + private String orderBy; + + public Neo4jSourceConfig(String referenceName, String jdbcPluginName, String neo4jHost, Integer neo4jPort, + String username, String password, String inputQuery, int splitNum, + @Nullable String orderBy) { + this.referenceName = referenceName; + this.jdbcPluginName = jdbcPluginName; + this.neo4jHost = neo4jHost; + this.neo4jPort = neo4jPort; + this.username = username; + this.password = password; + this.inputQuery = inputQuery; + this.splitNum = splitNum; + this.orderBy = orderBy; + } + + private Neo4jSourceConfig(Builder builder) { + referenceName = builder.referenceName; + jdbcPluginName = builder.jdbcPluginName; + neo4jHost = builder.neo4jHost; + neo4jPort = builder.neo4jPort; + username = builder.username; + password = builder.password; + inputQuery = builder.inputQuery; + splitNum = builder.splitNum; + orderBy = builder.orderBy; + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(Neo4jSourceConfig copy) { + return builder() + .setReferenceName(copy.referenceName) + .setJdbcPluginName(copy.jdbcPluginName) + .setNeo4jHost(copy.neo4jHost) + .setNeo4jPort(copy.neo4jPort) + .setUsername(copy.username) + .setPassword(copy.password) + .setInputQuery(copy.inputQuery) + .setSplitNum(copy.splitNum) + .setOrderBy(copy.orderBy); + } + + public String getReferenceName() { + return referenceName; + } + + public String getJdbcPluginName() { + return jdbcPluginName; + } + + public String getNeo4jHost() { + return neo4jHost; + } + + public int getNeo4jPort() { + return neo4jPort == null ? 0 : neo4jPort; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getInputQuery() { + return inputQuery; + } + + public int getSplitNum() { + return splitNum == null ? 
1 : splitNum; + } + + @Nullable + public String getOrderBy() { + return orderBy; + } + + public String getConnectionString() { + return String.format(Neo4jConstants.NEO4J_CONNECTION_STRING_FORMAT, getNeo4jHost(), getNeo4jPort(), + getUsername(), getPassword()); + } + + public void validate(FailureCollector collector) { + if (UNAVAILABLE_QUERY_KEYWORDS.stream().parallel().anyMatch(inputQuery.toUpperCase()::contains)) { + collector.addFailure( + String.format("The input request must not contain any of the following keywords: '%s'", + UNAVAILABLE_QUERY_KEYWORDS.toString()), + "Proved correct Input query.") + .withConfigProperty(NAME_INPUT_QUERY); + } + if (!REQUIRED_QUERY_KEYWORDS.stream().parallel().allMatch(inputQuery.toUpperCase()::contains)) { + collector.addFailure( + String.format("The input request must contain following keywords: '%s'", + REQUIRED_QUERY_KEYWORDS.toString()), + "Proved correct Input query.") + .withConfigProperty(NAME_INPUT_QUERY); + } + if (!containsMacro(NAME_SPLIT_NUM) && getSplitNum() < 1) { + collector.addFailure( + String.format("Invalid value for Splits Number. 
Must be at least 1, but got: '%d'", getSplitNum()), + null) + .withConfigProperty(NAME_SPLIT_NUM); + } + if (!containsMacro(NAME_SPLIT_NUM) && getSplitNum() > 1) { + boolean existOrderBy = inputQuery.toUpperCase().contains(" ORDER BY "); + if (!containsMacro(NAME_ORDER_BY) && !existOrderBy && orderBy == null) { + collector.addFailure( + "Order by field required if Splits number greater than 1 and ORDER BY not exists in Input query.", + null) + .withConfigProperty(NAME_ORDER_BY); + } + } + } + + /** + * Builder for Neo4jSourceConfig + */ + public static final class Builder { + private String referenceName; + private String jdbcPluginName; + private String neo4jHost; + private Integer neo4jPort; + private String username; + private String password; + private String inputQuery; + @Nullable + private Integer splitNum; + @Nullable + private String orderBy; + + private Builder() { + } + + public Builder setReferenceName(String referenceName) { + this.referenceName = referenceName; + return this; + } + + public Builder setJdbcPluginName(String jdbcPluginName) { + this.jdbcPluginName = jdbcPluginName; + return this; + } + + public Builder setNeo4jHost(String neo4jHost) { + this.neo4jHost = neo4jHost; + return this; + } + + public Builder setNeo4jPort(int neo4jPort) { + this.neo4jPort = neo4jPort; + return this; + } + + public Builder setUsername(String username) { + this.username = username; + return this; + } + + public Builder setPassword(String password) { + this.password = password; + return this; + } + + public Builder setInputQuery(String inputQuery) { + this.inputQuery = inputQuery; + return this; + } + + public Builder setSplitNum(Integer splitNum) { + this.splitNum = splitNum; + return this; + } + + public Builder setOrderBy(String orderBy) { + this.orderBy = orderBy; + return this; + } + + public Neo4jSourceConfig build() { + return new Neo4jSourceConfig(this); + } + } +} diff --git a/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/NeojlPluginTestBase.java 
b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/NeojlPluginTestBase.java new file mode 100644 index 000000000..d428cc9b2 --- /dev/null +++ b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/NeojlPluginTestBase.java @@ -0,0 +1,184 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.neo4j; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.api.plugin.PluginClass; +import io.cdap.cdap.datapipeline.DataPipelineApp; +import io.cdap.cdap.proto.id.ArtifactId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.plugin.db.ConnectionConfig; +import io.cdap.plugin.db.batch.DatabasePluginTestBase; +import io.cdap.plugin.neo4j.sink.Neo4jDataDriveDBOutputFormat; +import io.cdap.plugin.neo4j.sink.Neo4jSink; +import io.cdap.plugin.neo4j.source.Neo4jDataDriveDBInputFormat; +import io.cdap.plugin.neo4j.source.Neo4jSource; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class 
NeojlPluginTestBase extends DatabasePluginTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(NeojlPluginTestBase.class); + + protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "3.2.0"); + protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0"); + protected static final long CURRENT_TS = System.currentTimeMillis(); + protected static final String DRIVER_CLASS = "org.neo4j.jdbc.bolt.BoltDriver"; + protected static final String JDBC_DRIVER_NAME = "neo4j"; + protected static final ZoneId UTC_ZONE = ZoneId.ofOffset("UTC", ZoneOffset.UTC); + + protected static String connectionUrl; + protected static boolean tearDown = true; + private static int startCount; + private static final List QUERY = Arrays.asList( + "CREATE (SleeplessInSeattle:CDAP_Movie {title:'Sleepless in Seattle', released:1993, " + + "tagline:'What if someone you never met, someone you never saw, someone you never knew was the only " + + "someone for you?'})", + "CREATE (TheDaVinciCode:CDAP_Movie {title:'The Da Vinci Code', released:2006, tagline:'Break The Codes'})", + + "CREATE (TomH:CDAP_Person {name:'Tom Hanks', born:1956})", + "CREATE (MegR:CDAP_Person {name:'Meg Ryan', born:1961})", + "CREATE (RitaW:CDAP_Person {name:'Rita Wilson', born:1956})", + "CREATE (BillPull:CDAP_Person {name:'Bill Pullman', born:1953})", + "CREATE (VictorG:CDAP_Person {name:'Victor Garber', born:1949})", + "CREATE (RosieO:CDAP_Person {name:\"Rosie O'Donnell\", born:1962})", + "CREATE (NoraE:CDAP_Person {name:'Nora Ephron', born:1941})", + "CREATE (IanM:CDAP_Person {name:'Ian McKellen', born:1939})", + "CREATE (AudreyT:CDAP_Person {name:'Audrey Tautou', born:1976})", + "CREATE (PaulB:CDAP_Person {name:'Paul Bettany', born:1971})", + "CREATE (RonH:CDAP_Person {name:'Ron Howard', born:1954})", + + "MATCH (TomH {name:'Tom Hanks'}), (MegR {name:'Meg Ryan'}), (RitaW {name:'Rita Wilson'})," + + 
"(BillPull {name:'Bill Pullman'}), (VictorG {name:'Victor Garber'}), (RosieO {name:\"Rosie O'Donnell\"})," + + "(NoraE {name:'Nora Ephron'}), (SleeplessInSeattle {title:'Sleepless in Seattle'}) " + + "CREATE (TomH)-[:ACTED_IN {roles:['Sam Baldwin']}]->(SleeplessInSeattle), " + + "(MegR)-[:ACTED_IN {roles:['Annie Reed']}]->(SleeplessInSeattle), " + + "(RitaW)-[:ACTED_IN {roles:['Suzy']}]->(SleeplessInSeattle), " + + "(BillPull)-[:ACTED_IN {roles:['Walter']}]->(SleeplessInSeattle), " + + "(VictorG)-[:ACTED_IN {roles:['Greg']}]->(SleeplessInSeattle), " + + "(RosieO)-[:ACTED_IN {roles:['Becky']}]->(SleeplessInSeattle), " + + "(NoraE)-[:DIRECTED]->(SleeplessInSeattle)", + + "MATCH (TomH {name:'Tom Hanks'}), (IanM {name:'Ian McKellen'}), (AudreyT {name:'Audrey Tautou'}), " + + "(PaulB {name:'Paul Bettany'}), (RonH {name:'Ron Howard'}), (TheDaVinciCode {title:'The Da Vinci Code'}) " + + "CREATE (TomH)-[:ACTED_IN {roles:['Dr. Robert Langdon']}]->(TheDaVinciCode), " + + "(IanM)-[:ACTED_IN {roles:['Sir Leight Teabing']}]->(TheDaVinciCode), " + + "(AudreyT)-[:ACTED_IN {roles:['Sophie Neveu']}]->(TheDaVinciCode), " + + "(PaulB)-[:ACTED_IN {roles:['Silas']}]->(TheDaVinciCode), " + + "(RonH)-[:DIRECTED]->(TheDaVinciCode)", + + "CREATE (t:CDAP_Test {string_val: \"string\", long_val: 123, double_val: 20.32, boolean_val: true, " + + "date_val: date('2015-07-21'), time_val: time('125035.556+0100'), local_time_val: localtime('12:50:35.556'), " + + "date_time_val: datetime('2015-06-24T12:50:35.556+0100'), " + + "local_date_time_val: localdatetime('2015185T19:32:24'), array_int_val: [1,2,3,4], " + + "array_string_val: ['a','b','c'], duration: duration(\"P14DT16H12M\"), " + + "point_2d: point({ x:3, y:0 }), point_3d: point({ x:0, y:4, z:1 }), " + + "geo_2d: point({ latitude: 12, longitude: 56 }), geo_3d: point({ latitude: 12, longitude: 56, height: 1000 })})"); + + + private static final List DROP_QUERY_LIST = Arrays.asList( + "MATCH (p:CDAP_Person), (m:CDAP_Movie), (t:CDAP_Test) DETACH 
DELETE p, m, t", + "MATCH (ts:CDAP_TEST_1) DETACH DELETE ts", + "MATCH (ts1: CDAP_T_1), (ts2:CDAP_T_2) DETACH DELETE ts1, ts2" + ); + + protected static final Map BASE_PROPS = ImmutableMap.builder() + .put(Neo4jConstants.NAME_HOST_STRING, System.getProperty("neo4j.host", "localhost")) + .put(Neo4jConstants.NAME_PORT_STRING, System.getProperty("neo4j.port", "7687")) + .put(Neo4jConstants.NAME_USERNAME, System.getProperty("neo4j.username", "neo4j")) + .put(Neo4jConstants.NAME_PASSWORD, System.getProperty("neo4j.password", "test")) + .build(); + + @BeforeClass + public static void setupTest() throws Exception { + if (startCount++ > 0) { + return; + } + + setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class); + + addPluginArtifact(NamespaceId.DEFAULT.artifact(JDBC_DRIVER_NAME, "1.0.0"), + DATAPIPELINE_ARTIFACT_ID, + Neo4jSource.class, Neo4jSink.class, Neo4jDataDriveDBOutputFormat.class, + Neo4jDataDriveDBInputFormat.class, Neo4jRecord.class); + + Class driverClass = Class.forName(DRIVER_CLASS); + + // add neo4j 3rd party plugin + PluginClass neo4jDriver = new PluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, JDBC_DRIVER_NAME, + "neoj driver class", + driverClass.getName(), + null, Collections.emptyMap()); + addPluginArtifact(NamespaceId.DEFAULT.artifact("neo4j-jdbc-connector", "1.0.0"), + DATAPIPELINE_ARTIFACT_ID, + Sets.newHashSet(neo4jDriver), driverClass); + + + connectionUrl = "jdbc:neo4j:bolt://" + BASE_PROPS.get(Neo4jConstants.NAME_HOST_STRING) + ":" + + BASE_PROPS.get(Neo4jConstants.NAME_PORT_STRING) + "/?username=" + + BASE_PROPS.get(Neo4jConstants.NAME_USERNAME) + ",password=" + BASE_PROPS.get(Neo4jConstants.NAME_PASSWORD); + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement()) { + createTestData(stmt); + } + } + + private static void createTestData(Statement stmt) throws SQLException { + for (String query : QUERY) { + LOG.debug("run -> " + query); + stmt.execute(query); + } + } + + public static Connection 
createConnection() { + try { + Class.forName(DRIVER_CLASS); + return DriverManager.getConnection(connectionUrl); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @AfterClass + public static void tearDownDB() throws SQLException { + if (!tearDown) { + return; + } + + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement()) { + for (String query : DROP_QUERY_LIST) { + stmt.execute(query); + } + } + } +} diff --git a/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/ValidationAssertions.java b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/ValidationAssertions.java new file mode 100644 index 000000000..bd256586d --- /dev/null +++ b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/ValidationAssertions.java @@ -0,0 +1,48 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.neo4j; + +import io.cdap.cdap.etl.api.validation.CauseAttributes; +import io.cdap.cdap.etl.api.validation.ValidationFailure; +import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import org.junit.Assert; + +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ValidationAssertions { + public static void assertValidationFailed(MockFailureCollector failureCollector, List> paramNames) { + List failureList = failureCollector.getValidationFailures(); + Assert.assertEquals(paramNames.size(), failureList.size()); + Iterator> paramNameIterator = paramNames.iterator(); + failureList.stream().map(failure -> failure.getCauses() + .stream() + .filter(cause -> cause.getAttribute(CauseAttributes.STAGE_CONFIG) != null) + .collect(Collectors.toList())) + .filter(causeList -> paramNameIterator.hasNext()) + .forEach(causeList -> { + List parameters = paramNameIterator.next(); + Assert.assertEquals(parameters.size(), causeList.size()); + IntStream.range(0, parameters.size()).forEach(i -> { + ValidationFailure.Cause cause = causeList.get(i); + Assert.assertEquals(parameters.get(i), cause.getAttribute(CauseAttributes.STAGE_CONFIG)); + }); + }); + } +} diff --git a/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/sink/Neo4jSinkConfigTest.java b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/sink/Neo4jSinkConfigTest.java new file mode 100644 index 000000000..76c8a7599 --- /dev/null +++ b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/sink/Neo4jSinkConfigTest.java @@ -0,0 +1,144 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j.sink; + +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import io.cdap.plugin.neo4j.ValidationAssertions; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +/** + * This is a test suite that covers Neo4j Sink config validation. + */ +public class Neo4jSinkConfigTest { + private static final String MOCK_STAGE = "mockStage"; + private static final Schema SCHEMA = + Schema.recordOf("record", + Schema.Field.of("id", Schema.of(Schema.Type.INT)), + Schema.Field.of("name", Schema.of(Schema.Type.STRING)), + Schema.Field.of("age", Schema.of(Schema.Type.INT)), + Schema.Field.of("dob", Schema.of(Schema.LogicalType.DATE)), + Schema.Field.of("profession", Schema.of(Schema.LogicalType.DATE)), + Schema.Field.of("company", Schema.of(Schema.LogicalType.DATE)), + Schema.Field.of("rating", Schema.of(Schema.LogicalType.DATE)), + Schema.Field.of("position", Schema.of(Schema.LogicalType.DATE)) + ); + private static final Neo4jSinkConfig VALID_CONFIG = new Neo4jSinkConfig( + "ref_name", + "neo4j", + "localhost", + 7687, + "user", + "password", + "CREATE (n:Test $(*))" + ); + + @Test + public void testCheckValidConfig() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + VALID_CONFIG.validate(collector, SCHEMA); + + Assert.assertTrue(collector.getValidationFailures().isEmpty()); + } + + @Test + public void testValidateQueryWithoutCreateKeywords() { + MockFailureCollector collector = new 
MockFailureCollector(MOCK_STAGE); + Neo4jSinkConfig config = Neo4jSinkConfig.builder(VALID_CONFIG) + .setOutputQuery("MATCH (n:Test $(*))") + .build(); + List> paramNames = Collections.singletonList( + Collections.singletonList(Neo4jSinkConfig.NAME_OUTPUT_QUERY) + ); + + config.validate(collector, SCHEMA); + ValidationAssertions.assertValidationFailed(collector, paramNames); + } + + @Test + public void testValidateQueryWithoutPropertyBlock() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSinkConfig config = Neo4jSinkConfig.builder(VALID_CONFIG) + .setOutputQuery("CREATE (n:Test)") + .build(); + List> paramNames = Collections.singletonList( + Collections.singletonList(Neo4jSinkConfig.NAME_OUTPUT_QUERY) + ); + + config.validate(collector, SCHEMA); + ValidationAssertions.assertValidationFailed(collector, paramNames); + } + + @Test + public void testValidateQueryWithEmptyPropertyBlock() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSinkConfig config = Neo4jSinkConfig.builder(VALID_CONFIG) + .setOutputQuery("CREATE (n:Test $())") + .build(); + List> paramNames = Collections.singletonList( + Collections.singletonList(Neo4jSinkConfig.NAME_OUTPUT_QUERY) + ); + + config.validate(collector, SCHEMA); + ValidationAssertions.assertValidationFailed(collector, paramNames); + } + + @Test + public void testValidateQueryWithWithInvalidSingleProperty() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSinkConfig config = Neo4jSinkConfig.builder(VALID_CONFIG) + .setOutputQuery("CREATE (n:Test $(test))") + .build(); + List> paramNames = Collections.singletonList( + Collections.singletonList(Neo4jSinkConfig.NAME_OUTPUT_QUERY) + ); + + config.validate(collector, SCHEMA); + ValidationAssertions.assertValidationFailed(collector, paramNames); + } + + @Test + public void testValidateQueryWithWithInvalidProperty() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + 
Neo4jSinkConfig config = Neo4jSinkConfig.builder(VALID_CONFIG) + .setOutputQuery("CREATE (n:Test $(id, test))") + .build(); + List> paramNames = Collections.singletonList( + Collections.singletonList(Neo4jSinkConfig.NAME_OUTPUT_QUERY) + ); + + config.validate(collector, SCHEMA); + ValidationAssertions.assertValidationFailed(collector, paramNames); + } + + @Test + public void testValidateComplexQuery() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSinkConfig config = Neo4jSinkConfig.builder(VALID_CONFIG) + .setOutputQuery( + "CREATE (n:Person $(name, dob, profession)-[r:Work_In $(position)->(c:Company $(company, rating)))") + .build(); + + config.validate(collector, SCHEMA); + Assert.assertTrue(collector.getValidationFailures().isEmpty()); + } + +} diff --git a/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/sink/Neo4jSinkTestRun.java b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/sink/Neo4jSinkTestRun.java new file mode 100644 index 000000000..0d02fa874 --- /dev/null +++ b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/sink/Neo4jSinkTestRun.java @@ -0,0 +1,335 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.neo4j.sink; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.cdap.etl.mock.batch.MockSource; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.cdap.test.DataSetManager; +import io.cdap.plugin.db.CustomAssertions; +import io.cdap.plugin.neo4j.Neo4jConstants; +import io.cdap.plugin.neo4j.NeojlPluginTestBase; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class Neo4jSinkTestRun extends NeojlPluginTestBase { + private static final Schema DURATION_SCHEMA = Schema.recordOf( + "duration", + Schema.Field.of("duration", Schema.of(Schema.Type.STRING)), + Schema.Field.of("months", Schema.of(Schema.Type.LONG)), + Schema.Field.of("days", Schema.of(Schema.Type.LONG)), + Schema.Field.of("seconds", Schema.of(Schema.Type.LONG)), + Schema.Field.of("nanoseconds", Schema.of(Schema.Type.INT)) + ); + + private static final Schema POINT_2D_SCHEMA = Schema.recordOf( + "point2d", + Schema.Field.of("crs", Schema.of(Schema.Type.STRING)), + Schema.Field.of("x", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("y", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("srid", Schema.of(Schema.Type.INT)) + ); + + private static final Schema POINT_3D_SCHEMA = Schema.recordOf( + "point3d", + Schema.Field.of("crs", Schema.of(Schema.Type.STRING)), + Schema.Field.of("x", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("y", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("z", Schema.of(Schema.Type.DOUBLE)), + 
Schema.Field.of("srid", Schema.of(Schema.Type.INT)) + ); + + private static final Schema GEO_2D_SCHEMA = Schema.recordOf( + "geo2d", + Schema.Field.of("crs", Schema.of(Schema.Type.STRING)), + Schema.Field.of("latitude", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("x", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("y", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("srid", Schema.of(Schema.Type.INT)), + Schema.Field.of("longitude", Schema.of(Schema.Type.DOUBLE)) + ); + + private static final Schema GEO_3D_SCHEMA = Schema.recordOf( + "geo3d", + Schema.Field.of("crs", Schema.of(Schema.Type.STRING)), + Schema.Field.of("latitude", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("x", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("y", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("z", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("srid", Schema.of(Schema.Type.INT)), + Schema.Field.of("longitude", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("height", Schema.of(Schema.Type.DOUBLE)) + ); + + private static final Schema SCHEMA = Schema.recordOf( + "dbRecord", + Schema.Field.of("string_val", Schema.of(Schema.Type.STRING)), + Schema.Field.of("double_val", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("long_val", Schema.of(Schema.Type.LONG)), + Schema.Field.of("boolean_val", Schema.of(Schema.Type.BOOLEAN)), + Schema.Field.of("array_int", Schema.arrayOf(Schema.of(Schema.Type.INT))), + Schema.Field.of("array_string", Schema.arrayOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("date_val", Schema.of(Schema.LogicalType.DATE)), + Schema.Field.of("time_val", Schema.of(Schema.LogicalType.TIME_MICROS)), + Schema.Field.of("timestamp_val", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)), + Schema.Field.of("duration_val", DURATION_SCHEMA), + Schema.Field.of("point_2d_val", POINT_2D_SCHEMA), + Schema.Field.of("point_3d_val", POINT_3D_SCHEMA), + Schema.Field.of("geo_2d_val", GEO_2D_SCHEMA), + Schema.Field.of("geo_3d_val", GEO_3D_SCHEMA) + ); + + 
@Test + public void testStoreAllFieldsAsRecord() throws Exception { + ETLPlugin sourceConfig = MockSource.getPlugin("Neo4jSource", SCHEMA); + + ETLPlugin sinkConfig = new ETLPlugin( + "Neo4jSink", + BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(Neo4jConstants.NAME_REFERENCE_NAME, "neo4jSinkStoreAll") + .put(Neo4jSinkConfig.NAME_OUTPUT_QUERY, "CREATE (n:CDAP_TEST_1 $(*))") + .build(), + null); + + ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, + "testStoreAllProperties"); + + // Prepare test input data + List inputRecords = createInputData(); + DataSetManager inputManager = getDataset("Neo4jSource"); + MockSource.writeInput(inputManager, inputRecords); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS))); + + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement(); + ResultSet actual = stmt.executeQuery("MATCH (n:CDAP_TEST_1) RETURN n")) { + + for (StructuredRecord expected : inputRecords) { + Assert.assertTrue(actual.next()); + Map node = (Map) actual.getObject("n"); + + CustomAssertions.assertObjectEquals(expected.get("int_val"), node.get("int_val")); + CustomAssertions.assertObjectEquals(expected.get("string_val"), node.get("string_val")); + CustomAssertions.assertObjectEquals(expected.get("double_val"), node.get("double_val")); + CustomAssertions.assertObjectEquals(expected.get("long_val"), node.get("long_val")); + CustomAssertions.assertObjectEquals(expected.get("boolean_val"), node.get("boolean_val")); + int[] intArray = expected.get("array_int"); + List intList = (List) node.get("array_int"); + Assert.assertEquals(intArray.length, intList.size()); + for (int i = 0; i < intArray.length; i++) { + Assert.assertEquals((long) intArray[i], intList.get(i)); + } + String[] stringArray = expected.get("array_string"); + List stringList = (List) node.get("array_string"); + Assert.assertEquals(stringArray.length, 
stringList.size()); + for (int i = 0; i < stringArray.length; i++) { + Assert.assertEquals(stringArray[i], stringList.get(i)); + } + Assert.assertEquals(expected.getDate("date_val"), ((Date) node.get("date_val")).toLocalDate()); + Assert.assertEquals(expected.getTime("time_val").toSecondOfDay(), + ((Time) node.get("time_val")).toLocalTime().toSecondOfDay()); + Assert.assertEquals(expected.getTimestamp("timestamp_val"), + ((Timestamp) node.get("timestamp_val")).toInstant().atZone(UTC_ZONE)); + + StructuredRecord record = expected.get("duration_val"); + Map properties = (Map) node.get("duration_val"); + Assert.assertEquals(record.get("duration"), properties.get("duration")); + Assert.assertEquals(record.get("months"), properties.get("months")); + Assert.assertEquals(record.get("days"), properties.get("days")); + Assert.assertEquals(record.get("seconds"), properties.get("seconds")); + Assert.assertEquals(record.get("nanoseconds"), properties.get("nanoseconds")); + + record = expected.get("point_2d_val"); + properties = (Map) node.get("point_2d_val"); + Assert.assertEquals(record.get("crs"), properties.get("crs")); + Assert.assertEquals(record.get("x"), properties.get("x")); + Assert.assertEquals(record.get("y"), properties.get("y")); + Assert.assertEquals(record.get("srid"), properties.get("srid")); + + record = expected.get("point_3d_val"); + properties = (Map) node.get("point_3d_val"); + Assert.assertEquals(record.get("crs"), properties.get("crs")); + Assert.assertEquals(record.get("x"), properties.get("x")); + Assert.assertEquals(record.get("y"), properties.get("y")); + Assert.assertEquals(record.get("z"), properties.get("z")); + Assert.assertEquals(record.get("srid"), properties.get("srid")); + + record = expected.get("geo_2d_val"); + properties = (Map) node.get("geo_2d_val"); + Assert.assertEquals(record.get("crs"), properties.get("crs")); + Assert.assertEquals(record.get("latitude"), properties.get("latitude")); + Assert.assertEquals(record.get("x"), 
properties.get("x")); + Assert.assertEquals(record.get("y"), properties.get("y")); + Assert.assertEquals(record.get("srid"), properties.get("srid")); + Assert.assertEquals(record.get("longitude"), properties.get("longitude")); + + record = expected.get("geo_3d_val"); + properties = (Map) node.get("geo_3d_val"); + Assert.assertEquals(record.get("crs"), properties.get("crs")); + Assert.assertEquals(record.get("latitude"), properties.get("latitude")); + Assert.assertEquals(record.get("x"), properties.get("x")); + Assert.assertEquals(record.get("y"), properties.get("y")); + Assert.assertEquals(record.get("z"), properties.get("z")); + Assert.assertEquals(record.get("srid"), properties.get("srid")); + Assert.assertEquals(record.get("longitude"), properties.get("longitude")); + } + } + } + + @Test + public void testStorePropAsNodeWithRelation() throws Exception { + ETLPlugin sourceConfig = MockSource.getPlugin("Neo4jSource", SCHEMA); + + ETLPlugin sinkConfig = new ETLPlugin( + "Neo4jSink", + BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(Neo4jConstants.NAME_REFERENCE_NAME, "neo4jSinkStoreAll") + .put(Neo4jSinkConfig.NAME_OUTPUT_QUERY, "CREATE (n:CDAP_T_1 $(string_val, date_val))-" + + "[r:Test_Rel $(long_val, boolean_val)]->(m:CDAP_T_2 $(time_val, duration_val))") + .build(), + null); + + ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, + "testStorePropertiesAAsNodeWithRelation"); + + // Prepare test input data + List inputRecords = createInputData(); + DataSetManager
inputManager = getDataset("Neo4jSource"); + MockSource.writeInput(inputManager, inputRecords); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS))); + + String searchQuery = "MATCH (n:CDAP_T_1)-[r]->(m) RETURN n.string_val AS string_val, n.date_val AS date_val, " + + "r.long_val AS long_val, r.boolean_val as boolean_val, m.time_val AS time_val, m.duration_val AS duration_val"; + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement(); + ResultSet actual = stmt.executeQuery(searchQuery)) { + for (StructuredRecord expected : inputRecords) { + Assert.assertTrue(actual.next()); + + CustomAssertions.assertObjectEquals(expected.get("string_val"), actual.getString("string_val")); + CustomAssertions.assertObjectEquals(expected.getDate("date_val"), + actual.getDate("date_val").toLocalDate()); + CustomAssertions.assertObjectEquals(expected.get("long_val"), actual.getLong("long_val")); + CustomAssertions.assertObjectEquals(expected.get("boolean_val"), actual.getBoolean("boolean_val")); + CustomAssertions.assertObjectEquals(expected.getTime("time_val").toSecondOfDay(), + actual.getTime("time_val").toLocalTime().toSecondOfDay()); + + StructuredRecord record = expected.get("duration_val"); + Map properties = (Map) actual.getObject("duration_val"); + Assert.assertEquals(record.get("duration"), properties.get("duration")); + Assert.assertEquals(record.get("months"), properties.get("months")); + Assert.assertEquals(record.get("days"), properties.get("days")); + Assert.assertEquals(record.get("seconds"), properties.get("seconds")); + Assert.assertEquals(record.get("nanoseconds"), properties.get("nanoseconds")); + } + } + } + + private List createInputData() throws Exception { + List inputRecords = new ArrayList<>(); + LocalDateTime localDateTime = new Timestamp(CURRENT_TS).toLocalDateTime(); + String name = "user"; + StructuredRecord.Builder builder = StructuredRecord.builder(SCHEMA) + .set("string_val", name) + 
.set("double_val", 3.451) + .set("long_val", 125L) + .set("boolean_val", true) + .set("array_int", new int[]{1, 2, 3}) + .set("array_string", new String[]{"a", "b", "c"}) + .setDate("date_val", localDateTime.toLocalDate()) + .setTime("time_val", localDateTime.toLocalTime()) + .setTimestamp("timestamp_val", localDateTime.atZone(UTC_ZONE)) + .set("duration_val", StructuredRecord.builder(DURATION_SCHEMA) + .set("duration", "P0M14DT58320S") + .set("months", 0L) + .set("days", 14L) + .set("seconds", 58320L) + .set("nanoseconds", 0) + .build() + ) + .set("point_2d_val", StructuredRecord.builder(POINT_2D_SCHEMA) + .set("crs", "cartesian") + .set("x", 3.0) + .set("y", 0.0) + .set("srid", 7203) + .build() + ) + .set("point_3d_val", StructuredRecord.builder(POINT_3D_SCHEMA) + .set("crs", "cartesian-3d") + .set("x", 0.0) + .set("y", 4.0) + .set("z", 1.0) + .set("srid", 9157) + .build() + ) + .set("geo_2d_val", StructuredRecord.builder(GEO_2D_SCHEMA) + .set("crs", "wgs-84") + .set("latitude", 12.0) + .set("x", 56.0) + .set("y", 12.0) + .set("srid", 4326) + .set("longitude", 56.0) + .build() + ) + .set("geo_3d_val", StructuredRecord.builder(GEO_3D_SCHEMA) + .set("crs", "wgs-84-3d") + .set("latitude", 12.0) + .set("x", 56.0) + .set("y", 12.0) + .set("z", 1000.0) + .set("srid", 4979) + .set("longitude", 56.0) + .set("height", 1000.0) + .build() + ); + + inputRecords.add(builder.build()); + + return inputRecords; + } + + private ETLPlugin getSinkConfig() { + return new ETLPlugin( + "Neo4jSink", + BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(Neo4jSinkConfig.NAME_OUTPUT_QUERY, "") + .build(), + null); + } +} diff --git a/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/source/Neo4jSourceConfigTest.java b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/source/Neo4jSourceConfigTest.java new file mode 100644 index 000000000..b74452b54 --- /dev/null +++ b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/source/Neo4jSourceConfigTest.java @@ -0,0 +1,134 @@ 
+/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.neo4j.source; + +import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import io.cdap.plugin.neo4j.ValidationAssertions; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * This is a test suite that covers Neo4j Source config validation. + */ +public class Neo4jSourceConfigTest { + private static final String MOCK_STAGE = "mockStage"; + private static final Neo4jSourceConfig VALID_CONFIG = new Neo4jSourceConfig( + "ref_name", + "neo4j", + "localhost", + 7687, + "user", + "password", + "MATCH (n:Test) RETURN n", + 1, + null + ); + + @Test + public void testCheckValidConfig() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + VALID_CONFIG.validate(collector); + + Assert.assertTrue(collector.getValidationFailures().isEmpty()); + } + + @Test + public void testValidateQueryWithUnavailableKeywords() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSourceConfig config = Neo4jSourceConfig.builder(VALID_CONFIG) + .setInputQuery("MERGE (robert:Critic) RETURN robert, labels(robert)") + .build(); + List> paramNames = Arrays.asList( + Collections.singletonList(Neo4jSourceConfig.NAME_INPUT_QUERY), + Collections.singletonList(Neo4jSourceConfig.NAME_INPUT_QUERY) + ); + + config.validate(collector); 
+ ValidationAssertions.assertValidationFailed(collector, paramNames); + } + + @Test + public void testValidateQueryWithoutRequiredKeywords() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSourceConfig config = Neo4jSourceConfig.builder(VALID_CONFIG) + .setInputQuery("MATCH (robert:Critic)") + .build(); + List> paramNames = Collections.singletonList( + Collections.singletonList(Neo4jSourceConfig.NAME_INPUT_QUERY) + ); + + config.validate(collector); + ValidationAssertions.assertValidationFailed(collector, paramNames); + } + + @Test + public void testValidateInvalidSplitNumber() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSourceConfig config = Neo4jSourceConfig.builder(VALID_CONFIG) + .setSplitNum(-2) + .build(); + List> paramNames = Collections.singletonList( + Collections.singletonList(Neo4jSourceConfig.NAME_SPLIT_NUM) + ); + + config.validate(collector); + ValidationAssertions.assertValidationFailed(collector, paramNames); + } + + @Test + public void testValidateSplitNumberWithoutOrderBy() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSourceConfig config = Neo4jSourceConfig.builder(VALID_CONFIG) + .setSplitNum(10) + .build(); + List> paramNames = Collections.singletonList( + Collections.singletonList(Neo4jSourceConfig.NAME_ORDER_BY) + ); + + config.validate(collector); + ValidationAssertions.assertValidationFailed(collector, paramNames); + } + + @Test + public void testValidateSplitNumberWithOrderBy() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSourceConfig config = Neo4jSourceConfig.builder(VALID_CONFIG) + .setSplitNum(10) + .setOrderBy("n.id") + .build(); + + config.validate(collector); + Assert.assertTrue(collector.getValidationFailures().isEmpty()); + } + + @Test + public void testValidateSplitNumberWithOrderByInQuery() { + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Neo4jSourceConfig 
config = Neo4jSourceConfig.builder(VALID_CONFIG) + .setSplitNum(10) + .setInputQuery("MATCH (n:Test) RETURN n ORDER BY n.id") + .build(); + + config.validate(collector); + Assert.assertTrue(collector.getValidationFailures().isEmpty()); + } + +} diff --git a/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/source/Neo4jSourceTestRun.java b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/source/Neo4jSourceTestRun.java new file mode 100644 index 000000000..ef6726d4d --- /dev/null +++ b/neo4j-plugin/src/test/java/io/cdap/plugin/neo4j/source/Neo4jSourceTestRun.java @@ -0,0 +1,320 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
+
+package io.cdap.plugin.neo4j.source;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.plugin.neo4j.Neo4jConstants;
+import io.cdap.plugin.neo4j.NeojlPluginTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class Neo4jSourceTestRun extends NeojlPluginTestBase {
+
+  @Test
+  public void testGetNodeAndRelation() throws Exception {
+    String importQuery =
+      "MATCH (person:CDAP_Person {name: \"Nora Ephron\"})-[rel]-(movie {title: \"Sleepless in Seattle\"}) " +
+        "RETURN person, rel, movie";
+
+    ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder()
+      .put(Neo4jConstants.NAME_REFERENCE_NAME, "neo4jSource")
+      .putAll(BASE_PROPS)
+      .put(Neo4jSourceConfig.NAME_INPUT_QUERY, importQuery)
+      .put(Neo4jSourceConfig.NAME_SPLIT_NUM, "1")
+      .put(Neo4jSourceConfig.NAME_ORDER_BY, "")
+      .build();
+
+    ETLPlugin sourceConfig = new ETLPlugin(
+      "Neo4jSource",
+      BatchSource.PLUGIN_TYPE,
+      sourceProps
+    );
+
+    ETLPlugin sinkConfig = MockSink.getPlugin("nodeOutputTable");
+
+    ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+                                              DATAPIPELINE_ARTIFACT, "testNodeAndRelation");
+    runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+    DataSetManager<Table> outputManager = getDataset("nodeOutputTable");
+    List<StructuredRecord> records = MockSink.readOutput(outputManager);
+    Assert.assertEquals(1, records.size());
+
+    StructuredRecord record = records.get(0);
+    StructuredRecord person = record.get("person");
+    StructuredRecord relation = record.get("rel");
+    StructuredRecord movie = record.get("movie");
+
+    Assert.assertEquals("Nora Ephron", person.get("name"));
+    Assert.assertEquals(1941, (long) person.get("born"));
+    Assert.assertEquals(1, ((ArrayList<?>) person.get("_labels")).size());
+    Assert.assertEquals("CDAP_Person", ((ArrayList<?>) person.get("_labels")).get(0));
+
+    Assert.assertEquals("DIRECTED", relation.get("_type"));
+
+    Assert.assertEquals("Sleepless in Seattle", movie.get("title"));
+    Assert.assertEquals(1993, (long) movie.get("released"));
+    Assert.assertEquals(1, ((ArrayList<?>) movie.get("_labels")).size());
+    Assert.assertEquals("CDAP_Movie", ((ArrayList<?>) movie.get("_labels")).get(0));
+    Assert.assertEquals("What if someone you never met, someone you never saw, someone you never knew was " +
+                          "the only someone for you?", movie.get("tagline"));
+  }
+
+  @Test
+  public void testGetProperties() throws Exception {
+    String importQuery =
+      "MATCH (person)-[rel:ACTED_IN]->(movie {title: \"The Da Vinci Code\"}) " +
+        "RETURN person.name AS name, person.born AS born ORDER BY born";
+
+    List<String> names = Arrays.asList("Ian McKellen", "Tom Hanks", "Paul Bettany", "Audrey Tautou");
+    List<Integer> borns = Arrays.asList(1939, 1956, 1971, 1976);
+
+    ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder()
+      .put(Neo4jConstants.NAME_REFERENCE_NAME, "neo4jSource")
+      .putAll(BASE_PROPS)
+      .put(Neo4jSourceConfig.NAME_INPUT_QUERY, importQuery)
+      .put(Neo4jSourceConfig.NAME_SPLIT_NUM, "1")
+      .put(Neo4jSourceConfig.NAME_ORDER_BY, "")
+      .build();
+
+    ETLPlugin sourceConfig = new ETLPlugin(
+      "Neo4jSource",
+      BatchSource.PLUGIN_TYPE,
+      sourceProps
+    );
+
+    ETLPlugin sinkConfig = MockSink.getPlugin("propertiesOutputTable");
+
+    ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+                                              DATAPIPELINE_ARTIFACT, "testProperties");
+    runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+    DataSetManager<Table> outputManager = getDataset("propertiesOutputTable");
+    List<StructuredRecord> records = MockSink.readOutput(outputManager);
+    Assert.assertEquals(4, records.size());
+
+    for (int i = 0; i < records.size(); i++) {
+      Assert.assertEquals(names.get(i), records.get(i).get("name"));
+      Assert.assertEquals((long) borns.get(i), (long) records.get(i).get("born"));
+    }
+  }
+
+  @Test
+  public void testSplitAndOrderBy() throws Exception {
+    String importQuery =
+      "MATCH (person:CDAP_Person) RETURN person.name AS name";
+
+    ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder()
+      .put(Neo4jConstants.NAME_REFERENCE_NAME, "neo4jSource")
+      .putAll(BASE_PROPS)
+      .put(Neo4jSourceConfig.NAME_INPUT_QUERY, importQuery)
+      .put(Neo4jSourceConfig.NAME_SPLIT_NUM, "3")
+      .put(Neo4jSourceConfig.NAME_ORDER_BY, "person.born")
+      .build();
+
+    ETLPlugin sourceConfig = new ETLPlugin(
+      "Neo4jSource",
+      BatchSource.PLUGIN_TYPE,
+      sourceProps
+    );
+
+    ETLPlugin sinkConfig = MockSink.getPlugin("splitAndOrderOutputTable");
+
+    ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+                                              DATAPIPELINE_ARTIFACT, "testSplitAndOrder");
+    runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+    DataSetManager<Table> outputManager = getDataset("splitAndOrderOutputTable");
+    List<StructuredRecord> records = MockSink.readOutput(outputManager);
+    Assert.assertEquals(11, records.size());
+
+  }
+
+  @Test
+  public void testSplitAndOrderByInQuery() throws Exception {
+    String importQuery =
+      "MATCH (person:CDAP_Person) RETURN person.name AS name ORDER BY name";
+
+    ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder()
+      .put(Neo4jConstants.NAME_REFERENCE_NAME, "neo4jSource")
+      .putAll(BASE_PROPS)
+      .put(Neo4jSourceConfig.NAME_INPUT_QUERY, importQuery)
+      .put(Neo4jSourceConfig.NAME_SPLIT_NUM, "3")
+      .put(Neo4jSourceConfig.NAME_ORDER_BY, "person.born")
+      .build();
+
+    ETLPlugin sourceConfig = new ETLPlugin(
+      "Neo4jSource",
+      BatchSource.PLUGIN_TYPE,
+      sourceProps
+    );
+
+    ETLPlugin sinkConfig = MockSink.getPlugin("splitAndQueryOrderOutputTable");
+
+    ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+                                              DATAPIPELINE_ARTIFACT, "testSplitAndQueryOrder");
+    runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+    DataSetManager<Table> outputManager = getDataset("splitAndQueryOrderOutputTable");
+    List<StructuredRecord> records = MockSink.readOutput(outputManager);
+    Assert.assertEquals(11, records.size());
+
+  }
+
+  @Test
+  public void testGetAllPropertiesType() throws Exception {
+    String importQuery =
+      "MATCH (t:CDAP_Test) RETURN t.string_val, t.long_val, t.double_val, t.boolean_val, t.date_val, t.time_val, " +
+        "t.local_time_val, t.date_time_val, t.local_date_time_val, t.array_int_val, t.array_string_val, " +
+        "t.duration, t.point_2d, t.point_3d, t.geo_2d, t.geo_3d";
+
+    ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder()
+      .put(Neo4jConstants.NAME_REFERENCE_NAME, "neo4jSource")
+      .putAll(BASE_PROPS)
+      .put(Neo4jSourceConfig.NAME_INPUT_QUERY, importQuery)
+      .put(Neo4jSourceConfig.NAME_SPLIT_NUM, "1")
+      .put(Neo4jSourceConfig.NAME_ORDER_BY, "")
+      .build();
+
+    ETLPlugin sourceConfig = new ETLPlugin(
+      "Neo4jSource",
+      BatchSource.PLUGIN_TYPE,
+      sourceProps
+    );
+
+    ETLPlugin sinkConfig = MockSink.getPlugin("allPropertiesTypeOutputTable");
+
+    ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+                                              DATAPIPELINE_ARTIFACT, "testAllProperties");
+    runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+    DataSetManager<Table> outputManager = getDataset("allPropertiesTypeOutputTable");
+    List<StructuredRecord> records = MockSink.readOutput(outputManager);
+    Assert.assertEquals(1, records.size());
+
+    StructuredRecord structuredRecord = records.get(0);
+    for (Schema.Field field : structuredRecord.getSchema().getFields()) {
+      switch (field.getName()) {
+        case "t_string_val":
+          Assert.assertEquals("string", structuredRecord.get(field.getName()));
+          break;
+        case "t_long_val":
+          Assert.assertEquals(123, (long) structuredRecord.get(field.getName()));
+          break;
+        case "t_double_val":
+          Assert.assertEquals(20.32, structuredRecord.get(field.getName()), 0);
+          break;
+        case "t_boolean_val":
+          Assert.assertEquals(true, structuredRecord.get(field.getName()));
+          break;
+        case "t_date_val":
+          Assert.assertEquals(LocalDate.parse("2015-07-21"), structuredRecord.getDate(field.getName()));
+          break;
+        case "t_time_val":
+          OffsetTime o = OffsetTime.parse("12:50:35.556000000+01:00");
+          Assert.assertEquals(
+            LocalTime.ofNanoOfDay(o.toLocalTime().toNanoOfDay() +
+                                    TimeUnit.SECONDS.toNanos(o.getOffset().getTotalSeconds())).withNano(0),
+            structuredRecord.getTime(field.getName()));
+          break;
+        case "t_local_time_val":
+          Assert.assertEquals(LocalTime.parse("12:50:35.556000000").withNano(0),
+                              structuredRecord.getTime(field.getName()));
+          break;
+        case "t_date_time_val":
+          ZonedDateTime dateTime = OffsetDateTime.parse("2015-06-24T12:50:35.556000000+01:00",
+                                                        DateTimeFormatter.ISO_OFFSET_DATE_TIME)
+            .atZoneSameInstant(ZoneOffset.UTC);
+          Assert.assertEquals(dateTime.toLocalDateTime().atZone(ZoneId.of("UTC")),
+                              structuredRecord.getTimestamp(field.getName()));
+          break;
+        case "t_local_date_time_val":
+          Assert.assertEquals(
+            Instant.from(ZonedDateTime.of(
+              LocalDateTime.parse("2015-07-04T19:32:24"), ZoneId.systemDefault()))
+              .atZone(ZoneId.ofOffset("UTC", ZoneOffset.UTC)),
+            structuredRecord.getTimestamp(field.getName()));
+          break;
+        case "t_array_int_val":
+          Assert.assertArrayEquals(Arrays.asList(1L, 2L, 3L, 4L).toArray(),
+                                   ((ArrayList<?>) structuredRecord.get(field.getName())).toArray());
+          break;
+        case "t_array_string_val":
+          Assert.assertArrayEquals(Arrays.asList("a", "b", "c").toArray(),
+                                   ((ArrayList<?>) structuredRecord.get(field.getName())).toArray());
+          break;
+        case "t_duration":
+          StructuredRecord duration = structuredRecord.get(field.getName());
+          Assert.assertEquals("P0M14DT58320S", duration.get("duration"));
+          Assert.assertEquals(0, (long) duration.get("months"));
+          Assert.assertEquals(14, (long) duration.get("days"));
+          Assert.assertEquals(58320, (long) duration.get("seconds"));
+          Assert.assertEquals(0, (int) duration.get("nanoseconds"));
+          break;
+        case "t_point_2d":
+          StructuredRecord point2d = structuredRecord.get(field.getName());
+          Assert.assertEquals(7203, (int) point2d.get("srid"));
+          Assert.assertEquals(3, (double) point2d.get("x"), 0);
+          Assert.assertEquals(0, (double) point2d.get("y"), 0);
+          break;
+        case "t_point_3d":
+          StructuredRecord point3d = structuredRecord.get(field.getName());
+          Assert.assertEquals(9157, (int) point3d.get("srid"));
+          Assert.assertEquals(0, (double) point3d.get("x"), 0);
+          Assert.assertEquals(4, (double) point3d.get("y"), 0);
+          Assert.assertEquals(1, (double) point3d.get("z"), 0);
+          break;
+        case "t_geo_2d":
+          StructuredRecord geo2d = structuredRecord.get(field.getName());
+          Assert.assertEquals(4326, (int) geo2d.get("srid"));
+          Assert.assertEquals(56, (double) geo2d.get("x"), 0);
+          Assert.assertEquals(12, (double) geo2d.get("y"), 0);
+          break;
+        case "t_geo_3d":
+          StructuredRecord geo3d = structuredRecord.get(field.getName());
+          Assert.assertEquals(4979, (int) geo3d.get("srid"));
+          Assert.assertEquals(56, (double) geo3d.get("x"), 0);
+          Assert.assertEquals(12, (double) geo3d.get("y"), 0);
+          Assert.assertEquals(1000, (double) geo3d.get("z"), 0);
+          break;
+      }
+    }
+  }
+}
diff --git a/neo4j-plugin/widgets/Neo4jSink-batchsink.json b/neo4j-plugin/widgets/Neo4jSink-batchsink.json
new file mode 100644
index 000000000..e9a9cd154
--- /dev/null
+++ 
b/neo4j-plugin/widgets/Neo4jSink-batchsink.json @@ -0,0 +1,76 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name" : "Neo4j Sink", + "configuration-groups": [ + { + "label": "General", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName", + "widget-attributes": { + "placeholder": "Name used to identify this sink for lineage." + } + }, + { + "widget-type": "textbox", + "label": "Driver Name", + "name": "jdbcPluginName", + "widget-attributes": { + "default": "neo4j" + } + }, + { + "widget-type": "textbox", + "label": "Neo4j Host", + "name": "neo4jHost", + "widget-attributes": { + "placeholder": "Neo4j host." + } + }, + { + "widget-type": "textbox", + "label": "Neo4j Port", + "name": "neo4jPort", + "widget-attributes": { + "default": 7687, + "placeholder": "Neo4j port." + } + }, + { + "widget-type": "textbox", + "label": "Output Query", + "name": "outputQuery", + "widget-attributes": { + "placeholder": "Query for storing data into Neo4j." + } + } + ] + }, + { + "label": "Credentials", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "username", + "widget-attributes" : { + "placeholder": "Username for connecting to Neo4j." + } + }, + { + "widget-type": "password", + "label": "Password", + "name": "password", + "widget-attributes" : { + "placeholder": "Password for connecting to Neo4j." 
+ } + } + ] + } + ], + "outputs": [] +} \ No newline at end of file diff --git a/neo4j-plugin/widgets/Neo4jSource-batchsource.json b/neo4j-plugin/widgets/Neo4jSource-batchsource.json new file mode 100644 index 000000000..afd2d56e0 --- /dev/null +++ b/neo4j-plugin/widgets/Neo4jSource-batchsource.json @@ -0,0 +1,114 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "Neo4j Source", + "configuration-groups": [ + { + "label": "General", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName", + "widget-attributes": { + "placeholder": "Name used to identify this source for lineage." + } + }, + { + "widget-type": "textbox", + "label": "Driver Name", + "name": "jdbcPluginName", + "widget-attributes": { + "default": "neo4j" + } + }, + { + "widget-type": "textbox", + "label": "Neo4j Host", + "name": "neo4jHost", + "widget-attributes": { + "placeholder": "Neo4j host." + } + }, + { + "widget-type": "textbox", + "label": "Neo4j Port", + "name": "neo4jPort", + "widget-attributes": { + "default": 7687, + "placeholder": "Neo4j port." + } + }, + { + "widget-type": "textbox", + "label": "Input Query", + "name": "inputQuery", + "widget-attributes": { + "placeholder": "Query for getting data from Neo4j." + } + } + ] + }, + { + "label": "Credentials", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "username", + "widget-attributes": { + "placeholder": "Username for connecting to Neo4j." + } + }, + { + "widget-type": "password", + "label": "Password", + "name": "password", + "widget-attributes": { + "placeholder": "Password for connecting to Neo4j." + } + } + ] + }, + { + "label": "Advanced", + "properties": [ + { + "widget-type": "number", + "label": "Splits Number", + "name": "splitNum", + "widget-attributes": { + "default": 1, + "placeholder": "Number of splits." 
+ } + }, + { + "widget-type": "textbox", + "label": "Order By", + "name": "orderBy", + "widget-attributes": { + } + } + ] + } + ], + "outputs": [ + { + "name": "schema", + "label": "schema", + "widget-type": "schema", + "widget-attributes": { + "schema-types": [ + "boolean", + "long", + "double", + "bytes", + "string", + "array" + ], + "schema-default-type": "string" + } + } + ] +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index cbb1fe5fe..7e0334320 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,7 @@ aurora-mysql-plugin aurora-postgresql-plugin memsql-plugin + neo4j-plugin