Skip to content

Commit 6bd312b

Browse files
committed
Render streamed proto output in parallel
1 parent 24ada2c commit 6bd312b

File tree

1 file changed

+63
-2
lines changed

1 file changed

+63
-2
lines changed

src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
import com.google.devtools.build.lib.packages.LabelPrinter;
1717
import com.google.devtools.build.lib.packages.Target;
1818
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
19+
import com.google.devtools.build.lib.query2.proto.proto2api.Build;
20+
21+
import java.io.ByteArrayOutputStream;
1922
import java.io.IOException;
2023
import java.io.OutputStream;
24+
import java.util.stream.StreamSupport;
2125

2226
/**
2327
* An output formatter that outputs a protocol buffer representation of a query result and outputs
@@ -34,13 +38,70 @@ public String getName() {
3438
public OutputFormatterCallback<Target> createPostFactoStreamCallback(
3539
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
3640
return new OutputFormatterCallback<Target>() {
41+
private final LabelPrinter ourLabelPrinter = labelPrinter;
42+
3743
@Override
3844
public void processOutput(Iterable<Target> partialResult)
3945
throws IOException, InterruptedException {
40-
for (Target target : partialResult) {
41-
toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out);
46+
try {
47+
StreamSupport.stream(partialResult.spliterator(), /* parallel= */true)
48+
.map(this::toProto)
49+
.map(StreamedProtoOutputFormatter::writeDelimited)
50+
.forEach(this::writeToOutputStreamThreadSafe);
51+
} catch (WrappedIOException e) {
52+
throw e.getCause();
53+
} catch (WrappedInterruptedException e) {
54+
throw e.getCause();
55+
}
56+
}
57+
58+
private Build.Target toProto(Target target) {
59+
try {
60+
return toTargetProtoBuffer(target, ourLabelPrinter);
61+
} catch (InterruptedException e) {
62+
throw new WrappedInterruptedException(e);
63+
}
64+
}
65+
66+
private synchronized void writeToOutputStreamThreadSafe(ByteArrayOutputStream bout) {
67+
try {
68+
bout.writeTo(out);
69+
} catch (IOException e) {
70+
throw new RuntimeException(e);
4271
}
4372
}
4473
};
4574
}
75+
76+
private static ByteArrayOutputStream writeDelimited(Build.Target targetProtoBuffer) {
77+
try {
78+
var bout = new ByteArrayOutputStream(targetProtoBuffer.getSerializedSize() + 10);
79+
targetProtoBuffer.writeDelimitedTo(bout);
80+
return bout;
81+
} catch (IOException e) {
82+
throw new WrappedIOException(e);
83+
}
84+
}
85+
86+
private static class WrappedIOException extends RuntimeException {
87+
private WrappedIOException(IOException cause) {
88+
super(cause);
89+
}
90+
91+
@Override
92+
public synchronized IOException getCause() {
93+
return (IOException) super.getCause();
94+
}
95+
}
96+
97+
private static class WrappedInterruptedException extends RuntimeException {
98+
private WrappedInterruptedException(InterruptedException cause) {
99+
super(cause);
100+
}
101+
102+
@Override
103+
public synchronized InterruptedException getCause() {
104+
return (InterruptedException) super.getCause();
105+
}
106+
}
46107
}

0 commit comments

Comments
 (0)