From 00e19cd7a8b43cb80b533e38da1f4bbb19aa13ed Mon Sep 17 00:00:00 2001 From: Lorenz Buehmann Date: Mon, 18 Nov 2024 12:35:10 +0100 Subject: [PATCH] GH-2930: Support for query cancelation for spatial property functions by creating iterators lazily. --- .../jena/sparql/exec/QueryExecDataset.java | 2 +- .../jena/util/iterator/NiceIterator.java | 6 +- .../topological/GenericPropertyFunction.java | 221 +++++++++--------- .../geo/topological/CancelQueryTest.java | 166 +++++++++++++ 4 files changed, 286 insertions(+), 109 deletions(-) create mode 100644 jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java index f79786f4991..8f0de2b0b10 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java @@ -111,7 +111,7 @@ protected QueryExecDataset(Query query, String queryString, DatasetGraph dataset // See also query substitution handled in QueryExecBuilder this.initialBinding = initialToEngine; - // Cancel signal may originate from an e.c. an update execution. + // Cancel signal may originate from e.g. an update execution. this.cancelSignal = Context.getOrSetCancelSignal(context); init(); diff --git a/jena-core/src/main/java/org/apache/jena/util/iterator/NiceIterator.java b/jena-core/src/main/java/org/apache/jena/util/iterator/NiceIterator.java index 4a6871ac781..a30cb7c117d 100644 --- a/jena-core/src/main/java/org/apache/jena/util/iterator/NiceIterator.java +++ b/jena-core/src/main/java/org/apache/jena/util/iterator/NiceIterator.java @@ -227,7 +227,8 @@ Canonical implementation of toSet(). public static Set asSet( ExtendedIterator it ) { Set result = new HashSet<>(); - it.forEachRemaining(result::add); + try { it.forEachRemaining(result::add); } + finally { it.close(); } return result; } @@ -238,7 +239,8 @@ that iterator. Canonical implementation of toList(). public static List asList( ExtendedIterator it ) { List result = new ArrayList<>(); - it.forEachRemaining(result::add); + try { it.forEachRemaining(result::add); } + finally { it.close(); } return result; } } diff --git a/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java b/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java index 2c2d4e7f65a..e389a681c60 100644 --- a/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java +++ b/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java @@ -17,10 +17,11 @@ */ package org.apache.jena.geosparql.geo.topological; -import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; import java.util.HashSet; import java.util.List; + +import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.geosparql.geof.topological.GenericFilterFunction; import org.apache.jena.geosparql.implementation.GeometryWrapper; import org.apache.jena.geosparql.implementation.index.QueryRewriteIndex; @@ -37,8 +38,10 @@ import org.apache.jena.sparql.engine.QueryIterator; import org.apache.jena.sparql.engine.binding.Binding; import org.apache.jena.sparql.engine.binding.BindingFactory; +import org.apache.jena.sparql.engine.iterator.QueryIter; import org.apache.jena.sparql.engine.iterator.QueryIterConcat; import org.apache.jena.sparql.engine.iterator.QueryIterNullIterator; +import org.apache.jena.sparql.engine.iterator.QueryIterPlainWrapper; import org.apache.jena.sparql.engine.iterator.QueryIterSingleton; import org.apache.jena.sparql.expr.ExprEvalException; import org.apache.jena.sparql.pfunction.PFuncSimple; @@ -83,14 +86,18 @@ public QueryIterator execEvaluated(Binding binding, Node subject, Node predicate //One bound and one unbound. return oneBound(binding, subject, predicate, object, execCxt); } + } + private QueryIterator bothBound(Binding binding, boolean isSubjectBound, Node subject, Node predicate, Node object, ExecutionContext execCxt) { + QueryIterator iter = isSubjectBound + ? bothBound(binding, subject, predicate, object, execCxt) + : bothBound(binding, object, predicate, subject, execCxt); + return iter; } private QueryIterator bothBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) { - Graph graph = execCxt.getActiveGraph(); QueryRewriteIndex queryRewriteIndex = QueryRewriteIndex.retrieve(execCxt); - Boolean isPositiveResult = queryRewrite(graph, subject, predicate, object, queryRewriteIndex); if (isPositiveResult) { //Filter function test succeded so retain binding. @@ -99,39 +106,25 @@ private QueryIterator bothBound(Binding binding, Node subject, Node predicate, N //Filter function test failed so null result. return QueryIterNullIterator.create(execCxt); } - } private QueryIterator bothUnbound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) { - - QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt); Var subjectVar = Var.alloc(subject.getName()); Graph graph = execCxt.getActiveGraph(); //Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present. - ExtendedIterator subjectTriples; - if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) { - subjectTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE); - } else if (graph.contains(null, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(null, RDF.type.asNode(), Geo.GEOMETRY_NODE)) { - ExtendedIterator featureTriples = graph.find(null, RDF.type.asNode(), Geo.FEATURE_NODE); - ExtendedIterator geometryTriples = graph.find(null, RDF.type.asNode(), Geo.GEOMETRY_NODE); - subjectTriples = featureTriples.andThen(geometryTriples); - } else { - //Check for Geo Predicate Features in the Graph if no GeometryLiterals found. - subjectTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null); - } - - //Bind all the Spatial Objects or Geo Predicates once as the subject and search for corresponding Objects. - while (subjectTriples.hasNext()) { - Triple subjectTriple = subjectTriples.next(); - Node boundSubject = subjectTriple.getSubject(); - Binding subjectBind = BindingFactory.binding(binding, subjectVar, boundSubject); - QueryIterator queryIter = oneBound(subjectBind, boundSubject, predicate, object, execCxt); - queryIterConcat.add(queryIter); - } - - return queryIterConcat; + ExtendedIterator spatialTriples = findSpatialTriples(graph); + ExtendedIterator iterator = spatialTriples + .mapWith(Triple::getSubject) + .mapWith(node -> BindingFactory.binding(binding, subjectVar, node)); + + QueryIter queryIter = QueryIter.flatMap( + QueryIterPlainWrapper.create(iterator, execCxt), + b -> oneBound(b, b.get(subjectVar), predicate, object, execCxt), + execCxt + ); + return queryIter; } private QueryIterator oneBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) { @@ -139,7 +132,7 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No Graph graph = execCxt.getActiveGraph(); Node boundNode; Node unboundNode; - Boolean isSubjectBound; + boolean isSubjectBound; if (subject.isConcrete()) { //Subject is bound, object is unbound. boundNode = subject; @@ -152,7 +145,10 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No isSubjectBound = false; } - if (!(boundNode.isLiteral() || graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) { + if (!(boundNode.isLiteral() || + graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) || + graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) || + graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) { if (!graph.contains(boundNode, SpatialExtension.GEO_LAT_NODE, null)) { //Bound node is not a Feature or a Geometry or has Geo predicates so exit. return QueryIterNullIterator.create(execCxt); @@ -160,25 +156,40 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No } boolean isSpatialIndex = SpatialIndex.isDefined(execCxt); - QueryIterConcat queryIterConcat; + QueryIterator result; if (!isSpatialIndex || filterFunction.isDisjoint() || filterFunction.isDisconnected()) { //Disjointed so retrieve all cases. - queryIterConcat = findAll(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt); + result = findAll(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt); } else { //Only retrieve those in the spatial index which are within same bounding box. - queryIterConcat = findIndex(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt); + result = findIndex(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt); } - - return queryIterConcat; + return result; } - private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) { + private QueryIterator findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) { //Prepare the results. Var unboundVar = Var.alloc(unboundNode.getName()); - QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt); //Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present. + ExtendedIterator spatialTriples = findSpatialTriples(graph); + + ExtendedIterator iterator = spatialTriples + .mapWith(Triple::getSubject) + .mapWith(node -> BindingFactory.binding(binding, unboundVar, node)); + + return QueryIter.flatMap( + QueryIterPlainWrapper.create(iterator, execCxt), + b -> { + Node spatialNode = b.get(unboundVar); + QueryIterator iter = bothBound(b, isSubjectBound, boundNode, predicate, spatialNode, execCxt); + return iter; + }, + execCxt); + } + + private static ExtendedIterator findSpatialTriples(Graph graph) { ExtendedIterator spatialTriples; if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) { spatialTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE); @@ -190,45 +201,33 @@ private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, B //Check for Geo Predicate Features in the Graph if no GeometryLiterals found. spatialTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null); } - - while (spatialTriples.hasNext()) { - Triple spatialTriple = spatialTriples.next(); - Node spatialNode = spatialTriple.getSubject(); - Binding newBind = BindingFactory.binding(binding, unboundVar, spatialNode); - QueryIterator queryIter; - if (isSubjectBound) { - queryIter = bothBound(newBind, boundNode, predicate, spatialNode, execCxt); - } else { - queryIter = bothBound(newBind, spatialNode, predicate, boundNode, execCxt); - } - queryIterConcat.add(queryIter); - } - - return queryIterConcat; + return spatialTriples; } - private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) throws ExprEvalException { - + private QueryIterator findIndex(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) throws ExprEvalException { try { //Prepare for results. - Var unboundVar = Var.alloc(unboundNode.getName()); - QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt); + Var unboundVar = Var.alloc(unboundNode); //Find the asserted triples. - List assertedNodes = !isSubjectBound || !boundNode.isLiteral() ? findAsserted(graph, boundNode, isSubjectBound, predicate) : Collections.emptyList(); - for (Node node : assertedNodes) { - Binding newBind = BindingFactory.binding(binding, unboundVar, node); - QueryIterator queryIter = QueryIterSingleton.create(newBind, execCxt); - queryIterConcat.add(queryIter); - } + Collection assertedNodes = !isSubjectBound || !boundNode.isLiteral() + ? findAsserted(graph, boundNode, isSubjectBound, predicate) + : List.of(); + + QueryIterator assertedNodesIter = QueryIterPlainWrapper.create( + Iter.map(assertedNodes.iterator(), node -> BindingFactory.binding(binding, unboundVar, node)), + execCxt); //Find the GeometryLiteral of the Bound Node. SpatialObjectGeometryLiteral boundGeometryLiteral = SpatialObjectGeometryLiteral.retrieve(graph, boundNode); if (!boundGeometryLiteral.isValid()) { //Bound Node is not a Feature or a Geometry or there is no GeometryLiteral so exit. - return queryIterConcat; + return assertedNodesIter; } + QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt); + queryIterConcat.add(assertedNodesIter); + Node geometryLiteral = boundGeometryLiteral.getGeometryLiteral(); //Perform the search of the Spatial Index of the Dataset. @@ -238,39 +237,19 @@ private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode, Envelope searchEnvelope = transformedGeom.getEnvelope(); HashSet features = spatialIndex.query(searchEnvelope); - //Check each of the Features that match the search. - for (Resource feature : features) { - Node featureNode = feature.asNode(); - - //Ensure not already an asserted node. - if (!assertedNodes.contains(featureNode)) { - - Binding newBind = BindingFactory.binding(binding, unboundVar, featureNode); - QueryIterator queryIter; - if (isSubjectBound) { - queryIter = bothBound(newBind, boundNode, predicate, featureNode, execCxt); - } else { - queryIter = bothBound(newBind, featureNode, predicate, boundNode, execCxt); - } - queryIterConcat.add(queryIter); - } - - //Also test all Geometry of the Features. All, some or one Geometry may have matched. - List featureGeometryTriples = G.listSP(graph, feature.asNode(), Geo.HAS_GEOMETRY_NODE); - for ( Node geomNode : featureGeometryTriples) { - //Ensure not already an asserted node. - if (!assertedNodes.contains(geomNode)) { - Binding newBind = BindingFactory.binding(binding, unboundVar, geomNode); - QueryIterator queryIter; - if (isSubjectBound) { - queryIter = bothBound(newBind, boundNode, predicate, geomNode, execCxt); - } else { - queryIter = bothBound(newBind, geomNode, predicate, boundNode, execCxt); - } - queryIterConcat.add(queryIter); - } - } - } + // Check each of the Features that match the search. + QueryIterator featuresIter = QueryIterPlainWrapper.create( + Iter.map(features.iterator(), feature -> BindingFactory.binding(binding, unboundVar, feature.asNode())), + execCxt); + + QueryIterator queryIterator = QueryIter.flatMap(featuresIter, + featureBinding -> { + return findByFeature(graph, binding, featureBinding, + isSubjectBound, boundNode, predicate, unboundVar, + execCxt, assertedNodes); + }, + execCxt); + queryIterConcat.add(queryIterator); return queryIterConcat; } catch (MismatchedDimensionException | TransformException | FactoryException | SpatialIndexException ex) { @@ -278,15 +257,46 @@ private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode, } } - private List findAsserted(Graph graph, Node boundNode, boolean isSubjectBound, Node predicate) { - List assertedNodes = new ArrayList<>(); - if (isSubjectBound) { - List x = G.listSP(graph, boundNode, predicate); - assertedNodes.addAll(x); - } else { - List x = G.listPO(graph, predicate, boundNode); - assertedNodes.addAll(x); + private QueryIterator findByFeature(Graph graph, Binding binding, Binding featureBinding, + boolean isSubjectBound, Node boundNode, Node predicate, Var unboundVar, + ExecutionContext execCxt, Collection assertedNodes) { + + Node featureNode = featureBinding.get(unboundVar); + QueryIterConcat featureIterConcat = new QueryIterConcat(execCxt); + + // Check Features directly if not already asserted + if (!assertedNodes.contains(featureNode)) { + QueryIterator tmpIter = bothBound(featureBinding, isSubjectBound, boundNode, predicate, featureNode, execCxt); + featureIterConcat.add(tmpIter); } + + // Also test all Geometry of the Features. All, some or one Geometry may have matched. + ExtendedIterator featureGeometries = G.iterSP(graph, featureNode, Geo.HAS_GEOMETRY_NODE); + QueryIterator geometriesQueryIterator = QueryIterPlainWrapper.create( + Iter.map( + Iter.filter( // omit asserted + featureGeometries, + geometry -> !assertedNodes.contains(geometry) + ), + geometryNode -> BindingFactory.binding(binding, unboundVar, geometryNode)), + execCxt); + + geometriesQueryIterator = QueryIter.flatMap( + geometriesQueryIterator, + b2 -> { + Node geomNode = b2.get(unboundVar); + return bothBound(b2, isSubjectBound, boundNode, predicate, geomNode, execCxt); + }, + execCxt); + + featureIterConcat.add(geometriesQueryIterator); + return featureIterConcat; + } + + private List findAsserted(Graph graph, Node boundNode, boolean isSubjectBound, Node predicate) { + List assertedNodes = isSubjectBound + ? G.listSP(graph, boundNode, predicate) + : G.listPO(graph, predicate, boundNode); return assertedNodes; } @@ -323,5 +333,4 @@ protected final Boolean queryRewrite(Graph graph, Node subject, Node predicate, public Boolean testFilterFunction(Node subjectGeometryLiteral, Node objectGeometryLiteral) { return filterFunction.exec(subjectGeometryLiteral, objectGeometryLiteral); } - } diff --git a/jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java b/jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java new file mode 100644 index 00000000000..7b54ebb9049 --- /dev/null +++ b/jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.geosparql.geo.topological; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Function; +import java.util.stream.IntStream; + +import org.apache.jena.geosparql.configuration.GeoSPARQLConfig; +import org.apache.jena.geosparql.implementation.datatype.WKTDatatype; +import org.apache.jena.geosparql.implementation.index.IndexConfiguration; +import org.apache.jena.geosparql.implementation.vocabulary.Geo; +import org.apache.jena.geosparql.spatial.SpatialIndex; +import org.apache.jena.geosparql.spatial.SpatialIndexException; +import org.apache.jena.graph.Graph; +import org.apache.jena.graph.NodeFactory; +import org.apache.jena.query.Dataset; +import org.apache.jena.query.DatasetFactory; +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryCancelledException; +import org.apache.jena.query.QueryExecution; +import org.apache.jena.query.QueryFactory; +import org.apache.jena.query.ResultSet; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.sparql.graph.GraphFactory; +import org.apache.jena.vocabulary.RDF; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class CancelQueryTest { + + private final int numGeometries; + + public CancelQueryTest(int numGeometries) { + this.numGeometries = numGeometries; + } + + @Parameterized.Parameters(name = "number of geometries: {0}") + public static List sizes() { + return List.of( + 31_623 // The square root of 1 billion approximated as an integer. + ); + } + + public static Graph createSpatialGraph(int numGeometries) { + Graph graph = GraphFactory.createDefaultGraph(); + + IntStream.range(0, numGeometries).forEach(i -> { + // features + graph.add(NodeFactory.createURI("http://www.example.org/r" + i), RDF.type.asNode(), Geo.FEATURE_NODE); + // geometries + graph.add(NodeFactory.createURI("http://www.example.org/r" + i), + Geo.HAS_GEOMETRY_PROP.asNode(), + NodeFactory.createURI("http://www.example.org/r" + i + "/geometry")); + // geo:Geometry type triples + graph.add(NodeFactory.createURI("http://www.example.org/r" + i + "/geometry"), + RDF.type.asNode(), + Geo.GEOMETRY_NODE); + // geometry WKT literals + graph.add(NodeFactory.createURI("http://www.example.org/r" + i + "/geometry"), + Geo.AS_WKT_PROP.asNode(), + NodeFactory.createLiteralDT("POINT(2 2)", WKTDatatype.INSTANCE)); + }); + // System.out.printf("created graph with %d triples and %d geometries\n", graph.size(), numGeometries); + + return graph; + } + + @Test(timeout = 10000) + public void test_cancel_spatial_property_function1() { + GeoSPARQLConfig.setup(IndexConfiguration.IndexOption.MEMORY, Boolean.TRUE); + + long cancelDelayMillis = 1000; + boolean useIndex = true; + + // Create a dataset with spatial triples + Graph graph = createSpatialGraph(numGeometries); + Model model = ModelFactory.createModelForGraph(graph); + Dataset ds = DatasetFactory.create(model); + + // create spatial index + if (useIndex){ + try { + SpatialIndex index = SpatialIndex.buildSpatialIndex(ds); + SpatialIndex.setSpatialIndex(ds, index); + } catch (SpatialIndexException e) { + throw new RuntimeException(e); + } + } + + // Create a query that queries for spatial relation with both sides being unbound, thus, pairwise comparison would be needed + Query query = QueryFactory.create("PREFIX geo: SELECT * { ?a geo:sfIntersects ?b . }"); + Callable qeFactory = () -> QueryExecution.dataset(ds).query(query).build();//.timeout(2000, TimeUnit.MILLISECONDS).build(); + + runAsyncAbort(cancelDelayMillis, qeFactory, CancelQueryTest::doCount); + } + + private static long doCount(QueryExecution qe) { + // System.out.println("Executing query ..."); + long counter = 0; + try (QueryExecution qe2 = qe) { + ResultSet rs = qe2.execSelect(); + while (rs.hasNext()) { + rs.next(); + ++counter; + } + } finally { + // System.out.println("Aborted after seeing " + counter + " bindings"); + } + return counter; + } + + public static void runAsyncAbort(long cancelDelayMillis, Callable qeFactory, Function processor) { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + + try (QueryExecution qe = qeFactory.call()){ + Future future = executorService.submit(() -> processor.apply(qe)); + try { + Thread.sleep(cancelDelayMillis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // System.out.println("Aborting query execution: " + qe); + qe.abort(); + try { + future.get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (!(cause instanceof QueryCancelledException)) { + e.printStackTrace(); + } + Assert.assertEquals(QueryCancelledException.class, cause.getClass()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to build a query execution", e); + } finally { + // System.out.println("Completed: " + qe); + executorService.shutdownNow(); + } + } +}