Skip to content

Commit

Permalink
move query cancel signal flag
Browse files Browse the repository at this point in the history
- moved query cancel signal creation to a place before the method
terminates early
- add some simple test
  • Loading branch information
Lorenz Buehmann authored and Aklakan committed Feb 6, 2025
1 parent 3fe5150 commit 90ba206
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ private void startQueryIteratorActual() {

execInit();

context.set(ARQConstants.symCancelQuery, cancelSignal);

/* Timeouts:
* -1,-1 No timeouts
* N, same as -1,N Overall timeout only. No wrapper needed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package org.apache.jena.geosparql.geo.topological;


import org.apache.jena.geosparql.configuration.GeoSPARQLConfig;
import org.apache.jena.geosparql.implementation.datatype.WKTDatatype;
import org.apache.jena.geosparql.implementation.index.IndexConfiguration;
import org.apache.jena.geosparql.implementation.vocabulary.Geo;
import org.apache.jena.geosparql.spatial.SpatialIndex;
import org.apache.jena.geosparql.spatial.SpatialIndexException;
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.NodeFactory;
import org.apache.jena.graph.Triple;
import org.apache.jena.query.*;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.sparql.graph.GraphFactory;
import org.apache.jena.vocabulary.RDF;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.IntStream;

public class CancelQueryTest {

@Test//(timeout = 10000)
public void test_cancel_spatial_property_function1() {
GeoSPARQLConfig.setup(IndexConfiguration.IndexOption.MEMORY, Boolean.TRUE);

int maxCancelDelayInMillis = 10000;
boolean useIndex = true;

// Create a model with spatial triples
int numGeometries = 1000;

Graph graph = GraphFactory.createDefaultGraph();
// features
IntStream.range(0, numGeometries)
.mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" + i))
.forEach(feature -> graph.add(feature, RDF.type.asNode(), Geo.FEATURE_NODE));
// geometries
IntStream.range(0, numGeometries)
.mapToObj(i ->
Triple.create(
NodeFactory.createURI("http://www.example.org/r" + i),
Geo.HAS_GEOMETRY_PROP.asNode(),
NodeFactory.createURI("http://www.example.org/r" + i + "/geometry")
)
)
.forEach(graph::add);
// geo:Geometry type triples
IntStream.range(0, numGeometries)
.mapToObj(i ->
Triple.create(
NodeFactory.createURI("http://www.example.org/r" + i + "/geometry"),
RDF.type.asNode(),
Geo.GEOMETRY_NODE
)
)
.forEach(graph::add);
// geometry WKT literals
IntStream.range(0, numGeometries)
.mapToObj(i ->
Triple.create(
NodeFactory.createURI("http://www.example.org/r" + i + "/geometry"),
Geo.AS_WKT_PROP.asNode(),
NodeFactory.createLiteralDT("POINT(2 2)", WKTDatatype.INSTANCE)
)
)
.forEach(graph::add);
Model model = ModelFactory.createModelForGraph(graph);
Dataset ds = DatasetFactory.create(model);

// create spatial index
if (useIndex){
try {
SpatialIndex index = SpatialIndex.buildSpatialIndex(ds);
SpatialIndex.setSpatialIndex(ds, index);
} catch (SpatialIndexException e) {
throw new RuntimeException(e);
}
}


// Create a query that queries for spatial relation with both sides being unbound, thus, pairwise comparison would be needed
Query query = QueryFactory.create("PREFIX geo: <http://www.opengis.net/ont/geosparql#> SELECT * { ?a geo:sfIntersects ?b . }");
Callable<QueryExecution> qeFactory = () -> QueryExecution.dataset(ds).query(query).build();//.timeout(2000, TimeUnit.MILLISECONDS).build();

runAsyncAbort(maxCancelDelayInMillis, qeFactory, CancelQueryTest::doCount);
}

private static int doCount(QueryExecution qe) {
System.out.println("Executing query ...");
try (QueryExecution qe2 = qe) {
ResultSet rs = qe2.execSelect();
int size = ResultSetFormatter.consume(rs);
return size;
}
}

/**
* Reusable method that creates a parallel stream that starts query executions
* and schedules cancel tasks on a separate thread pool.
*/
public static void runAsyncAbort(int delayToAbort, Callable<QueryExecution> qeFactory, Function<QueryExecution, ?> processor) {
ExecutorService executorService = Executors.newCachedThreadPool();

QueryExecution qe;
try {
qe = qeFactory.call();
} catch (Exception e) {
throw new RuntimeException("Failed to build a query execution", e);
}
Future<?> future = executorService.submit(() -> processor.apply(qe));
try {
Thread.sleep(delayToAbort);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Abort: " + qe);
qe.abort();
try {
// System.out.println("Waiting for: " + qe);
future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (!(cause instanceof QueryCancelledException)) {
// Unexpected exception - print out the stack trace
e.printStackTrace();
}
Assert.assertEquals(QueryCancelledException.class, cause.getClass());
} catch (InterruptedException e) {
// Ignored
} finally {
// System.out.println("Completed: " + qe);
executorService.shutdownNow();
}
}
}

0 comments on commit 90ba206

Please sign in to comment.