Skip to content

Fix missing headers in csv output and enable schema inference in export-pg-from-queries #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
### New Features and Improvements:

- Migrate to AWS SDK for Java v2
- Enable csv file rewrites for `export-pg-from-queries` to ensure output conforms to inferred schema

### Bug Fixes:

- Add retries to Neptune clone creation
- Fix missing csv headers from `export-pg-from-queries`

## Neptune Export v1.1.11 (Release Date: Mar 10, 2025):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,10 @@ public void run() {
directories.writeResultsDirectoryPathAsMessage(target.description(), target);

queriesResource.writeResourcePathAsMessage(target);
configFileResource.save(graphSchema, false);
statsFileResource.save(exportStats, graphSchema);
if (structuredOutput) {
configFileResource.save(graphSchema, false);
statsFileResource.save(exportStats, graphSchema);
}

directories.writeRootDirectoryPathAsReturnValue(target);
onExportComplete(directories, exportStats, cluster, graphSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ private Map<GraphElementType, GraphElementSchemas> export() throws ExecutionExce

Collection<FileSpecificLabelSchemas> nodesFileSpecificLabelSchemas = new ArrayList<>();
Collection<FileSpecificLabelSchemas> edgesFileSpecificLabelSchemas = new ArrayList<>();
Collection<FileSpecificLabelSchemas> queryResultsFileSpecificLabelSchemas = new ArrayList<>();

LabelsFilter nodeLabelFilter = new AllLabels(NodeLabelStrategy.nodeLabelsOnly);
LabelsFilter edgeLabelFilter = new AllLabels(EdgeLabelStrategy.edgeLabelsOnly);
Expand All @@ -97,7 +98,7 @@ private Map<GraphElementType, GraphElementSchemas> export() throws ExecutionExce
if (exportSpecification.getGraphElementType() == GraphElementType.nodes) {
nodeLabelFilter = exportSpecification.getLabelsFilter();
}
else {
else if (exportSpecification.getGraphElementType() == GraphElementType.edges) {
edgeLabelFilter = exportSpecification.getLabelsFilter();
}
}
Expand Down Expand Up @@ -139,18 +140,28 @@ private Map<GraphElementType, GraphElementSchemas> export() throws ExecutionExce
Map<GraphElementType, FileSpecificLabelSchemas> result = future.get();
nodesFileSpecificLabelSchemas.add(result.get(GraphElementType.nodes));
edgesFileSpecificLabelSchemas.add(result.get(GraphElementType.edges));
queryResultsFileSpecificLabelSchemas.add(result.get(GraphElementType.queryResults));
}

RewriteCommand rewriteCommand = targetConfig.createRewriteCommand(concurrencyConfig, featureToggles);
Map<GraphElementType, GraphElementSchemas> graphElementSchemas = new HashMap<>();

for(ExportSpecification exportSpecification : exportSpecifications) {
MasterLabelSchemas masterLabelSchemas = exportSpecification.createMasterLabelSchemas(
exportSpecification.getGraphElementType().equals(GraphElementType.nodes) ?
nodesFileSpecificLabelSchemas : edgesFileSpecificLabelSchemas
);
if (structuredOutput) {
for(ExportSpecification exportSpecification : exportSpecifications) {
MasterLabelSchemas masterLabelSchemas = exportSpecification.createMasterLabelSchemas(
exportSpecification.getGraphElementType().equals(GraphElementType.nodes) ?
nodesFileSpecificLabelSchemas : edgesFileSpecificLabelSchemas
);
try {
graphElementSchemas.put(exportSpecification.getGraphElementType(), rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
} else {
MasterLabelSchemas masterLabelSchemas = new MasterLabelSchemas(queryResultsFileSpecificLabelSchemas, GraphElementType.queryResults);
try {
graphElementSchemas.put(exportSpecification.getGraphElementType(), rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
graphElementSchemas.put(GraphElementType.queryResults, rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public Map<GraphElementType, FileSpecificLabelSchemas> call() throws Exception {
Map<GraphElementType, FileSpecificLabelSchemas> fileSpecificLabelSchemasMap = new HashMap<>();
fileSpecificLabelSchemasMap.put(GraphElementType.nodes, new FileSpecificLabelSchemas());
fileSpecificLabelSchemasMap.put(GraphElementType.edges, new FileSpecificLabelSchemas());
fileSpecificLabelSchemasMap.put(GraphElementType.queryResults, new FileSpecificLabelSchemas());

try {

Expand Down Expand Up @@ -186,6 +187,7 @@ private void executeQuery(NamedQuery namedQuery,
}
else {
ResultsHandler resultsHandler = new ResultsHandler(
fileSpecificLabelSchemasMap.get(GraphElementType.queryResults),
new Label(namedQuery.name()),
labelWriters,
writerFactory,
Expand Down Expand Up @@ -219,8 +221,10 @@ private class ResultsHandler implements GraphElementHandler<Map<?, ?>> {
private final Map<Label, LabelWriter<Map<?, ?>>> labelWriters;
private final QueriesWriterFactory writerFactory;
private final GraphElementSchemas graphElementSchemas;
private final FileSpecificLabelSchemas fileSpecificLabelSchemas;

private ResultsHandler(Label label,
private ResultsHandler(FileSpecificLabelSchemas fileSpecificLabelSchemas,
Label label,
Map<Label, LabelWriter<Map<?, ?>>> labelWriters,
QueriesWriterFactory writerFactory,
GraphElementSchemas graphElementSchemas) {
Expand All @@ -229,6 +233,7 @@ private ResultsHandler(Label label,
this.writerFactory = writerFactory;

this.graphElementSchemas = graphElementSchemas;
this.fileSpecificLabelSchemas = fileSpecificLabelSchemas;
}

private void createWriter(Map<?, ?> properties, boolean allowStructuralElements) {
Expand All @@ -242,7 +247,10 @@ private void createWriter(Map<?, ?> properties, boolean allowStructuralElements)
PropertyGraphPrinter propertyGraphPrinter =
writerFactory.createPrinter(Directories.fileName(label.fullyQualifiedLabel(), index), labelSchema, targetConfig);

labelWriters.put(label, writerFactory.createLabelWriter(propertyGraphPrinter, label));
LabelWriter<Map<?, ?>> labelWriter = writerFactory.createLabelWriter(propertyGraphPrinter, label);

labelWriters.put(label, labelWriter);
fileSpecificLabelSchemas.add(labelWriter.outputId(), targetConfig.format(), labelSchema);

} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private MasterLabelSchema rewriteAndMerge(PropertyGraphTargetConfig targetConfig

if (graphElementType == GraphElementType.nodes) {
printer.printNode(record.get("~id"), Arrays.asList(record.get("~label").split(";")));
} else {
} else if (graphElementType == GraphElementType.edges) {
if (label.hasFromAndToLabels()) {
printer.printEdge(
record.get("~id"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private MasterLabelSchema rewrite(PropertyGraphTargetConfig targetConfig,

if (graphElementType == GraphElementType.nodes) {
target.printNode(record.get("~id"), Arrays.asList(record.get("~label").split(";")));
} else {
} else if (graphElementType == GraphElementType.edges) {
if (label.hasFromAndToLabels()) {
target.printEdge(
record.get("~id"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,40 +121,7 @@ public ExportPropertyGraphTask createExportTask(GraphSchema graphSchema,
}

public MasterLabelSchemas createMasterLabelSchemas(Collection<FileSpecificLabelSchemas> fileSpecificLabelSchemasCollection) {

Set<Label> labels = new HashSet<>();

fileSpecificLabelSchemasCollection.forEach(s -> labels.addAll(s.labels()));

Map<Label, MasterLabelSchema> masterLabelSchemas = new HashMap<>();

for (Label label : labels) {

LabelSchema masterLabelSchema = new LabelSchema(label);
Collection<FileSpecificLabelSchema> fileSpecificLabelSchemas = new ArrayList<>();

for (FileSpecificLabelSchemas fileSpecificLabelSchemasForTask : fileSpecificLabelSchemasCollection) {
if (fileSpecificLabelSchemasForTask.hasSchemasForLabel(label)) {
Set<LabelSchema> labelSchemaSet = new HashSet<>();
for (FileSpecificLabelSchema fileSpecificLabelSchema :
fileSpecificLabelSchemasForTask.fileSpecificLabelSchemasFor(label)) {
fileSpecificLabelSchemas.add(fileSpecificLabelSchema);
labelSchemaSet.add(fileSpecificLabelSchema.labelSchema());
}
for (LabelSchema labelSchema : labelSchemaSet) {
masterLabelSchema = masterLabelSchema.union(labelSchema);
}
}
}

masterLabelSchemas.put(
label,
new MasterLabelSchema(masterLabelSchema, fileSpecificLabelSchemas));


}

return new MasterLabelSchemas(masterLabelSchemas, graphElementType);
return new MasterLabelSchemas(fileSpecificLabelSchemasCollection, graphElementType);
}

public Collection<ExportSpecification> splitByLabel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
import com.amazonaws.services.neptune.propertygraph.NodesClient;
import com.amazonaws.services.neptune.propertygraph.io.EdgesWriterFactory;
import com.amazonaws.services.neptune.propertygraph.io.NodesWriterFactory;
import com.amazonaws.services.neptune.propertygraph.io.QueriesWriterFactory;
import com.amazonaws.services.neptune.propertygraph.io.WriterFactory;
import com.amazonaws.services.neptune.propertygraph.io.result.PGResult;
import com.amazonaws.services.neptune.util.NotImplementedException;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public enum GraphElementType {

Expand Down Expand Up @@ -59,6 +62,22 @@ public GraphClient<? extends PGResult> graphClient(GraphTraversalSource g, boole
public WriterFactory<? extends PGResult> writerFactory() {
return new EdgesWriterFactory();
}
},
// Enum constant used for the results of named queries (presumably the output of
// export-pg-from-queries — confirm against callers), alongside the node and edge constants.
queryResults {
// This constant defines no token names: an empty collection is returned.
@Override
public Collection<String> tokenNames() {
return Collections.emptyList();
}

// No graph client is available for this constant: the call always throws NotImplementedException.
@Override
public GraphClient<? extends PGResult> graphClient(GraphTraversalSource g, boolean tokensOnly, ExportStats stats, FeatureToggles featureToggles) {
throw new NotImplementedException();
}

// Returns a new QueriesWriterFactory on every call.
// NOTE(review): the return type is the raw WriterFactory, whereas the preceding constant declares
// WriterFactory<? extends PGResult>. Presumably deliberate because of QueriesWriterFactory's
// element type — confirm before parameterizing.
@Override
public WriterFactory writerFactory() {
return new QueriesWriterFactory();
}
};

public abstract Collection<String> tokenNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@

import com.amazonaws.services.neptune.propertygraph.Label;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MasterLabelSchemas {

Expand All @@ -27,6 +31,10 @@ public MasterLabelSchemas(Map<Label, MasterLabelSchema> masterLabelSchemas, Grap
this.graphElementType = graphElementType;
}

/**
 * Creates master label schemas from a collection of file-specific label schemas.
 *
 * <p>The collection is converted to a label-keyed map by
 * {@code convertFileSpecificLabelSchemas} and then handed to the map-based constructor.
 *
 * @param fileSpecificLabelSchemasCollection file-specific label schemas to convert
 * @param graphElementType graph element type passed through to the map-based constructor
 */
public MasterLabelSchemas(Collection<FileSpecificLabelSchemas> fileSpecificLabelSchemasCollection, GraphElementType graphElementType) {
this(convertFileSpecificLabelSchemas(fileSpecificLabelSchemasCollection), graphElementType);
}

/**
 * Returns the master label schemas held by this instance.
 *
 * @return the result of {@code masterLabelSchemas.values()}; no copy is made here
 */
public Collection<MasterLabelSchema> schemas() {
return masterLabelSchemas.values();
}
Expand All @@ -42,4 +50,37 @@ public GraphElementSchemas toGraphElementSchemas() {
}
return graphElementSchemas;
}

/**
 * Builds one {@link MasterLabelSchema} per label found in the supplied file-specific schemas.
 *
 * <p>For every label, each entry of the collection that has schemas for that label contributes
 * all of its {@link FileSpecificLabelSchema}s to the label's file list, and the distinct
 * {@link LabelSchema}s of that entry are folded into the label's master schema via
 * {@code union}.
 *
 * @param fileSpecificLabelSchemasCollection file-specific label schemas to combine
 * @return map from label to its combined master label schema
 */
private static Map<Label, MasterLabelSchema> convertFileSpecificLabelSchemas(Collection<FileSpecificLabelSchemas> fileSpecificLabelSchemasCollection) {
    // Collect every label mentioned by any entry of the collection.
    Set<Label> allLabels = new HashSet<>();
    for (FileSpecificLabelSchemas perTaskSchemas : fileSpecificLabelSchemasCollection) {
        allLabels.addAll(perTaskSchemas.labels());
    }

    Map<Label, MasterLabelSchema> byLabel = new HashMap<>();

    for (Label label : allLabels) {
        LabelSchema merged = new LabelSchema(label);
        Collection<FileSpecificLabelSchema> filesForLabel = new ArrayList<>();

        for (FileSpecificLabelSchemas perTaskSchemas : fileSpecificLabelSchemasCollection) {
            if (!perTaskSchemas.hasSchemasForLabel(label)) {
                continue;
            }

            // De-duplicate this entry's schemas before merging so each distinct schema is unioned once.
            Set<LabelSchema> distinctSchemas = new HashSet<>();
            for (FileSpecificLabelSchema fileSchema : perTaskSchemas.fileSpecificLabelSchemasFor(label)) {
                filesForLabel.add(fileSchema);
                distinctSchemas.add(fileSchema.labelSchema());
            }
            for (LabelSchema distinctSchema : distinctSchemas) {
                merged = merged.union(distinctSchema);
            }
        }

        byLabel.put(label, new MasterLabelSchema(merged, filesForLabel));
    }

    return byLabel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,48 @@ public void testExportPgFromQueries() {
assertEquivalentResults(new File("src/test/resources/IntegrationTest/testExportPgFromQueries"), resultDir);
}

/**
 * Runs {@code export-pg-from-queries} with {@code --format csvNoHeaders} for a single named
 * query and compares the first entry of the output directory with the expected results.
 */
@Test
public void testExportPgFromQueriesNoHeaders() {
    final String namedQuery =
            "airport=g.V().hasLabel('airport').has('runways', gt(2)).project('code', 'runways', 'city', 'country').by('code').by('runways').by('city').by('country')";
    final String[] args = {
            "export-pg-from-queries",
            "-e", neptuneEndpoint,
            "-d", outputDir.getPath(),
            "--format", "csvNoHeaders",
            "-q", namedQuery
    };

    new NeptuneExportRunner(args).run();

    // The export is expected to have created a results directory under outputDir; take the first entry.
    final File actualDir = outputDir.listFiles()[0];
    final File expectedDir = new File("src/test/resources/IntegrationTest/testExportPgFromQueriesNoHeaders");

    assertEquivalentResults(expectedDir, actualDir);
}

/**
 * Runs {@code export-pg-from-queries} for a query whose injected results carry a growing set of
 * keys (code; then city and code; then code, city and runways), and compares the first entry of
 * the output directory with the expected results.
 */
@Test
public void testExportPgFromQueriesWithStaggeredResults() {
    final String namedQuery =
            "airport=g.inject(['code': 'YYC'], ['city': 'Vancouver', 'code': 'YVR'], ['code':'SEA', 'city':'Seattle', 'runways': 3])";
    final String[] args = {
            "export-pg-from-queries",
            "-e", neptuneEndpoint,
            "-d", outputDir.getPath(),
            "-q", namedQuery
    };

    new NeptuneExportRunner(args).run();

    // The export is expected to have created a results directory under outputDir; take the first entry.
    final File actualDir = outputDir.listFiles()[0];
    final File expectedDir = new File("src/test/resources/IntegrationTest/testExportPgFromQueriesWithStaggeredResults");

    assertEquivalentResults(expectedDir, actualDir);
}

/**
 * Runs {@code export-pg-from-queries} with {@code --format csvNoHeaders} for a query whose
 * injected results carry a shrinking set of keys (code, city and runways; then city and code;
 * then code), and compares the first entry of the output directory with the expected results.
 */
@Test
public void testExportPgFromQueriesWithStaggeredResultsNoHeaders() {
    final String namedQuery =
            "airport=g.inject(['code':'SEA', 'city':'Seattle', 'runways': 3], ['city': 'Vancouver', 'code': 'YVR'], ['code': 'YYC'])";
    final String[] args = {
            "export-pg-from-queries",
            "-e", neptuneEndpoint,
            "-d", outputDir.getPath(),
            "--format", "csvNoHeaders",
            "-q", namedQuery
    };

    new NeptuneExportRunner(args).run();

    // The export is expected to have created a results directory under outputDir; take the first entry.
    final File actualDir = outputDir.listFiles()[0];
    final File expectedDir = new File("src/test/resources/IntegrationTest/testExportPgFromQueriesWithStaggeredResultsNoHeaders");

    assertEquivalentResults(expectedDir, actualDir);
}

@Test
public void testExportPgFromQueriesSplitQueries() {
final String[] command = {"export-pg-from-queries", "-e", neptuneEndpoint,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
code,runways,city,country
"ANC",3,"Anchorage","US"
"BWI",3,"Baltimore","US"
"DCA",3,"Washington D.C.","US"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[ {
"name" : "airport",
"queries" : [ "g.V().hasLabel('airport').has('runways', gt(2)).project('code', 'runways', 'city', 'country').by('code').by('runways').by('city').by('country')" ]
} ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"ANC",3,"Anchorage","US"
"BWI",3,"Baltimore","US"
"DCA",3,"Washington D.C.","US"
"PBI",3,"West Palm Beach","US"
"PHX",3,"Phoenix","US"
"RDU",3,"Raleigh","US"
"SEA",3,"Seattle","US"
"SJC",3,"San Jose","US"
"TPA",3,"Tampa","US"
"LGB",3,"Long Beach","US"
"SAT",3,"San Antonio","US"
"EWR",3,"Newark","US"
"ELP",3,"El Paso","US"
"CLE",3,"Cleveland","US"
"TUS",3,"Tucson","US"
"SAF",3,"Santa Fe","US"
"BNA",4,"Nashville","US"
"IAD",4,"Washington D.C.","US"
"JFK",4,"New York","US"
"LAX",4,"Los Angeles","US"
"MCO",4,"Orlando","US"
"MIA",4,"Miami","US"
"MSP",4,"Minneapolis","US"
"SFO",4,"San Francisco","US"
"SLC",4,"Salt Lake City","US"
"LAS",4,"Las Vegas","US"
"HNL",4,"Honolulu","US"
"HOU",4,"Houston","US"
"OAK",4,"Oakland","US"
"PHL",4,"Philadelphia","US"
"ATL",5,"Atlanta","US"
"IAH",5,"Houston","US"
"BOS",6,"Boston","US"
"DEN",6,"Denver","US"
"DTW",6,"Detroit","US"
"DFW",7,"Dallas","US"
"ORD",8,"Chicago","US"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[ {
"name" : "airport",
"queries" : [ "g.inject(['code': 'YYC'], ['city': 'Vancouver', 'code': 'YVR'], ['code':'SEA', 'city':'Seattle', 'runways': 3])" ]
} ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
code,city,runways
"YYC",,
"YVR","Vancouver",
"SEA","Seattle",3
Loading
Loading