A Look at SkyWalking's GRPCStreamServiceStatus

Preface

This article examines GRPCStreamServiceStatus in the SkyWalking agent.

GRPCStreamServiceStatus

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCStreamServiceStatus.java

public class GRPCStreamServiceStatus {
    private static final ILog logger = LogManager.getLogger(GRPCStreamServiceStatus.class);
    private volatile boolean status;

    public GRPCStreamServiceStatus(boolean status) {
        this.status = status;
    }

    public boolean isStatus() {
        return status;
    }

    public void finished() {
        this.status = true;
    }

    /**
     * @param maxTimeout max wait time, milliseconds.
     */
    public boolean wait4Finish(long maxTimeout) {
        long time = 0;
        while (!status) {
            if (time > maxTimeout) {
                break;
            }
            try2Sleep(5);
            time += 5;
        }
        return status;
    }

    /**
     * Wait until success status reported.
     */
    public void wait4Finish() {
        long recheckCycle = 5;
        long hasWaited = 0L;
        long maxCycle = 30 * 1000L;// 30 seconds max.
        while (!status) {
            try2Sleep(recheckCycle);
            hasWaited += recheckCycle;

            if (recheckCycle >= maxCycle) {
                logger.warn("Collector traceSegment service doesn't response in {} seconds.", hasWaited / 1000);
            } else {
                recheckCycle = recheckCycle * 2 > maxCycle ? maxCycle : recheckCycle * 2;
            }
        }
    }

    /**
     * Try to sleep, and ignore the {@link InterruptedException}
     *
     * @param millis the length of time to sleep in milliseconds
     */
    private void try2Sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {

        }
    }
}
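  • GRPCStreamServiceStatus wraps a volatile boolean status flag. The finished method sets the flag to true, and isStatus returns it. wait4Finish(long maxTimeout) polls the flag every 5 milliseconds and gives up once the accumulated wait exceeds maxTimeout, returning the current value of the flag. The no-argument wait4Finish blocks until the flag becomes true: it sleeps for recheckCycle milliseconds between checks, starting at 5 and doubling up to the 30-second maxCycle. Once recheckCycle has reached maxCycle, every further check logs the warning "Collector traceSegment service doesn't response in {} seconds." with hasWaited / 1000 as the argument.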
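The doubling of recheckCycle is easier to follow with concrete numbers. The sketch below replays the arithmetic of the no-argument wait4Finish without actually sleeping. RecheckCycleSchedule and its two methods are hypothetical names used only for illustration, not part of SkyWalking, and the sketch assumes every sleep lasts exactly recheckCycle milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of SkyWalking: replays the recheckCycle
// arithmetic of the no-argument wait4Finish() without sleeping.
final class RecheckCycleSchedule {

    // The first `count` sleep intervals (ms) used while status stays false.
    static List<Long> firstSleeps(int count) {
        long recheckCycle = 5;
        long maxCycle = 30 * 1000L;
        List<Long> sleeps = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            sleeps.add(recheckCycle); // corresponds to try2Sleep(recheckCycle)
            if (recheckCycle < maxCycle) {
                recheckCycle = Math.min(recheckCycle * 2, maxCycle);
            }
        }
        return sleeps;
    }

    // Value of hasWaited (ms) at the point where the first warning would be logged,
    // assuming each sleep takes exactly recheckCycle milliseconds.
    static long waitedAtFirstWarning() {
        long recheckCycle = 5;
        long hasWaited = 0L;
        long maxCycle = 30 * 1000L;
        while (true) {
            hasWaited += recheckCycle;
            if (recheckCycle >= maxCycle) {
                return hasWaited; // this is where logger.warn would fire
            }
            recheckCycle = Math.min(recheckCycle * 2, maxCycle);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSleeps(15));
        System.out.println(waitedAtFirstWarning());
    }
}
```

Under that assumption the intervals run 5, 10, 20, and so on up to 20480, then stay at 30000, and the first warning appears only after the first full 30-second sleep, with hasWaited at 70955 milliseconds.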

TraceSegmentServiceClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java

@DefaultImplementor
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {

	//......

    @Override
    public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
                @Override
                public void onNext(Commands commands) {
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                }

                @Override
                public void onError(Throwable throwable) {
                    status.finished();
                    if (logger.isErrorEnable()) {
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                    }
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });

            try {
                for (TraceSegment segment : data) {
                    UpstreamSegment upstreamSegment = segment.transform();
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                }
            } catch (Throwable t) {
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
            }

            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }

        printUplinkStatus();
    }

    private void printUplinkStatus() {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - lastLogTime > 30 * 1000) {
            lastLogTime = currentTimeMillis;
            if (segmentUplinkedCounter > 0) {
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);
                segmentUplinkedCounter = 0;
            }
            if (segmentAbandonedCounter > 0) {
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);
                segmentAbandonedCounter = 0;
            }
        }
    }

	//......

}
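  • The consume method of TraceSegmentServiceClient sends data only when the channel status is CONNECTED; otherwise it adds data.size() to segmentAbandonedCounter. When connected, it creates a GRPCStreamServiceStatus initialized to false and opens upstreamSegmentStreamObserver through serviceStub, with withDeadlineAfter set to Config.Collector.GRPC_UPSTREAM_TIMEOUT (30 seconds by default). It transforms each TraceSegment into an UpstreamSegment, sends it with onNext, and then calls onCompleted. Both the onCompleted and the onError method of the response observer call GRPCStreamServiceStatus.finished, which releases the status.wait4Finish() call that consume blocks on. Finally, consume calls printUplinkStatus, which logs at most once every 30 seconds, at debug level, how many trace segments were sent or abandoned.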
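The hand-off between the sending thread and the response callback can be sketched without gRPC. In the sketch below, StreamStatusSketch is a simplified stand-in for GRPCStreamServiceStatus and sendAndWait is a hypothetical method in which a scheduled task plays the role of onCompleted or onError. It uses the bounded wait4Finish(long) overload so that the demo always terminates, whereas consume itself calls the unbounded variant.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for GRPCStreamServiceStatus (logging omitted).
final class StreamStatusSketch {
    private volatile boolean status;

    void finished() {
        status = true;
    }

    // Same polling loop as wait4Finish(long): 5 ms steps until maxTimeout is exceeded.
    boolean wait4Finish(long maxTimeoutMillis) {
        long waited = 0;
        while (!status) {
            if (waited > maxTimeoutMillis) {
                break;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                // Ignored here, as in the original try2Sleep.
            }
            waited += 5;
        }
        return status;
    }

    // Hypothetical stand-in for the gRPC call: a second thread "completes the
    // stream" after delayMillis while the caller waits up to maxTimeoutMillis.
    static boolean sendAndWait(long delayMillis, long maxTimeoutMillis) {
        StreamStatusSketch status = new StreamStatusSketch();
        ScheduledExecutorService callbackThread = Executors.newSingleThreadScheduledExecutor();
        try {
            // Plays the role of onCompleted()/onError() calling status.finished().
            callbackThread.schedule(status::finished, delayMillis, TimeUnit.MILLISECONDS);
            return status.wait4Finish(maxTimeoutMillis);
        } finally {
            callbackThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait(50, 2000)); // callback arrives before the timeout
        System.out.println(sendAndWait(2000, 50)); // caller gives up before the callback
    }
}
```

Because the flag is volatile, the write made in the callback thread becomes visible to the polling thread without any further synchronization, which is what lets the class stay this small.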

Summary

GRPCStreamServiceStatus provides the finished method to set the status to true and the wait4Finish method to block until the status becomes true. While waiting, it logs a warning on each check once recheckCycle has reached maxCycle. TraceSegmentServiceClient uses it to make consume wait until the stream to the collector completes or fails.

Keywords: Programming Java Apache

Added by s_dhumal on Sat, 28 Mar 2020 16:38:26 +0200