Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit 99d3884

Browse files
committed
[WIP] Degree Centrality (#830)
* uncomment test * For Cypher loading we will always treat the graph as outgoing, and let the user handle the actual direction in the query * deduplicate nodes in Cypher loading land * deduplicate nodes in Graph View land * tests for new custom counting * degree centrality proc * Integration test * Integration test for weighted * tests for outgoing * use the proper threading approach * use the proper threading approach for weighted * Partition Degree Centrality computation * weighted degree in the same class * remove unused code * put the direction logic into the loader * handle Cypher loading * use the sum strategy for accumulating weight degrees * use the correct duplicates strategy in these tests
1 parent b349844 commit 99d3884

28 files changed

+1889
-192
lines changed
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/**
2+
* Copyright (c) 2017 "Neo4j, Inc." <http://neo4j.com>
3+
*
4+
* This file is part of Neo4j Graph Algorithms <http://github.com/neo4j-contrib/neo4j-graph-algorithms>.
5+
*
6+
* Neo4j Graph Algorithms is free software: you can redistribute it and/or modify
7+
* it under the terms of the GNU General Public License as published by
8+
* the Free Software Foundation, either version 3 of the License, or
9+
* (at your option) any later version.
10+
*
11+
* This program is distributed in the hope that it will be useful,
12+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
* GNU General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU General Public License
17+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
*/
19+
package org.neo4j.graphalgo;
20+
21+
import org.neo4j.graphalgo.api.Graph;
22+
import org.neo4j.graphalgo.api.GraphFactory;
23+
import org.neo4j.graphalgo.api.HugeGraph;
24+
import org.neo4j.graphalgo.core.GraphLoader;
25+
import org.neo4j.graphalgo.core.ProcedureConfiguration;
26+
import org.neo4j.graphalgo.core.ProcedureConstants;
27+
import org.neo4j.graphalgo.core.utils.Pools;
28+
import org.neo4j.graphalgo.core.utils.ProgressTimer;
29+
import org.neo4j.graphalgo.core.utils.TerminationFlag;
30+
import org.neo4j.graphalgo.core.utils.paged.AllocationTracker;
31+
import org.neo4j.graphalgo.core.write.Exporter;
32+
import org.neo4j.graphalgo.impl.Algorithm;
33+
import org.neo4j.graphalgo.impl.degree.DegreeCentrality;
34+
import org.neo4j.graphalgo.impl.results.CentralityResult;
35+
import org.neo4j.graphalgo.impl.pagerank.DegreeCentralityAlgorithm;
36+
import org.neo4j.graphalgo.results.DegreeCentralityScore;
37+
import org.neo4j.graphdb.Direction;
38+
import org.neo4j.kernel.api.KernelTransaction;
39+
import org.neo4j.kernel.internal.GraphDatabaseAPI;
40+
import org.neo4j.logging.Log;
41+
import org.neo4j.procedure.*;
42+
43+
import java.util.Map;
44+
import java.util.stream.IntStream;
45+
import java.util.stream.LongStream;
46+
import java.util.stream.Stream;
47+
48+
import static org.neo4j.graphalgo.core.ProcedureConstants.CYPHER_QUERY;
49+
50+
public final class DegreeCentralityProc {
51+
52+
public static final String DEFAULT_SCORE_PROPERTY = "degree";
53+
public static final String CONFIG_WEIGHT_KEY = "weightProperty";
54+
55+
@Context
56+
public GraphDatabaseAPI api;
57+
58+
@Context
59+
public Log log;
60+
61+
@Context
62+
public KernelTransaction transaction;
63+
64+
@Procedure(value = "algo.degree", mode = Mode.WRITE)
65+
@Description("CALL algo.degree(label:String, relationship:String, " +
66+
"{ weightProperty: null, write: true, writeProperty:'degree', concurrency:4}) " +
67+
"YIELD nodes, iterations, loadMillis, computeMillis, writeMillis, dampingFactor, write, writeProperty" +
68+
" - calculates page rank and potentially writes back")
69+
public Stream<DegreeCentralityScore.Stats> degree(
70+
@Name(value = "label", defaultValue = "") String label,
71+
@Name(value = "relationship", defaultValue = "") String relationship,
72+
@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
73+
74+
ProcedureConfiguration configuration = ProcedureConfiguration.create(config);
75+
final String weightPropertyKey = configuration.getString(CONFIG_WEIGHT_KEY, null);
76+
77+
DegreeCentralityScore.Stats.Builder statsBuilder = new DegreeCentralityScore.Stats.Builder();
78+
AllocationTracker tracker = AllocationTracker.create();
79+
Direction direction = getDirection(configuration);
80+
final Graph graph = load(label, relationship, tracker, configuration.getGraphImpl(), statsBuilder, configuration, weightPropertyKey, direction);
81+
82+
if(graph.nodeCount() == 0) {
83+
graph.release();
84+
return Stream.of(statsBuilder.build());
85+
}
86+
87+
TerminationFlag terminationFlag = TerminationFlag.wrap(transaction);
88+
CentralityResult scores = evaluate(graph, tracker, terminationFlag, configuration, statsBuilder, weightPropertyKey, direction);
89+
90+
logMemoryUsage(tracker);
91+
92+
write(graph, terminationFlag, scores, configuration, statsBuilder);
93+
94+
return Stream.of(statsBuilder.build());
95+
}
96+
97+
private Direction getDirection(ProcedureConfiguration configuration) {
98+
String graphName = configuration.getGraphName(ProcedureConstants.DEFAULT_GRAPH_IMPL);
99+
Direction direction = configuration.getDirection(Direction.INCOMING);
100+
return CYPHER_QUERY.equals(graphName) ? Direction.OUTGOING : direction;
101+
}
102+
103+
@Procedure(value = "algo.degree.stream", mode = Mode.READ)
104+
@Description("CALL algo.degree.stream(label:String, relationship:String, " +
105+
"{weightProperty: null, concurrency:4}) " +
106+
"YIELD node, score - calculates page rank and streams results")
107+
public Stream<DegreeCentralityScore> degreeStream(
108+
@Name(value = "label", defaultValue = "") String label,
109+
@Name(value = "relationship", defaultValue = "") String relationship,
110+
@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
111+
112+
ProcedureConfiguration configuration = ProcedureConfiguration.create(config);
113+
114+
final String weightPropertyKey = configuration.getString(CONFIG_WEIGHT_KEY, null);
115+
116+
DegreeCentralityScore.Stats.Builder statsBuilder = new DegreeCentralityScore.Stats.Builder();
117+
Direction direction = getDirection(configuration);
118+
AllocationTracker tracker = AllocationTracker.create();
119+
final Graph graph = load(label, relationship, tracker, configuration.getGraphImpl(), statsBuilder, configuration, weightPropertyKey, direction);
120+
121+
if(graph.nodeCount() == 0) {
122+
graph.release();
123+
return Stream.empty();
124+
}
125+
126+
TerminationFlag terminationFlag = TerminationFlag.wrap(transaction);
127+
CentralityResult scores = evaluate(graph, tracker, terminationFlag, configuration, statsBuilder, weightPropertyKey, direction);
128+
129+
logMemoryUsage(tracker);
130+
131+
if (graph instanceof HugeGraph) {
132+
HugeGraph hugeGraph = (HugeGraph) graph;
133+
return LongStream.range(0, hugeGraph.nodeCount())
134+
.mapToObj(i -> {
135+
final long nodeId = hugeGraph.toOriginalNodeId(i);
136+
return new DegreeCentralityScore(
137+
nodeId,
138+
scores.score(i)
139+
);
140+
});
141+
}
142+
143+
return IntStream.range(0, Math.toIntExact(graph.nodeCount()))
144+
.mapToObj(i -> {
145+
final long nodeId = graph.toOriginalNodeId(i);
146+
return new DegreeCentralityScore(
147+
nodeId,
148+
scores.score(i)
149+
);
150+
});
151+
}
152+
153+
private void logMemoryUsage(AllocationTracker tracker) {
154+
log.info("Degree Centrality: overall memory usage: %s", tracker.getUsageString());
155+
}
156+
157+
private Graph load(
158+
String label,
159+
String relationship,
160+
AllocationTracker tracker,
161+
Class<? extends GraphFactory> graphFactory,
162+
DegreeCentralityScore.Stats.Builder statsBuilder,
163+
ProcedureConfiguration configuration,
164+
String weightPropertyKey, Direction direction) {
165+
GraphLoader graphLoader = new GraphLoader(api, Pools.DEFAULT)
166+
.init(log, label, relationship, configuration)
167+
.withAllocationTracker(tracker)
168+
.withOptionalRelationshipWeightsFromProperty(weightPropertyKey, configuration.getWeightPropertyDefaultValue(0.0));
169+
170+
graphLoader.direction(direction);
171+
172+
try (ProgressTimer timer = statsBuilder.timeLoad()) {
173+
Graph graph = graphLoader.load(graphFactory);
174+
statsBuilder.withNodes(graph.nodeCount());
175+
return graph;
176+
}
177+
}
178+
179+
private CentralityResult evaluate(
180+
Graph graph,
181+
AllocationTracker tracker,
182+
TerminationFlag terminationFlag,
183+
ProcedureConfiguration configuration,
184+
DegreeCentralityScore.Stats.Builder statsBuilder,
185+
String weightPropertyKey, Direction direction) {
186+
187+
final int concurrency = configuration.getConcurrency(Pools.getNoThreadsInDefaultPool());
188+
189+
if (direction == Direction.BOTH) {
190+
direction = Direction.OUTGOING;
191+
}
192+
193+
DegreeCentralityAlgorithm algo = new DegreeCentrality(graph, Pools.DEFAULT, concurrency, direction, weightPropertyKey != null);
194+
statsBuilder.timeEval(algo::compute);
195+
Algorithm<?> algorithm = algo.algorithm();
196+
algorithm.withTerminationFlag(terminationFlag);
197+
198+
final CentralityResult pageRank = algo.result();
199+
algo.algorithm().release();
200+
graph.release();
201+
return pageRank;
202+
}
203+
204+
private void write(
205+
Graph graph,
206+
TerminationFlag terminationFlag,
207+
CentralityResult result,
208+
ProcedureConfiguration configuration,
209+
final DegreeCentralityScore.Stats.Builder statsBuilder) {
210+
if (configuration.isWriteFlag(true)) {
211+
log.debug("Writing results");
212+
String propertyName = configuration.getWriteProperty(DEFAULT_SCORE_PROPERTY);
213+
try (ProgressTimer timer = statsBuilder.timeWrite()) {
214+
Exporter exporter = Exporter
215+
.of(api, graph)
216+
.withLog(log)
217+
.parallel(Pools.DEFAULT, configuration.getConcurrency(), terminationFlag)
218+
.build();
219+
result.export(propertyName, exporter);
220+
}
221+
statsBuilder
222+
.withWrite(true)
223+
.withProperty(propertyName);
224+
} else {
225+
statsBuilder.withWrite(false);
226+
}
227+
}
228+
229+
230+
}

algo/src/main/java/org/neo4j/graphalgo/impl/DegreeCentrality.java

Lines changed: 0 additions & 127 deletions
This file was deleted.

0 commit comments

Comments
 (0)