Skip to content

Commit b83865f

Browse files
authored
Optimize gRPC Log reporter to set service name for the first element (#206)
1 parent 88a7778 commit b83865f

File tree

6 files changed

+58
-47
lines changed

6 files changed

+58
-47
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Release Notes.
2424
* Add type name checking in ArgumentTypeNameMatch and ReturnTypeNameMatch
2525
* Highlight ArgumentTypeNameMatch and ReturnTypeNameMatch type naming rule in docs/en/setup/service-agent/java-agent/Java-Plugin-Development-Guide.md
2626
* Fix FileWriter scheduled task NPE
27+
* Optimize gRPC Log reporter to set service name for the first element in the streaming.(No change for Kafka reporter)
2728

2829
#### Documentation
2930

apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.grpc.Channel;
2222
import io.grpc.stub.StreamObserver;
2323
import java.util.List;
24-
2524
import java.util.Objects;
2625
import java.util.Properties;
2726
import java.util.concurrent.TimeUnit;
@@ -42,10 +41,10 @@
4241
import org.apache.skywalking.apm.network.logging.v3.LogReportServiceGrpc;
4342

4443
@DefaultImplementor
45-
public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData> {
44+
public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData.Builder> {
4645
private static final ILog LOGGER = LogManager.getLogger(LogReportServiceClient.class);
4746

48-
private volatile DataCarrier<LogData> carrier;
47+
private volatile DataCarrier<LogData.Builder> carrier;
4948
private volatile GRPCChannelStatus status;
5049

5150
private volatile LogReportServiceGrpc.LogReportServiceStub logReportServiceStub;
@@ -70,7 +69,7 @@ public void onComplete() throws Throwable {
7069

7170
}
7271

73-
public void produce(LogData logData) {
72+
public void produce(LogData.Builder logData) {
7473
if (Objects.nonNull(logData) && !carrier.produce(logData)) {
7574
if (LOGGER.isDebugEnable()) {
7675
LOGGER.debug("One log has been abandoned, cause by buffer is full.");
@@ -84,7 +83,7 @@ public void init(final Properties properties) {
8483
}
8584

8685
@Override
87-
public void consume(final List<LogData> dataList) {
86+
public void consume(final List<LogData.Builder> dataList) {
8887
if (CollectionUtil.isEmpty(dataList)) {
8988
return;
9089
}
@@ -95,39 +94,47 @@ public void consume(final List<LogData> dataList) {
9594
StreamObserver<LogData> logDataStreamObserver = logReportServiceStub
9695
.withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
9796
.collect(
98-
new StreamObserver<Commands>() {
99-
@Override
100-
public void onNext(final Commands commands) {
101-
102-
}
103-
104-
@Override
105-
public void onError(final Throwable throwable) {
106-
status.finished();
107-
LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
108-
dataList.size()
109-
);
110-
ServiceManager.INSTANCE
111-
.findService(GRPCChannelManager.class)
112-
.reportError(throwable);
113-
}
114-
115-
@Override
116-
public void onCompleted() {
117-
status.finished();
118-
}
119-
});
120-
121-
for (final LogData logData : dataList) {
122-
logDataStreamObserver.onNext(logData);
97+
new StreamObserver<Commands>() {
98+
@Override
99+
public void onNext(final Commands commands) {
100+
101+
}
102+
103+
@Override
104+
public void onError(final Throwable throwable) {
105+
status.finished();
106+
LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
107+
dataList.size()
108+
);
109+
ServiceManager.INSTANCE
110+
.findService(GRPCChannelManager.class)
111+
.reportError(throwable);
112+
}
113+
114+
@Override
115+
public void onCompleted() {
116+
status.finished();
117+
}
118+
});
119+
120+
boolean isFirst = true;
121+
for (final LogData.Builder logData : dataList) {
122+
if (isFirst) {
123+
// Only set service name of the first element in one stream
124+
// https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
125+
// Log collecting protocol defines LogData#service is required in the first element only.
126+
logData.setService(Config.Agent.SERVICE_NAME);
127+
isFirst = false;
128+
}
129+
logDataStreamObserver.onNext(logData.build());
123130
}
124131
logDataStreamObserver.onCompleted();
125132
status.wait4Finish();
126133
}
127134
}
128135

129136
@Override
130-
public void onError(final List<LogData> data, final Throwable t) {
137+
public void onError(final List<LogData.Builder> data, final Throwable t) {
131138
LOGGER.error(t, "Try to consume {} log data to sender, with unexpected exception.", data.size());
132139
}
133140

apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,11 @@ public void handleMethodException(EnhancedInstance objInst, Method method, Objec
7474
*
7575
* @param appender the real {@link AppenderSkeleton appender}
7676
* @param event {@link LoggingEvent}
77-
* @return {@link LogData} with filtered trace context in order to reduce the cost on the network
77+
* @return {@link LogData.Builder} with filtered trace context in order to reduce the cost on the network
7878
*/
79-
private LogData transform(final AppenderSkeleton appender, LoggingEvent event) {
79+
private LogData.Builder transform(final AppenderSkeleton appender, LoggingEvent event) {
8080
LogData.Builder builder = LogData.newBuilder()
8181
.setTimestamp(event.getTimeStamp())
82-
.setService(Config.Agent.SERVICE_NAME)
8382
.setServiceInstance(Config.Agent.INSTANCE_NAME)
8483
.setTraceContext(TraceContext.newBuilder()
8584
.setTraceId(ContextManager.getGlobalTraceId())
@@ -102,12 +101,12 @@ private LogData transform(final AppenderSkeleton appender, LoggingEvent event) {
102101
builder.setEndpoint(primaryEndpointName);
103102
}
104103

105-
return -1 == ContextManager.getSpanId() ? builder.build()
104+
return -1 == ContextManager.getSpanId() ? builder
106105
: builder.setTraceContext(TraceContext.newBuilder()
107106
.setTraceId(ContextManager.getGlobalTraceId())
108107
.setSpanId(ContextManager.getSpanId())
109108
.setTraceSegmentId(ContextManager.getSegmentId())
110-
.build()).build();
109+
.build());
111110
}
112111

113112
private String transformLogText(final AppenderSkeleton appender, final LoggingEvent event) {

apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-2.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v2/x/log/GRPCLogAppenderInterceptor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void handleMethodException(EnhancedInstance objInst, Method method, Objec
8080
* @param event {@link LogEvent}
8181
* @return {@link LogData} with filtered trace context in order to reduce the cost on the network
8282
*/
83-
private LogData transform(final AbstractAppender appender, LogEvent event) {
83+
private LogData.Builder transform(final AbstractAppender appender, LogEvent event) {
8484
LogTags.Builder logTags = LogTags.newBuilder()
8585
.addData(KeyStringValuePair.newBuilder()
8686
.setKey("level").setValue(event.getLevel().toString()).build())
@@ -123,14 +123,14 @@ private LogData transform(final AbstractAppender appender, LogEvent event) {
123123
.setTraceId(context.getTraceId())
124124
.setSpanId(context.getSpanId())
125125
.setTraceSegmentId(context.getTraceSegmentId())
126-
.build()).build();
126+
.build());
127127
} else {
128-
return -1 == ContextManager.getSpanId() ? builder.build()
128+
return -1 == ContextManager.getSpanId() ? builder
129129
: builder.setTraceContext(TraceContext.newBuilder()
130130
.setTraceId(ContextManager.getGlobalTraceId())
131131
.setSpanId(ContextManager.getSpanId())
132132
.setTraceSegmentId(ContextManager.getSegmentId())
133-
.build()).build();
133+
.build());
134134
}
135135
}
136136

apm-sniffer/apm-toolkit-activation/apm-toolkit-logback-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/logback/v1/x/log/GRPCLogAppenderInterceptor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void handleMethodException(EnhancedInstance objInst, Method method, Objec
8080
* @param event {@link ILoggingEvent}
8181
* @return {@link LogData} with filtered trace context in order to reduce the cost on the network
8282
*/
83-
private LogData transform(final OutputStreamAppender<ILoggingEvent> appender, ILoggingEvent event) {
83+
private LogData.Builder transform(final OutputStreamAppender<ILoggingEvent> appender, ILoggingEvent event) {
8484
LogTags.Builder logTags = LogTags.newBuilder()
8585
.addData(KeyStringValuePair.newBuilder()
8686
.setKey("level").setValue(event.getLevel().toString()).build())
@@ -118,12 +118,12 @@ private LogData transform(final OutputStreamAppender<ILoggingEvent> appender, IL
118118
builder.setEndpoint(primaryEndpointName);
119119
}
120120

121-
return -1 == ContextManager.getSpanId() ? builder.build()
121+
return -1 == ContextManager.getSpanId() ? builder
122122
: builder.setTraceContext(TraceContext.newBuilder()
123123
.setTraceId(ContextManager.getGlobalTraceId())
124124
.setSpanId(ContextManager.getSpanId())
125125
.setTraceSegmentId(ContextManager.getSegmentId())
126-
.build()).build();
126+
.build());
127127
}
128128

129129
private String transformLogText(final OutputStreamAppender<ILoggingEvent> appender, final ILoggingEvent event) {

apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.common.utils.Bytes;
2424
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
2525
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
26+
import org.apache.skywalking.apm.agent.core.conf.Config;
2627
import org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient;
2728
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
2829
import org.apache.skywalking.apm.network.logging.v3.LogData;
@@ -41,18 +42,21 @@ public void prepare() {
4142
}
4243

4344
@Override
44-
public void produce(final LogData logData) {
45+
public void produce(final LogData.Builder logData) {
4546
super.produce(logData);
4647
}
4748

4849
@Override
49-
public void consume(final List<LogData> dataList) {
50+
public void consume(final List<LogData.Builder> dataList) {
5051
if (producer == null || CollectionUtil.isEmpty(dataList)) {
5152
return;
5253
}
5354

54-
for (LogData data : dataList) {
55-
producer.send(new ProducerRecord<>(topic, data.getService(), Bytes.wrap(data.toByteArray())));
55+
for (LogData.Builder data : dataList) {
56+
// Kafka Log reporter sends one log per time.
57+
// Every time, service name should be set to keep data integrity.
58+
data.setService(Config.Agent.SERVICE_NAME);
59+
producer.send(new ProducerRecord<>(topic, data.getService(), Bytes.wrap(data.build().toByteArray())));
5660
}
5761
}
5862

0 commit comments

Comments
 (0)