Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit 7914cd5

Browse files
committed
return write duration
1 parent 1207fbb commit 7914cd5

File tree

3 files changed

+120
-50
lines changed

3 files changed

+120
-50
lines changed

algo/src/main/java/org/neo4j/graphalgo/similarity/ParallelSimilarityExporter.java

+31-37
Original file line numberDiff line numberDiff line change
@@ -94,51 +94,45 @@ public void export(Stream<SimilarityResult> similarityPairs, long batchSize) {
9494

9595
ArrayBlockingQueue<List<SimilarityResult>> outQueue = new ArrayBlockingQueue<>(queueSize);
9696

97-
ExecutorService executor = Executors.newFixedThreadPool(1);
98-
Future<Integer> inQueueBatchCountFuture = executor.submit(() -> {
99-
AtomicInteger inQueueBatchCount = new AtomicInteger(0);
100-
stream.parallel().forEach(partition -> {
101-
IntSet nodesInPartition = new IntHashSet();
102-
for (DisjointSetStruct.InternalResult internalResult : partition) {
103-
nodesInPartition.add(internalResult.internalNodeId);
104-
}
10597

106-
List<SimilarityResult> inPartition = new ArrayList<>();
107-
List<SimilarityResult> outPartition = new ArrayList<>();
98+
AtomicInteger inQueueBatchCount = new AtomicInteger(0);
99+
stream.parallel().forEach(partition -> {
100+
IntSet nodesInPartition = new IntHashSet();
101+
for (DisjointSetStruct.InternalResult internalResult : partition) {
102+
nodesInPartition.add(internalResult.internalNodeId);
103+
}
108104

109-
for (DisjointSetStruct.InternalResult result : partition) {
110-
int nodeId = result.internalNodeId;
111-
graph.forEachRelationship(nodeId, Direction.OUTGOING, (sourceNodeId, targetNodeId, relationId, weight) -> {
112-
SimilarityResult similarityRelationship = new SimilarityResult(idMap.toOriginalNodeId(sourceNodeId), idMap.toOriginalNodeId(targetNodeId), -1, -1, -1, weight);
105+
List<SimilarityResult> inPartition = new ArrayList<>();
106+
List<SimilarityResult> outPartition = new ArrayList<>();
113107

114-
if (nodesInPartition.contains(targetNodeId)) {
115-
inPartition.add(similarityRelationship);
116-
} else {
117-
outPartition.add(similarityRelationship);
118-
}
108+
for (DisjointSetStruct.InternalResult result : partition) {
109+
int nodeId = result.internalNodeId;
110+
graph.forEachRelationship(nodeId, Direction.OUTGOING, (sourceNodeId, targetNodeId, relationId, weight) -> {
111+
SimilarityResult similarityRelationship = new SimilarityResult(idMap.toOriginalNodeId(sourceNodeId),
112+
idMap.toOriginalNodeId(targetNodeId), -1, -1, -1, weight);
119113

120-
return false;
121-
});
122-
}
114+
if (nodesInPartition.contains(targetNodeId)) {
115+
inPartition.add(similarityRelationship);
116+
} else {
117+
outPartition.add(similarityRelationship);
118+
}
123119

124-
if (!inPartition.isEmpty()) {
125-
int inQueueBatches = writeSequential(inPartition.stream(), batchSize);
126-
inQueueBatchCount.addAndGet(inQueueBatches);
127-
}
120+
return false;
121+
});
122+
}
128123

129-
if (!outPartition.isEmpty()) {
130-
put(outQueue, outPartition);
131-
}
132-
});
133-
return inQueueBatchCount.get();
124+
if (!inPartition.isEmpty()) {
125+
int inQueueBatches = writeSequential(inPartition.stream(), batchSize);
126+
inQueueBatchCount.addAndGet(inQueueBatches);
127+
}
128+
129+
if (!outPartition.isEmpty()) {
130+
put(outQueue, outPartition);
131+
}
134132
});
135133

136-
Integer inQueueBatches = null;
137-
try {
138-
inQueueBatches = inQueueBatchCountFuture.get();
139-
} catch (InterruptedException | ExecutionException e) {
140-
e.printStackTrace();
141-
}
134+
135+
int inQueueBatches = inQueueBatchCount.get();
142136

143137

144138
int outQueueBatches = writeSequential(outQueue.stream().flatMap(Collection::stream), batchSize);

algo/src/main/java/org/neo4j/graphalgo/similarity/SimilarityProc.java

+83-10
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@
77
import org.HdrHistogram.DoubleHistogram;
88
import org.neo4j.graphalgo.core.ProcedureConfiguration;
99
import org.neo4j.graphalgo.core.ProcedureConstants;
10-
import org.neo4j.graphalgo.core.utils.ParallelUtil;
11-
import org.neo4j.graphalgo.core.utils.Pools;
12-
import org.neo4j.graphalgo.core.utils.QueueBasedSpliterator;
13-
import org.neo4j.graphalgo.core.utils.TerminationFlag;
10+
import org.neo4j.graphalgo.core.utils.*;
1411
import org.neo4j.graphalgo.impl.util.TopKConsumer;
1512
import org.neo4j.graphdb.Result;
1613
import org.neo4j.kernel.api.KernelTransaction;
@@ -77,7 +74,76 @@ Long getWriteBatchSize(ProcedureConfiguration configuration) {
7774
return configuration.get("writeBatchSize", 10000L);
7875
}
7976

77+
78+
public class SimilarityResultBuilder {
79+
protected long writeDuration = -1;
80+
protected boolean write = false;
81+
private int nodes;
82+
private String writeRelationshipType;
83+
private String writeProperty;
84+
private AtomicLong similarityPairs;
85+
private DoubleHistogram histogram;
86+
87+
88+
public SimilarityResultBuilder withWriteDuration(long writeDuration) {
89+
this.writeDuration = writeDuration;
90+
return this;
91+
}
92+
93+
public SimilarityResultBuilder withWrite(boolean write) {
94+
this.write = write;
95+
return this;
96+
}
97+
98+
/**
99+
* Returns an AutoCloseable which measures the time
100+
* until it gets closed. Saves the duration as writeDuration.
101+
*
102+
* @return a ProgressTimer that passes the measured duration to withWriteDuration
103+
*/
104+
public ProgressTimer timeWrite() {
105+
return ProgressTimer.start(this::withWriteDuration);
106+
}
107+
108+
public SimilaritySummaryResult build() {
109+
return SimilaritySummaryResult.from(nodes, similarityPairs, writeRelationshipType, writeProperty, write, histogram, writeDuration);
110+
}
111+
112+
public SimilarityResultBuilder nodes(int nodes) {
113+
this.nodes = nodes;
114+
return this;
115+
}
116+
117+
public SimilarityResultBuilder write(boolean write) {
118+
this.write = write;
119+
return this;
120+
}
121+
122+
public SimilarityResultBuilder writeRelationshipType(String writeRelationshipType) {
123+
this.writeRelationshipType = writeRelationshipType;
124+
return this;
125+
}
126+
127+
public SimilarityResultBuilder writeProperty(String writeProperty) {
128+
this.writeProperty = writeProperty;
129+
return this;
130+
}
131+
132+
public SimilarityResultBuilder similarityPairs(AtomicLong similarityPairs) {
133+
this.similarityPairs = similarityPairs;
134+
return this;
135+
}
136+
137+
public SimilarityResultBuilder histogram(DoubleHistogram histogram) {
138+
this.histogram = histogram;
139+
return this;
140+
}
141+
}
142+
80143
Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult> stream, int length, ProcedureConfiguration configuration, boolean write, String writeRelationshipType, String writeProperty, boolean writeParallel) {
144+
SimilarityResultBuilder builder = new SimilarityResultBuilder();
145+
builder.nodes(length).write(write).writeRelationshipType(writeRelationshipType).writeProperty(writeProperty);
146+
81147
long writeBatchSize = getWriteBatchSize(configuration);
82148
AtomicLong similarityPairs = new AtomicLong();
83149
DoubleHistogram histogram = new DoubleHistogram(5);
@@ -88,24 +154,31 @@ Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult
88154

89155
if (write) {
90156
if (writeParallel) {
91-
ParallelSimilarityExporter parallelSimilarityExporter = new ParallelSimilarityExporter(api, log, writeRelationshipType, writeProperty, length);
92-
parallelSimilarityExporter.export(stream.peek(recorder), writeBatchSize);
157+
try (ProgressTimer timer = builder.timeWrite()) {
158+
ParallelSimilarityExporter parallelSimilarityExporter = new ParallelSimilarityExporter(api, log, writeRelationshipType, writeProperty, length);
159+
parallelSimilarityExporter.export(stream.peek(recorder), writeBatchSize);
160+
}
93161

94162
} else {
95-
SimilarityExporter similarityExporter = new SimilarityExporter(api, log, writeRelationshipType, writeProperty);
96-
similarityExporter.export(stream.peek(recorder), writeBatchSize);
163+
try (ProgressTimer timer = builder.timeWrite()) {
164+
SimilarityExporter similarityExporter = new SimilarityExporter(api, log, writeRelationshipType, writeProperty);
165+
similarityExporter.export(stream.peek(recorder), writeBatchSize);
166+
}
97167
}
98168

99169
} else {
100170
stream.forEach(recorder);
101171
}
102172

103-
return Stream.of(SimilaritySummaryResult.from(length, similarityPairs, writeRelationshipType, writeProperty, write, histogram));
173+
builder.similarityPairs(similarityPairs).histogram(histogram);
174+
return Stream.of(builder.build());
175+
176+
// return Stream.of(SimilaritySummaryResult.from(length, similarityPairs, writeRelationshipType, writeProperty, write, histogram));
104177
}
105178

106179
Stream<SimilaritySummaryResult> emptyStream(String writeRelationshipType, String writeProperty) {
107180
return Stream.of(SimilaritySummaryResult.from(0, new AtomicLong(0), writeRelationshipType,
108-
writeProperty, false, new DoubleHistogram(5)));
181+
writeProperty, false, new DoubleHistogram(5), -1));
109182
}
110183

111184
Double getSimilarityCutoff(ProcedureConfiguration configuration) {

algo/src/main/java/org/neo4j/graphalgo/similarity/SimilaritySummaryResult.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
public class SimilaritySummaryResult {
2626

27+
public final long writeDuration;
2728
public final long nodes;
2829
public final long similarityPairs;
2930
public final boolean write;
@@ -46,7 +47,7 @@ public SimilaritySummaryResult(long nodes, long similarityPairs,
4647
boolean write, String writeRelationshipType, String writeProperty,
4748
double min, double max, double mean, double stdDev,
4849
double p25, double p50, double p75, double p90, double p95,
49-
double p99, double p999, double p100) {
50+
double p99, double p999, double p100, long writeDuration) {
5051
this.nodes = nodes;
5152
this.similarityPairs = similarityPairs;
5253
this.write = write;
@@ -64,9 +65,10 @@ public SimilaritySummaryResult(long nodes, long similarityPairs,
6465
this.p99 = p99;
6566
this.p999 = p999;
6667
this.p100 = p100;
68+
this.writeDuration = writeDuration;
6769
}
6870

69-
static SimilaritySummaryResult from(long length, AtomicLong similarityPairs, String writeRelationshipType, String writeProperty, boolean write, DoubleHistogram histogram) {
71+
static SimilaritySummaryResult from(long length, AtomicLong similarityPairs, String writeRelationshipType, String writeProperty, boolean write, DoubleHistogram histogram, long writeDuration) {
7072
return new SimilaritySummaryResult(
7173
length,
7274
similarityPairs.get(),
@@ -84,7 +86,8 @@ static SimilaritySummaryResult from(long length, AtomicLong similarityPairs, Str
8486
histogram.getValueAtPercentile(95D),
8587
histogram.getValueAtPercentile(99D),
8688
histogram.getValueAtPercentile(99.9D),
87-
histogram.getValueAtPercentile(100D)
89+
histogram.getValueAtPercentile(100D),
90+
writeDuration
8891
);
8992
}
9093
}

0 commit comments

Comments
 (0)