Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,14 @@ protected Iterator<Traverser.Admin<S>> computerAlgorithm() throws NoSuchElementE
throw new IllegalStateException("The repeat()-traversal was not defined: " + this);

final Traverser.Admin<S> start = this.starts.next();
System.out.printf("RepeatStep.computerAlgorithm: received traverser%n");
// System.out.printf("RepeatStep.computerAlgorithm: received traverser%n");
if (doUntil(start, true)) {
System.out.printf("RepeatStep.computerAlgorithm: doUntil=true, exiting%n");
// System.out.printf("RepeatStep.computerAlgorithm: doUntil=true, exiting%n");
start.setStepId(this.getNextStep().getId());
start.addLabels(this.labels);
return IteratorUtils.of(start);
} else {
System.out.printf("RepeatStep.computerAlgorithm: entering repeat body%n");
// System.out.printf("RepeatStep.computerAlgorithm: entering repeat body%n");
start.setStepId(this.repeatTraversal.getStartStep().getId());
String ln;
if (this.loopName != null) {
Expand Down Expand Up @@ -392,6 +392,7 @@ protected Iterator<Traverser.Admin<S>> computerAlgorithm() throws NoSuchElementE
// System.out.printf("RepeatEndStep.computerAlgorithm: %s loops=%d before incrLoops%n", start.path(), start.loops());
start.incrLoops();
// System.out.printf("RepeatEndStep.computerAlgorithm: %s loops=%d after incrLoops%n", start.path(), start.loops());
// System.out.println("RepeatEndStep traverser: " + start);
if (repeatStep.doUntil(start, false)) {
// System.out.printf("RepeatEndStep.computerAlgorithm: doUntil=true, calling resetLoops for %s%n", start.path());
start.resetLoops();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.filter;

import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
Expand All @@ -32,12 +43,6 @@
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;

/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
Expand Down Expand Up @@ -91,7 +96,9 @@ public String toString() {
@Override
public void processAllStarts() {
while (this.starts.hasNext()) {
this.createProjectedTraverser(this.starts.next()).ifPresent(traverserSet::add);
Traverser.Admin<S> next = this.starts.next();
//System.out.println("Adding start: " + next + " to traverserSet: " + this.traverserSet);
this.createProjectedTraverser(next).ifPresent(traverserSet::add);
}
}

Expand All @@ -106,6 +113,7 @@ public void barrierConsumer(final TraverserSet<S> traverserSet) {
totalWeight = totalWeight + (((ProjectedTraverser<S, Number>) s).getProjections().get(0).doubleValue() * s.bulk());
}
///////
//System.out.println("Sampling from traverserSet: " + traverserSet);
final TraverserSet<S> sampledSet = (TraverserSet<S>) this.traversal.getTraverserSetSupplier().get();
int runningAmountToSample = 0;
while (runningAmountToSample < this.amountToSample) {
Expand All @@ -119,6 +127,7 @@ public void barrierConsumer(final TraverserSet<S> traverserSet) {
if (random.nextDouble() <= ((currentWeight / runningTotalWeight))) {
final Traverser.Admin<S> split = s.split();
split.setBulk(1L);
//System.out.println("Adding sample: " + split);
sampledSet.add(split);
runningAmountToSample++;
reSample = true;
Expand All @@ -132,7 +141,63 @@ public void barrierConsumer(final TraverserSet<S> traverserSet) {
}
}
traverserSet.clear();
//System.out.println("SampledSet: " + sampledSet);
traverserSet.addAll(sampledSet);
//System.out.println("TraverserSet: " + traverserSet);
}

@Override
public MemoryComputeKey<TraverserSet<S>> getMemoryComputeKey() {
// return MemoryComputeKey.of(this.getId(), (ts1, ts2) -> {
// System.out.println("Merging traverserSets: " + ts1 + " + " + ts2);
// return (TraverserSet<S>) Operator.addAll.apply(ts1, ts2);
// }, false, true);
return MemoryComputeKey.of(this.getId(), new SampleBiOperator<>(this.amountToSample), false, true);
}

public static final class SampleBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable {

private long sampleSize;

private SampleBiOperator() {
// for serializers that need a no-arg constructor
}

public SampleBiOperator(final long sampleSize) {
this.sampleSize = sampleSize;
}

@Override
public TraverserSet<S> apply(final TraverserSet<S> setA, final TraverserSet<S> setB) {
//System.out.println("SampleBiOperator.apply: " + setA + " -> " + setB);

Map<Integer, TraverserSet> loopMap = new TreeMap<>(Comparator.reverseOrder());

for (Traverser.Admin<S> traverser : setA) {
loopMap.computeIfAbsent(traverser.loops(), integer -> new TraverserSet<>()).add(traverser);
}
for (Traverser.Admin<S> traverser : setB) {
loopMap.computeIfAbsent(traverser.loops(), integer -> new TraverserSet<>()).add(traverser);
}

return loopMap.entrySet().iterator().next().getValue();

// return (TraverserSet<S>) Operator.addAll.apply(setA, setB);
// setA.addAll(setB);
// if (this.sampleSize != -1 && setA.bulkSize() > this.sampleSize) {
// long counter = 0L;
// final Iterator<Traverser.Admin<S>> traversers = setA.iterator();
// while (traversers.hasNext()) {
// final Traverser.Admin<S> traverser = traversers.next();
// if (counter > this.sampleSize) {
// System.out.println("SampleBiOperator removed traverser: " + traverser);
// traversers.remove();
// }
// counter = counter + traverser.bulk();
// }
// }
// return setA;
}
}


Expand All @@ -154,7 +219,7 @@ private Optional<ProjectedTraverser<S, Number>> createProjectedTraverser(final T

@Override
public Set<TraverserRequirement> getRequirements() {
return this.getSelfAndChildRequirements(TraverserRequirement.BULK);
return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.NESTED_LOOP);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I don't think it's necessary to add a nested loop requirement to this step, as in theory the only time this will be needed is if its inside a nested repeat step, in which case RepeatStep itself will already be providing the nested loop requirement. A similar argument would apply to any other steps which may be given "loop awareness" such as RangeGlobalStep. Note how even LoopsStep does not explicitly set a single loop or nested loop requirement.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public B_NL_O_S_SE_SL_Traverser(final T t, final Step<T, ?> step, final long ini

@Override
public int loops() {
return this.nestedLoops.peek().count();
return this.nestedLoops.isEmpty() ? 0 : this.nestedLoops.peek().count();
}

@Override
Expand Down
Loading