diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java index f79786f4991..1dcfcbb3310 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java @@ -462,6 +462,8 @@ private void startQueryIteratorActual() { execInit(); + context.set(ARQConstants.symCancelQuery, cancelSignal); + /* Timeouts: * -1,-1 No timeouts * N, same as -1,N Overall timeout only. No wrapper needed. diff --git a/jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java b/jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java new file mode 100644 index 00000000000..26e640713eb --- /dev/null +++ b/jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java @@ -0,0 +1,140 @@ +package org.apache.jena.geosparql.geo.topological; + + +import org.apache.jena.geosparql.configuration.GeoSPARQLConfig; +import org.apache.jena.geosparql.implementation.datatype.WKTDatatype; +import org.apache.jena.geosparql.implementation.index.IndexConfiguration; +import org.apache.jena.geosparql.implementation.vocabulary.Geo; +import org.apache.jena.geosparql.spatial.SpatialIndex; +import org.apache.jena.geosparql.spatial.SpatialIndexException; +import org.apache.jena.graph.Graph; +import org.apache.jena.graph.NodeFactory; +import org.apache.jena.graph.Triple; +import org.apache.jena.query.*; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.sparql.graph.GraphFactory; +import org.apache.jena.vocabulary.RDF; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.stream.IntStream; + +public class CancelQueryTest { + + @Test//(timeout = 10000) + public void test_cancel_spatial_property_function1() { + GeoSPARQLConfig.setup(IndexConfiguration.IndexOption.MEMORY, Boolean.TRUE); + + int maxCancelDelayInMillis = 10000; + boolean useIndex = true; + + // Create a model with spatial triples + int numGeometries = 1000; + + Graph graph = GraphFactory.createDefaultGraph(); + // features + IntStream.range(0, numGeometries) + .mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" + i)) + .forEach(feature -> graph.add(feature, RDF.type.asNode(), Geo.FEATURE_NODE)); + // geometries + IntStream.range(0, numGeometries) + .mapToObj(i -> + Triple.create( + NodeFactory.createURI("http://www.example.org/r" + i), + Geo.HAS_GEOMETRY_PROP.asNode(), + NodeFactory.createURI("http://www.example.org/r" + i + "/geometry") + ) + ) + .forEach(graph::add); + // geo:Geometry type triples + IntStream.range(0, numGeometries) + .mapToObj(i -> + Triple.create( + NodeFactory.createURI("http://www.example.org/r" + i + "/geometry"), + RDF.type.asNode(), + Geo.GEOMETRY_NODE + ) + ) + .forEach(graph::add); + // geometry WKT literals + IntStream.range(0, numGeometries) + .mapToObj(i -> + Triple.create( + NodeFactory.createURI("http://www.example.org/r" + i + "/geometry"), + Geo.AS_WKT_PROP.asNode(), + NodeFactory.createLiteralDT("POINT(2 2)", WKTDatatype.INSTANCE) + ) + ) + .forEach(graph::add); + Model model = ModelFactory.createModelForGraph(graph); + Dataset ds = DatasetFactory.create(model); + + // create spatial index + if (useIndex){ + try { + SpatialIndex index = SpatialIndex.buildSpatialIndex(ds); + SpatialIndex.setSpatialIndex(ds, index); + } catch (SpatialIndexException e) { + throw new RuntimeException(e); + } + } + + + // Create a query that queries for spatial relation with both sides being unbound, thus, pairwise comparison would be needed + Query query = QueryFactory.create("PREFIX geo: SELECT * { ?a geo:sfIntersects ?b . }"); + Callable qeFactory = () -> QueryExecution.dataset(ds).query(query).build();//.timeout(2000, TimeUnit.MILLISECONDS).build(); + + runAsyncAbort(maxCancelDelayInMillis, qeFactory, CancelQueryTest::doCount); + } + + private static int doCount(QueryExecution qe) { + System.out.println("Executing query ..."); + try (QueryExecution qe2 = qe) { + ResultSet rs = qe2.execSelect(); + int size = ResultSetFormatter.consume(rs); + return size; + } + } + + /** + * Reusable method that creates a parallel stream that starts query executions + * and schedules cancel tasks on a separate thread pool. + */ + public static void runAsyncAbort(int delayToAbort, Callable qeFactory, Function processor) { + ExecutorService executorService = Executors.newCachedThreadPool(); + + QueryExecution qe; + try { + qe = qeFactory.call(); + } catch (Exception e) { + throw new RuntimeException("Failed to build a query execution", e); + } + Future future = executorService.submit(() -> processor.apply(qe)); + try { + Thread.sleep(delayToAbort); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + System.out.println("Abort: " + qe); + qe.abort(); + try { + // System.out.println("Waiting for: " + qe); + future.get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (!(cause instanceof QueryCancelledException)) { + // Unexpected exception - print out the stack trace + e.printStackTrace(); + } + Assert.assertEquals(QueryCancelledException.class, cause.getClass()); + } catch (InterruptedException e) { + // Ignored + } finally { + // System.out.println("Completed: " + qe); + executorService.shutdownNow(); + } + } +} \ No newline at end of file