diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java index a51af34575e..37b2aaf5e22 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java @@ -282,14 +282,14 @@ protected Iterator> computerAlgorithm() throws NoSuchElementE throw new IllegalStateException("The repeat()-traversal was not defined: " + this); final Traverser.Admin start = this.starts.next(); - System.out.printf("RepeatStep.computerAlgorithm: received traverser%n"); +// System.out.printf("RepeatStep.computerAlgorithm: received traverser%n"); if (doUntil(start, true)) { - System.out.printf("RepeatStep.computerAlgorithm: doUntil=true, exiting%n"); +// System.out.printf("RepeatStep.computerAlgorithm: doUntil=true, exiting%n"); start.setStepId(this.getNextStep().getId()); start.addLabels(this.labels); return IteratorUtils.of(start); } else { - System.out.printf("RepeatStep.computerAlgorithm: entering repeat body%n"); +// System.out.printf("RepeatStep.computerAlgorithm: entering repeat body%n"); start.setStepId(this.repeatTraversal.getStartStep().getId()); String ln; if (this.loopName != null) { @@ -392,6 +392,7 @@ protected Iterator> computerAlgorithm() throws NoSuchElementE // System.out.printf("RepeatEndStep.computerAlgorithm: %s loops=%d before incrLoops%n", start.path(), start.loops()); start.incrLoops(); // System.out.printf("RepeatEndStep.computerAlgorithm: %s loops=%d after incrLoops%n", start.path(), start.loops()); +// System.out.println("RepeatEndStep traverser: " + start); if (repeatStep.doUntil(start, false)) { // System.out.printf("RepeatEndStep.computerAlgorithm: doUntil=true, calling resetLoops for %s%n", start.path()); start.resetLoops(); diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java index fbc3e8d66b3..0405b14966e 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java @@ -18,6 +18,17 @@ */ package org.apache.tinkerpop.gremlin.process.traversal.step.filter; +import java.io.Serializable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.BinaryOperator; +import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal; @@ -32,12 +43,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Random; -import java.util.Set; - /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ @@ -91,7 +96,9 @@ public String toString() { @Override public void processAllStarts() { while (this.starts.hasNext()) { - this.createProjectedTraverser(this.starts.next()).ifPresent(traverserSet::add); + Traverser.Admin next = this.starts.next(); + //System.out.println("Adding start: " + next + " to traverserSet: " + this.traverserSet); + this.createProjectedTraverser(next).ifPresent(traverserSet::add); } } @@ -106,6 +113,7 @@ public void barrierConsumer(final TraverserSet traverserSet) { totalWeight = totalWeight + (((ProjectedTraverser) s).getProjections().get(0).doubleValue() * s.bulk()); } /////// + //System.out.println("Sampling from traverserSet: " + traverserSet); final TraverserSet sampledSet = (TraverserSet) this.traversal.getTraverserSetSupplier().get(); int runningAmountToSample = 0; while (runningAmountToSample < this.amountToSample) { @@ -119,6 +127,7 @@ public void barrierConsumer(final TraverserSet traverserSet) { if (random.nextDouble() <= ((currentWeight / runningTotalWeight))) { final Traverser.Admin split = s.split(); split.setBulk(1L); + //System.out.println("Adding sample: " + split); sampledSet.add(split); runningAmountToSample++; reSample = true; @@ -132,7 +141,63 @@ public void barrierConsumer(final TraverserSet traverserSet) { } } traverserSet.clear(); + //System.out.println("SampledSet: " + sampledSet); traverserSet.addAll(sampledSet); + //System.out.println("TraverserSet: " + traverserSet); + } + + @Override + public MemoryComputeKey> getMemoryComputeKey() { +// return MemoryComputeKey.of(this.getId(), (ts1, ts2) -> { +// System.out.println("Merging traverserSets: " + ts1 + " + " + ts2); +// return (TraverserSet) Operator.addAll.apply(ts1, ts2); +// }, false, true); + return MemoryComputeKey.of(this.getId(), new SampleBiOperator<>(this.amountToSample), false, true); + } + + public static final class SampleBiOperator implements BinaryOperator>, Serializable { + + private long sampleSize; + + private SampleBiOperator() { + // for serializers that need a no-arg constructor + } + + public SampleBiOperator(final long sampleSize) { + this.sampleSize = sampleSize; + } + + @Override + public TraverserSet apply(final TraverserSet setA, final TraverserSet setB) { + //System.out.println("SampleBiOperator.apply: " + setA + " -> " + setB); + + Map loopMap = new TreeMap<>(Comparator.reverseOrder()); + + for (Traverser.Admin traverser : setA) { + loopMap.computeIfAbsent(traverser.loops(), integer -> new TraverserSet<>()).add(traverser); + } + for (Traverser.Admin traverser : setB) { + loopMap.computeIfAbsent(traverser.loops(), integer -> new TraverserSet<>()).add(traverser); + } + + return loopMap.entrySet().iterator().next().getValue(); + +// return (TraverserSet) Operator.addAll.apply(setA, setB); +// setA.addAll(setB); +// if (this.sampleSize != -1 && setA.bulkSize() > this.sampleSize) { +// long counter = 0L; +// final Iterator> traversers = setA.iterator(); +// while (traversers.hasNext()) { +// final Traverser.Admin traverser = traversers.next(); +// if (counter > this.sampleSize) { +// System.out.println("SampleBiOperator removed traverser: " + traverser); +// traversers.remove(); +// } +// counter = counter + traverser.bulk(); +// } +// } +// return setA; + } } @@ -154,7 +219,7 @@ private Optional> createProjectedTraverser(final T @Override public Set getRequirements() { - return this.getSelfAndChildRequirements(TraverserRequirement.BULK); + return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.NESTED_LOOP); } @Override diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/B_NL_O_S_SE_SL_Traverser.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/B_NL_O_S_SE_SL_Traverser.java index 3ce0884c6eb..b1688553088 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/B_NL_O_S_SE_SL_Traverser.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/B_NL_O_S_SE_SL_Traverser.java @@ -44,7 +44,7 @@ public B_NL_O_S_SE_SL_Traverser(final T t, final Step step, final long ini @Override public int loops() { - return this.nestedLoops.peek().count(); + return this.nestedLoops.isEmpty() ? 0 : this.nestedLoops.peek().count(); } @Override