Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit 6570da6

Browse files
committed
3.4 batching similarity writes (#808)
* move exporter into the proper package * batch writes of similarity results * default write batch size = 10000
1 parent 8e08e13 commit 6570da6

12 files changed

+76
-31
lines changed

algo/src/main/java/org/neo4j/graphalgo/similarity/CosineProc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public Stream<SimilaritySummaryResult> cosine(
7979
Stream<SimilarityResult> stream = generateWeightedStream(configuration, inputs, similarityCutoff, topN, topK, computer);
8080

8181
boolean write = configuration.isWriteFlag(false) && similarityCutoff > 0.0;
82-
return writeAndAggregateResults(stream, inputs.length, write, writeRelationshipType, writeProperty);
82+
return writeAndAggregateResults(stream, inputs.length, configuration, write, writeRelationshipType, writeProperty);
8383
}
8484

8585
private SimilarityComputer<WeightedInput> similarityComputer(Double skipValue) {

algo/src/main/java/org/neo4j/graphalgo/similarity/EuclideanProc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Stream<SimilaritySummaryResult> euclidean(
8080
Stream<SimilarityResult> stream = generateWeightedStream(configuration, inputs, similarityCutoff, topN, topK, computer);
8181

8282
boolean write = configuration.isWriteFlag(false); // && similarityCutoff != 0.0;
83-
return writeAndAggregateResults(stream, inputs.length, write, writeRelationshipType, writeProperty);
83+
return writeAndAggregateResults(stream, inputs.length, configuration, write, writeRelationshipType, writeProperty );
8484
}
8585

8686
Stream<SimilarityResult> generateWeightedStream(ProcedureConfiguration configuration, WeightedInput[] inputs,

algo/src/main/java/org/neo4j/graphalgo/similarity/JaccardProc.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
*/
1919
package org.neo4j.graphalgo.similarity;
2020

21-
import org.HdrHistogram.DoubleHistogram;
2221
import org.neo4j.graphalgo.core.ProcedureConfiguration;
2322
import org.neo4j.procedure.*;
2423

2524
import java.util.*;
26-
import java.util.concurrent.atomic.AtomicLong;
2725
import java.util.stream.Stream;
2826

2927
import static org.neo4j.graphalgo.impl.util.TopKConsumer.topK;
@@ -69,7 +67,7 @@ public Stream<SimilaritySummaryResult> jaccard(
6967
similarityCutoff, getTopK(configuration)), getTopN(configuration));
7068

7169
boolean write = configuration.isWriteFlag(false) && similarityCutoff > 0.0;
72-
return writeAndAggregateResults(stream, inputs.length, write, writeRelationshipType, writeProperty);
70+
return writeAndAggregateResults(stream, inputs.length, configuration, write, writeRelationshipType, writeProperty );
7371
}
7472

7573
private SimilarityComputer<CategoricalInput> similarityComputer() {

algo/src/main/java/org/neo4j/graphalgo/similarity/OverlapProc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public Stream<SimilaritySummaryResult> overlap(
7373
Stream<SimilarityResult> stream = topN(similarityStream(inputs, computer, configuration, () -> null, similarityCutoff, getTopK(configuration)), getTopN(configuration));
7474

7575
boolean write = configuration.isWriteFlag(false) && similarityCutoff > 0.0;
76-
return writeAndAggregateResults(stream, inputs.length, write, writeRelationshipType, writeProperty);
76+
return writeAndAggregateResults(stream, inputs.length, configuration, write, writeRelationshipType, writeProperty);
7777
}
7878

7979

algo/src/main/java/org/neo4j/graphalgo/similarity/PearsonProc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Stream<SimilaritySummaryResult> pearson(
8080
Stream<SimilarityResult> stream = generateWeightedStream(configuration, inputs, similarityCutoff, topN, topK, computer);
8181

8282
boolean write = configuration.isWriteFlag(false) && similarityCutoff > 0.0;
83-
return writeAndAggregateResults(stream, inputs.length, write, writeRelationshipType, writeProperty);
83+
return writeAndAggregateResults(stream, inputs.length, configuration, write, writeRelationshipType, writeProperty);
8484
}
8585

8686
private SimilarityComputer<WeightedInput> similarityComputer(Double skipValue) {

algo/src/main/java/org/neo4j/graphalgo/impl/yens/SimilarityExporter.java renamed to algo/src/main/java/org/neo4j/graphalgo/similarity/SimilarityExporter.java

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,36 @@
11
/**
22
* Copyright (c) 2017 "Neo4j, Inc." <http://neo4j.com>
3-
*
3+
* <p>
44
* This file is part of Neo4j Graph Algorithms <http://github.com/neo4j-contrib/neo4j-graph-algorithms>.
5-
*
5+
* <p>
66
* Neo4j Graph Algorithms is free software: you can redistribute it and/or modify
77
* it under the terms of the GNU General Public License as published by
88
* the Free Software Foundation, either version 3 of the License, or
99
* (at your option) any later version.
10-
*
10+
* <p>
1111
* This program is distributed in the hope that it will be useful,
1212
* but WITHOUT ANY WARRANTY; without even the implied warranty of
1313
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1414
* GNU General Public License for more details.
15-
*
15+
* <p>
1616
* You should have received a copy of the GNU General Public License
1717
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1818
*/
19-
package org.neo4j.graphalgo.impl.yens;
19+
package org.neo4j.graphalgo.similarity;
2020

21-
import org.neo4j.graphalgo.similarity.SimilarityResult;
2221
import org.neo4j.graphalgo.core.utils.ExceptionUtil;
2322
import org.neo4j.graphalgo.core.utils.StatementApi;
23+
import org.neo4j.internal.kernel.api.exceptions.EntityNotFoundException;
24+
import org.neo4j.internal.kernel.api.exceptions.InvalidTransactionTypeKernelException;
2425
import org.neo4j.internal.kernel.api.exceptions.KernelException;
26+
import org.neo4j.internal.kernel.api.exceptions.explicitindex.AutoIndexingKernelException;
27+
import org.neo4j.kernel.api.KernelTransaction;
2528
import org.neo4j.kernel.internal.GraphDatabaseAPI;
2629
import org.neo4j.values.storable.Values;
2730

31+
import java.util.ArrayList;
32+
import java.util.Iterator;
33+
import java.util.List;
2834
import java.util.stream.Stream;
2935

3036
public class SimilarityExporter extends StatementApi {
@@ -40,20 +46,14 @@ public SimilarityExporter(GraphDatabaseAPI api,
4046
relationshipTypeId = getOrCreateRelationshipId(relationshipType);
4147
}
4248

43-
public void export(Stream<SimilarityResult> similarityPairs) {
44-
writeSequential(similarityPairs);
49+
public void export(Stream<SimilarityResult> similarityPairs, long batchSize) {
50+
writeSequential(similarityPairs, batchSize);
4551
}
4652

4753
private void export(SimilarityResult similarityResult) {
4854
applyInTransaction(statement -> {
49-
long node1 = similarityResult.item1;
50-
long node2 = similarityResult.item2;
51-
5255
try {
53-
long relationshipId = statement.dataWrite().relationshipCreate(node1, relationshipTypeId, node2);
54-
55-
statement.dataWrite().relationshipSetProperty(
56-
relationshipId, propertyId, Values.doubleValue(similarityResult.similarity));
56+
createRelationship(similarityResult, statement);
5757
} catch (KernelException e) {
5858
ExceptionUtil.throwKernelException(e);
5959
}
@@ -62,6 +62,29 @@ private void export(SimilarityResult similarityResult) {
6262

6363
}
6464

65+
private void export(List<SimilarityResult> similarityResults) {
66+
applyInTransaction(statement -> {
67+
for (SimilarityResult similarityResult : similarityResults) {
68+
try {
69+
createRelationship(similarityResult, statement);
70+
} catch (KernelException e) {
71+
ExceptionUtil.throwKernelException(e);
72+
}
73+
}
74+
return null;
75+
});
76+
77+
}
78+
79+
private void createRelationship(SimilarityResult similarityResult, KernelTransaction statement) throws EntityNotFoundException, InvalidTransactionTypeKernelException, AutoIndexingKernelException {
80+
long node1 = similarityResult.item1;
81+
long node2 = similarityResult.item2;
82+
long relationshipId = statement.dataWrite().relationshipCreate(node1, relationshipTypeId, node2);
83+
84+
statement.dataWrite().relationshipSetProperty(
85+
relationshipId, propertyId, Values.doubleValue(similarityResult.similarity));
86+
}
87+
6588
private int getOrCreateRelationshipId(String relationshipType) {
6689
return applyInTransaction(stmt -> stmt
6790
.tokenWrite()
@@ -74,8 +97,23 @@ private int getOrCreatePropertyId(String propertyName) {
7497
.propertyKeyGetOrCreateForName(propertyName));
7598
}
7699

77-
private void writeSequential(Stream<SimilarityResult> similarityPairs) {
78-
similarityPairs.forEach(this::export);
100+
private void writeSequential(Stream<SimilarityResult> similarityPairs, long batchSize) {
101+
if (batchSize == 1) {
102+
similarityPairs.forEach(this::export);
103+
} else {
104+
Iterator<SimilarityResult> iterator = similarityPairs.iterator();
105+
do {
106+
export(take(iterator, Math.toIntExact(batchSize)));
107+
} while (iterator.hasNext());
108+
}
109+
}
110+
111+
private static List<SimilarityResult> take(Iterator<SimilarityResult> iterator, int batchSize) {
112+
List<SimilarityResult> result = new ArrayList<>(batchSize);
113+
while (iterator.hasNext() && batchSize-- > 0) {
114+
result.add(iterator.next());
115+
}
116+
return result;
79117
}
80118

81119

algo/src/main/java/org/neo4j/graphalgo/similarity/SimilarityProc.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.neo4j.graphalgo.core.utils.QueueBasedSpliterator;
1313
import org.neo4j.graphalgo.core.utils.TerminationFlag;
1414
import org.neo4j.graphalgo.impl.util.TopKConsumer;
15-
import org.neo4j.graphalgo.impl.yens.SimilarityExporter;
1615
import org.neo4j.graphdb.Result;
1716
import org.neo4j.kernel.api.KernelTransaction;
1817
import org.neo4j.kernel.internal.GraphDatabaseAPI;
@@ -75,7 +74,12 @@ Long getDegreeCutoff(ProcedureConfiguration configuration) {
7574
return configuration.get("degreeCutoff", 0L);
7675
}
7776

78-
Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult> stream, int length, boolean write, String writeRelationshipType, String writeProperty) {
77+
Long getWriteBatchSize(ProcedureConfiguration configuration) {
78+
return configuration.get("writeBatchSize", 10000L);
79+
}
80+
81+
Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult> stream, int length, ProcedureConfiguration configuration, boolean write, String writeRelationshipType, String writeProperty) {
82+
long writeBatchSize = getWriteBatchSize(configuration);
7983
AtomicLong similarityPairs = new AtomicLong();
8084
DoubleHistogram histogram = new DoubleHistogram(5);
8185
Consumer<SimilarityResult> recorder = result -> {
@@ -85,7 +89,7 @@ Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult
8589

8690
if (write) {
8791
SimilarityExporter similarityExporter = new SimilarityExporter(api, writeRelationshipType, writeProperty);
88-
similarityExporter.export(stream.peek(recorder));
92+
similarityExporter.export(stream.peek(recorder), writeBatchSize);
8993
} else {
9094
stream.forEach(recorder);
9195
}

algo/src/main/java/org/neo4j/graphalgo/similarity/SimilarityResult.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import java.util.Objects;
2525

2626
public class SimilarityResult implements Comparable<SimilarityResult> {
27+
public final long item1;
2728
public final long item2;
2829
public final long count1;
29-
public final long item1;
3030
public final long count2;
3131
public final long intersection;
3232
public double similarity;
@@ -59,13 +59,14 @@ public boolean equals(Object o) {
5959
count1 == that.count1 &&
6060
count2 == that.count2 &&
6161
intersection == that.intersection &&
62-
Double.compare(that.similarity, similarity) == 0;
62+
Double.compare(that.similarity, similarity) == 0 &&
63+
bidirectional == that.bidirectional &&
64+
reversed == that.reversed;
6365
}
6466

6567
@Override
6668
public int hashCode() {
67-
68-
return Objects.hash(item1, item2, count1, count2, intersection, similarity);
69+
return Objects.hash(item1, item2, count1, count2, intersection, similarity, bidirectional, reversed);
6970
}
7071

7172
/**

doc/asciidoc/similarity-cosine.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ YIELD nodes, similarityPairs, write, writeRelationshipType, writeProperty, min,
284284
| `concurrency` | int | available CPUs | yes | The number of concurrent threads.
285285
| `graph` | string | dense | yes | The graph name ('dense' or 'cypher').
286286
| `write` | boolean | false | yes | Indicates whether results should be stored.
287+
| `writeBatchSize` | int | 10000 | yes | The batch size to use when storing results.
287288
| `writeRelationshipType` | string | SIMILAR | yes | The relationship type to use when storing results.
288289
| `writeProperty` | string | score | yes | The property to use when storing results.
289290
|===

doc/asciidoc/similarity-euclidean.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ YIELD nodes, similarityPairs, write, writeRelationshipType, writeProperty, min,
273273
| `concurrency` | int | available CPUs | yes | The number of concurrent threads.
274274
| `graph` | string | dense | yes | The graph name ('dense' or 'cypher').
275275
| `write` | boolean | false | yes | Indicates whether results should be stored.
276+
| `writeBatchSize` | int | 10000 | yes | The batch size to use when storing results.
276277
| `writeRelationshipType` | string | SIMILAR | yes | The relationship type to use when storing results.
277278
| `writeProperty` | string | score | yes | The property to use when storing results.
278279
|===

0 commit comments

Comments
 (0)