order
This paper mainly studies grpcstream service status of skywalking
GRPCStreamServiceStatus
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCStreamServiceStatus.java
public class GRPCStreamServiceStatus { private static final ILog logger = LogManager.getLogger(GRPCStreamServiceStatus.class); private volatile boolean status; public GRPCStreamServiceStatus(boolean status) { this.status = status; } public boolean isStatus() { return status; } public void finished() { this.status = true; } /** * @param maxTimeout max wait time, milliseconds. */ public boolean wait4Finish(long maxTimeout) { long time = 0; while (!status) { if (time > maxTimeout) { break; } try2Sleep(5); time += 5; } return status; } /** * Wait until success status reported. */ public void wait4Finish() { long recheckCycle = 5; long hasWaited = 0L; long maxCycle = 30 * 1000L;// 30 seconds max. while (!status) { try2Sleep(recheckCycle); hasWaited += recheckCycle; if (recheckCycle >= maxCycle) { logger.warn("Collector traceSegment service doesn't response in {} seconds.", hasWaited / 1000); } else { recheckCycle = recheckCycle * 2 > maxCycle ? maxCycle : recheckCycle * 2; } } } /** * Try to sleep, and ignore the {@link InterruptedException} * * @param millis the length of time to sleep in milliseconds */ private void try2Sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { } } }
- GRPCStreamServiceStatus provides the finished method to set the status to true; it also provides the wait4Finish method, which will wait for the status to change to true all the time, and print the warn log when the recheckCycle is greater than or equal to maxCycle. The log format is Collector traceSegment service doesn't response in {} seconds. The parameter value is hasWaited / 1000
TraceSegmentServiceClient
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@DefaultImplementor public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener { //...... @Override public void consume(List<TraceSegment> data) { if (CONNECTED.equals(status)) { final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() { @Override public void onNext(Commands commands) { ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); } @Override public void onError(Throwable throwable) { status.finished(); if (logger.isErrorEnable()) { logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception."); } ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable); } @Override public void onCompleted() { status.finished(); } }); try { for (TraceSegment segment : data) { UpstreamSegment upstreamSegment = segment.transform(); upstreamSegmentStreamObserver.onNext(upstreamSegment); } } catch (Throwable t) { logger.error(t, "Transform and send UpstreamSegment to collector fail."); } upstreamSegmentStreamObserver.onCompleted(); status.wait4Finish(); segmentUplinkedCounter += data.size(); } else { segmentAbandonedCounter += data.size(); } printUplinkStatus(); } private void printUplinkStatus() { long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis - lastLogTime > 30 * 1000) { lastLogTime = currentTimeMillis; if (segmentUplinkedCounter > 0) { logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter); segmentUplinkedCounter = 0; } if (segmentAbandonedCounter > 0) { logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter); segmentAbandonedCounter = 0; } } } //...... }
- TraceSegmentServiceClient's consume method sends tracesegement through upstreamstreamobserver. The value of withDeadlineAfter of the upstreamstreamobserver is grpc ﹣ upstream ﹣ timeout, which is 30 seconds by default. Its onCompleted method of the streamstreamservicestatus will call GRPCStreamServiceStatus.finished method. The consume method finally executes printUplinkStatus method and prints tra Sending or discarding information of CE segments
Summary
GRPCStreamServiceStatus provides the finished method to set the status to true; it also provides the wait4Finish method, which will wait for the status to become true all the time and print the warn log when the recheckCycle is greater than or equal to maxCycle