How does thingsboard maintain equipment status

This paper is based on thingsboard-3.1 1 as an example

text

thingsboard records the online status of each device (including gateway) in memory. In the data attribute_kv table, the active field corresponds to the value of the online status of the device.

MqttTransportHandler handles mqtt messages in thingsboard. The underlying communication is implemented based on netty. Developers familiar with netty must be particularly familiar with ChannelInboundHandlerAdapter. Let's take a direct look at how MqttTransportHandler overloads the channelRead method, as shown below:

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.trace("[{}] Processing msg: {}", sessionId, msg);
        try {
            if (msg instanceof MqttMessage) {
                processMqttMsg(ctx, (MqttMessage) msg);
            } else {
                ctx.close();
            }
        } finally {
            ReferenceCountUtil.safeRelease(msg);
        }
    }
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        address = (InetSocketAddress) ctx.channel().remoteAddress();
        if (msg.fixedHeader() == null) {
            log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
            processDisconnect(ctx);
            return;
        }
        deviceSessionCtx.setChannel(ctx);
        switch (msg.fixedHeader().messageType()) {
            case CONNECT:
                processConnect(ctx, (MqttConnectMessage) msg);
                break;
            case PUBLISH:
                processPublish(ctx, (MqttPublishMessage) msg);
                break;
            case SUBSCRIBE:
                processSubscribe(ctx, (MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                break;
            case PINGREQ:
                if (checkConnected(ctx, msg)) {
                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
                    transportService.reportActivity(sessionInfo);
                }
                break;
            case DISCONNECT:
                if (checkConnected(ctx, msg)) {
                    processDisconnect(ctx);
                }
                break;
            default:
                break;
        }
    }

From the above methods, you can see how thingsboard handles mqtt messages. It handles message types such as connect, publish and dusbsscribe. processConnect and processDisconnect handle device connection / disconnection. In processConnect, the online information of the device is created in memory, while processDisconnect is the opposite.

processConnect is to establish a connection, but to maintain the real-time connection status of the device, it is certainly not enough to only process connection messages. thingsboard also processes publish (attribute update and telemetry value upload) and other messages, and also updates the activity status of the device. For details, please refer to the TransportService.reportActivity method.

// Attribute update and telemetry value upload
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
        if (!checkConnected(ctx, mqttMsg)) {
            return;
        }
        String topicName = mqttMsg.variableHeader().topicName();
        int msgId = mqttMsg.variableHeader().packetId();
        log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);

        if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
            if (gatewaySessionHandler != null) {
                handleGatewayPublishMsg(topicName, msgId, mqttMsg);
                transportService.reportActivity(sessionInfo);
            }
        } else {
            processDevicePublish(ctx, mqttMsg, topicName, msgId);
        }
    }
@Override
    public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
        reportActivityInternal(sessionInfo);
    }

    private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
        UUID sessionId = toSessionId(sessionInfo);
        SessionMetaData sessionMetaData = sessions.get(sessionId);
        if (sessionMetaData != null) {
            sessionMetaData.updateLastActivityTime();
        }
        return sessionMetaData;
    }

You can see every device (upload data through the device itself or through the gateway) will update the last active time field of the device. I haven't seen here how thingsboard cleans up expired connections except when the device actively closes the connection. Next is the protagonist of this scene: DefaultTransportService.checkInactivityAndReportActivity method:

    private void checkInactivityAndReportActivity() {
        long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
        sessions.forEach((uuid, sessionMD) -> {
            long lastActivityTime = sessionMD.getLastActivityTime();
            TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
            if (sessionInfo.getGwSessionIdMSB() > 0 &&
                    sessionInfo.getGwSessionIdLSB() > 0) {
                SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
                if (gwMetaData != null) {
                    lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime);
                }
            }
            
            if (lastActivityTime < expTime) {
                // There is no conversation for a long time, and the session expires
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
                }
                process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
                sessions.remove(uuid);
                sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
            } else {
                if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
                    final long lastActivityTimeFinal = lastActivityTime;
                    process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
                            .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
                            .setRpcSubscription(sessionMD.isSubscribedToRPC())
                            .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
                        @Override
                        public void onSuccess(Void msg) {
                            sessionMD.setLastReportedActivityTime(lastActivityTimeFinal);
                        }

                        @Override
                        public void onError(Throwable e) {
                            log.warn("[{}] Failed to report last activity time", uuid, e);
                        }
                    });
                }
            }
        });
    }

The checkInactivityAndReportActivity method is a scheduled detection task started when the DefaultTransportService is created,

private final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();

Sessions is a ConcurrentMap object with the device id as the primary key. This method will scan the session data in sessions regularly. If there is no session with thingsboard for a long time, the session connection with the device will be closed and the session data saved in memory will be cleared.

Keywords: thingsboard

Added by nesargha on Wed, 15 Dec 2021 02:33:50 +0200