Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2930: Support for query cancellation for spatial property functions #2931

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected QueryExecDataset(Query query, String queryString, DatasetGraph dataset
// See also query substitution handled in QueryExecBuilder
this.initialBinding = initialToEngine;

// Cancel signal may originate from an e.c. an update execution.
// Cancel signal may originate from e.g. an update execution.
this.cancelSignal = Context.getOrSetCancelSignal(context);

init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
*/
package org.apache.jena.geosparql.geo.topological;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.*;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.geosparql.geof.topological.GenericFilterFunction;
import org.apache.jena.geosparql.implementation.GeometryWrapper;
import org.apache.jena.geosparql.implementation.index.QueryRewriteIndex;
Expand All @@ -37,9 +36,7 @@
import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;
import org.apache.jena.sparql.engine.iterator.QueryIterConcat;
import org.apache.jena.sparql.engine.iterator.QueryIterNullIterator;
import org.apache.jena.sparql.engine.iterator.QueryIterSingleton;
import org.apache.jena.sparql.engine.iterator.*;
import org.apache.jena.sparql.expr.ExprEvalException;
import org.apache.jena.sparql.pfunction.PFuncSimple;
import org.apache.jena.sparql.util.FmtUtils;
Expand Down Expand Up @@ -87,10 +84,8 @@ public QueryIterator execEvaluated(Binding binding, Node subject, Node predicate
}

private QueryIterator bothBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {

Graph graph = execCxt.getActiveGraph();
QueryRewriteIndex queryRewriteIndex = QueryRewriteIndex.retrieve(execCxt);

Boolean isPositiveResult = queryRewrite(graph, subject, predicate, object, queryRewriteIndex);
if (isPositiveResult) {
//Filter function test succeded so retain binding.
Expand All @@ -99,47 +94,37 @@ private QueryIterator bothBound(Binding binding, Node subject, Node predicate, N
//Filter function test failed so null result.
return QueryIterNullIterator.create(execCxt);
}

}

private QueryIterator bothUnbound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {

QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);
Var subjectVar = Var.alloc(subject.getName());

Graph graph = execCxt.getActiveGraph();

//Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present.
ExtendedIterator<Triple> subjectTriples;
if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) {
subjectTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE);
} else if (graph.contains(null, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(null, RDF.type.asNode(), Geo.GEOMETRY_NODE)) {
ExtendedIterator<Triple> featureTriples = graph.find(null, RDF.type.asNode(), Geo.FEATURE_NODE);
ExtendedIterator<Triple> geometryTriples = graph.find(null, RDF.type.asNode(), Geo.GEOMETRY_NODE);
subjectTriples = featureTriples.andThen(geometryTriples);
} else {
//Check for Geo Predicate Features in the Graph if no GeometryLiterals found.
subjectTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null);
}

//Bind all the Spatial Objects or Geo Predicates once as the subject and search for corresponding Objects.
while (subjectTriples.hasNext()) {
Triple subjectTriple = subjectTriples.next();
Node boundSubject = subjectTriple.getSubject();
Binding subjectBind = BindingFactory.binding(binding, subjectVar, boundSubject);
QueryIterator queryIter = oneBound(subjectBind, boundSubject, predicate, object, execCxt);
queryIterConcat.add(queryIter);
}
ExtendedIterator<Triple> spatialTriples = findSpatialTriples(graph);
ExtendedIterator<Binding> iterator = spatialTriples
.mapWith(Triple::getSubject)
.mapWith(node -> BindingFactory.binding(binding, subjectVar, node));

QueryIter queryIter = QueryIter.flatMap(
QueryIterPlainWrapper.create(iterator, execCxt),
b -> oneBound(b, b.get(subjectVar), predicate, object, execCxt),
execCxt
);
return queryIter;
}

return queryIterConcat;
private static boolean isCancelled(ExecutionContext execCxt) {
return execCxt.getCancelSignal() != null && execCxt.getCancelSignal().get();
}

private QueryIterator oneBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {

Graph graph = execCxt.getActiveGraph();
Node boundNode;
Node unboundNode;
Boolean isSubjectBound;
boolean isSubjectBound;
if (subject.isConcrete()) {
//Subject is bound, object is unbound.
boundNode = subject;
Expand All @@ -152,15 +137,18 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No
isSubjectBound = false;
}

if (!(boundNode.isLiteral() || graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) {
if (!(boundNode.isLiteral() ||
graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) ||
graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) ||
graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) {
if (!graph.contains(boundNode, SpatialExtension.GEO_LAT_NODE, null)) {
//Bound node is not a Feature or a Geometry or has Geo predicates so exit.
return QueryIterNullIterator.create(execCxt);
}
}

boolean isSpatialIndex = SpatialIndex.isDefined(execCxt);
QueryIterConcat queryIterConcat;
QueryIter queryIterConcat;
if (!isSpatialIndex || filterFunction.isDisjoint() || filterFunction.isDisconnected()) {
//Disjointed so retrieve all cases.
queryIterConcat = findAll(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
Expand All @@ -172,13 +160,33 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No
return queryIterConcat;
}

private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) {
private QueryIter findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) {

//Prepare the results.
Var unboundVar = Var.alloc(unboundNode.getName());
QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);

//Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present.
ExtendedIterator<Triple> spatialTriples = findSpatialTriples(graph);

ExtendedIterator<Binding> iterator = spatialTriples.
mapWith(Triple::getSubject).
mapWith(node -> BindingFactory.binding(binding, unboundVar, node));
return QueryIter.flatMap(
QueryIterPlainWrapper.create(iterator, execCxt),
b -> {
Node spatialNode = b.get(unboundVar);
QueryIterator iter;
if (isSubjectBound) {
iter = bothBound(b, boundNode, predicate, spatialNode, execCxt);
} else {
iter = bothBound(b, spatialNode, predicate, boundNode, execCxt);
}
return iter;
},
execCxt);
}

private static ExtendedIterator<Triple> findSpatialTriples(Graph graph) {
ExtendedIterator<Triple> spatialTriples;
if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) {
spatialTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE);
Expand All @@ -190,21 +198,7 @@ private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, B
//Check for Geo Predicate Features in the Graph if no GeometryLiterals found.
spatialTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null);
}

while (spatialTriples.hasNext()) {
Triple spatialTriple = spatialTriples.next();
Node spatialNode = spatialTriple.getSubject();
Binding newBind = BindingFactory.binding(binding, unboundVar, spatialNode);
QueryIterator queryIter;
if (isSubjectBound) {
queryIter = bothBound(newBind, boundNode, predicate, spatialNode, execCxt);
} else {
queryIter = bothBound(newBind, spatialNode, predicate, boundNode, execCxt);
}
queryIterConcat.add(queryIter);
}

return queryIterConcat;
return spatialTriples;
}

private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) throws ExprEvalException {
Expand Down Expand Up @@ -238,39 +232,54 @@ private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode,
Envelope searchEnvelope = transformedGeom.getEnvelope();
HashSet<Resource> features = spatialIndex.query(searchEnvelope);

//Check each of the Features that match the search.
for (Resource feature : features) {
Node featureNode = feature.asNode();

//Ensure not already an asserted node.
if (!assertedNodes.contains(featureNode)) {

Binding newBind = BindingFactory.binding(binding, unboundVar, featureNode);
QueryIterator queryIter;
if (isSubjectBound) {
queryIter = bothBound(newBind, boundNode, predicate, featureNode, execCxt);
} else {
queryIter = bothBound(newBind, featureNode, predicate, boundNode, execCxt);
}
queryIterConcat.add(queryIter);
}

//Also test all Geometry of the Features. All, some or one Geometry may have matched.
List<Node> featureGeometryTriples = G.listSP(graph, feature.asNode(), Geo.HAS_GEOMETRY_NODE);
for ( Node geomNode : featureGeometryTriples) {
//Ensure not already an asserted node.
if (!assertedNodes.contains(geomNode)) {
Binding newBind = BindingFactory.binding(binding, unboundVar, geomNode);
QueryIterator queryIter;
if (isSubjectBound) {
queryIter = bothBound(newBind, boundNode, predicate, geomNode, execCxt);
} else {
queryIter = bothBound(newBind, geomNode, predicate, boundNode, execCxt);
// Check each of the Features that match the search.
QueryIterator queryIterator = QueryIterPlainWrapper.create(
Iter.map(features.iterator(),
feature -> BindingFactory.binding(binding, unboundVar, feature.asNode())),
execCxt);

queryIterator = QueryIter.flatMap(queryIterator, b -> {
Node featureNode = b.get(unboundVar);
QueryIterConcat featureIterConcat = new QueryIterConcat(execCxt);

// Check Features directly if not already asserted
if (!assertedNodes.contains(featureNode)) {
QueryIterator tmpIter;
if (isSubjectBound) {
tmpIter = bothBound(b, boundNode, predicate, featureNode, execCxt);
} else {
tmpIter = bothBound(b, featureNode, predicate, boundNode, execCxt);
}
featureIterConcat.add(tmpIter);
}
queryIterConcat.add(queryIter);
}
}
}

// Also test all Geometry of the Features. All, some or one Geometry may have matched.
ExtendedIterator<Node> featureGeometries = G.iterSP(graph, featureNode, Geo.HAS_GEOMETRY_NODE);
QueryIterator geometriesQueryIterator = QueryIterPlainWrapper.create(
Iter.map(
Iter.filter( // omit asserted
featureGeometries,
geometry -> !assertedNodes.contains(geometry)
),
geometryNode -> BindingFactory.binding(binding, unboundVar, geometryNode)),
execCxt);
geometriesQueryIterator = QueryIter.flatMap(
geometriesQueryIterator,
b2 -> {
Node geomNode = b2.get(unboundVar);

if (isSubjectBound) {
return bothBound(b2, boundNode, predicate, geomNode, execCxt);
} else {
return bothBound(b2, geomNode, predicate, boundNode, execCxt);
}
},
execCxt);
featureIterConcat.add(geometriesQueryIterator);
return featureIterConcat;
},
execCxt);
queryIterConcat.add(queryIterator);

return queryIterConcat;
} catch (MismatchedDimensionException | TransformException | FactoryException | SpatialIndexException ex) {
Expand Down
Loading
Loading