Why Large MQTT Messages Fail

Background

Our group built a chat server on the MQTT protocol. The day before yesterday, while we were testing a large message (more than 5,000 Chinese characters), the connection became unusable: no reply was received for any message sent afterwards.

Server environment:
Netty : 4.1.32.Final
Decoder: the MqttDecoder bundled with Netty

Client: Android

Investigation process

  1. Since every message is written to the log, we first searched the server log and found that the content of the sent message had not been logged.
  2. Had the client failed to send the very long message? Capturing packets with tcpdump showed that the client sent it normally and the server ACKed every packet, but the server never sent back a response afterwards. We guessed that the server was failing on large messages.
    1. With tcpdump, -nn prints numeric IP addresses and ports, and -X prints the packet contents. The -w option saves the capture to a file, which can then be analyzed with tcpdump or Wireshark.
  3. We then looked up the maximum payload supported by MQTT. The official MQTT documentation says 256 MB, a size our message certainly did not exceed.
  4. Capturing packets on the server confirmed that the message had been received, but no acknowledgement message was returned.
  5. Debugging online showed that a PUBLISH-type message had been received, but its class was not MqttPublishMessage and its payload contained no data. Instead it carried the error message "too large message: 56234 bytes".
  6. A Google search showed that other people had run into the same problem, although the MQTT implementation in that case was written in C.
  7. Looking at MqttDecoder, we found that the decoder enforces a maximum message size (part of the code is below). Our startup code calls the default constructor, so the default maximum of 8092 bytes applies.
public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
    private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
    public MqttDecoder() {
      this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    public MqttDecoder(int maxBytesInMessage) {
        super(DecoderState.READ_FIXED_HEADER);
        this.maxBytesInMessage = maxBytesInMessage;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        switch (state()) {
            case READ_FIXED_HEADER: try {
                mqttFixedHeader = decodeFixedHeader(buffer);
                bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
                checkpoint(DecoderState.READ_VARIABLE_HEADER);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_VARIABLE_HEADER:  try {
                final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
                variableHeader = decodedVariableHeader.value;
                if (bytesRemainingInVariablePart > maxBytesInMessage) {
                    throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
                }
                bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
                checkpoint(DecoderState.READ_PAYLOAD);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_PAYLOAD: try {
                final Result<?> decodedPayload =
                        decodePayload(
                                buffer,
                                mqttFixedHeader.messageType(),
                                bytesRemainingInVariablePart,
                                variableHeader);
                bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
                if (bytesRemainingInVariablePart != 0) {
                    throw new DecoderException(
                            "non-zero remaining payload bytes: " +
                                    bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
                }
                checkpoint(DecoderState.READ_FIXED_HEADER);
                MqttMessage message = MqttMessageFactory.newMessage(
                        mqttFixedHeader, variableHeader, decodedPayload.value);
                mqttFixedHeader = null;
                variableHeader = null;
                out.add(message);
                break;
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case BAD_MESSAGE:
                // Keep discarding until disconnection.
                buffer.skipBytes(actualReadableBytes());
                break;

            default:
                // Shouldn't reach here.
                throw new Error();
        }
    }

    private MqttMessage invalidMessage(Throwable cause) {
      checkpoint(DecoderState.BAD_MESSAGE);
      return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
    }
}
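To put the numbers from the steps above side by side, here is a rough sketch of the size arithmetic. It assumes the chat text is UTF-8 encoded (the encoding is not stated above) and ignores the topic name and other header bytes. The class and method names are illustrative and are not part of Netty. The protocol limit follows from MQTT's Remaining Length field, which uses at most four bytes with seven value bits each, giving 268,435,455 bytes, just under 256 MB.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative size arithmetic; the names here are hypothetical, not Netty API. */
final class MqttSizeCheck {
    // Value of DEFAULT_MAX_BYTES_IN_MESSAGE in the MqttDecoder source quoted above.
    static final int NETTY_DEFAULT_LIMIT = 8092;

    // MQTT encodes Remaining Length in at most 4 bytes, 7 value bits per byte.
    static final int MQTT_PROTOCOL_LIMIT = (1 << 28) - 1; // 268,435,455 bytes

    /** Number of bytes the text occupies once UTF-8 encoded (an assumed encoding). */
    static int utf8Length(String text) {
        return text.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Builds a string made of count copies of c. */
    static String repeat(char c, int count) {
        StringBuilder sb = new StringBuilder(count);
        for (int i = 0; i < count; i++) {
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // 5,000 copies of U+4E2D, a common Chinese character (3 bytes in UTF-8).
        int size = utf8Length(repeat('\u4e2d', 5000));
        System.out.println("payload bytes: " + size);                               // 15000
        System.out.println("fits protocol limit: " + (size <= MQTT_PROTOCOL_LIMIT)); // true
        System.out.println("fits decoder default: " + (size <= NETTY_DEFAULT_LIMIT)); // false
    }
}
```

Under these assumptions, 5,000 Chinese characters already take about 15,000 bytes: far below what the protocol allows, but well above the decoder's default limit.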
  8. That explains why the long message failed, but one question remains: why could no subsequent message, including ping messages, get through? Reading the code shows that this is related to MqttDecoder's parent class, ReplayingDecoder; see its source code and class description for details. While the variable header is being read, if the message exceeds the maximum limit, an exception is thrown immediately. The relevant code is:
case READ_VARIABLE_HEADER:  try {
    final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
    variableHeader = decodedVariableHeader.value;
    if (bytesRemainingInVariablePart > maxBytesInMessage) {
        throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
    }
    bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
    checkpoint(DecoderState.READ_PAYLOAD);
    // fall through
} catch (Exception cause) {
    out.add(invalidMessage(cause));
    return;
}

In the exception handler, the invalidMessage method is called. It sets the state to DecoderState.BAD_MESSAGE, and in that state all incoming bytes are simply discarded.

case BAD_MESSAGE:
    // Keep discarding until disconnection.
    buffer.skipBytes(actualReadableBytes());
    break;

In other words, no later message ever reaches the business logic, and the long-lived connection is effectively dead.
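The behavior described above can be illustrated with a toy model. This is not Netty's implementation: it is a simplified sketch that only tracks message sizes, and the class, enum, and method names are made up for illustration. It shows how one oversized message flips the decoder into a state that is never reset, so that even a tiny ping arriving later is thrown away.

```java
/** Toy model of a decoder with a sticky error state; NOT Netty's implementation. */
final class StickyDecoderModel {
    enum State { READY, BAD_MESSAGE }

    private final int maxBytes;
    private State state = State.READY;

    StickyDecoderModel(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    /**
     * Feeds one message of the given size into the model.
     * Returns true if it would be decoded as a valid message, false otherwise.
     */
    boolean accept(int messageBytes) {
        if (state == State.BAD_MESSAGE) {
            return false; // everything after the first failure is discarded
        }
        if (messageBytes > maxBytes) {
            state = State.BAD_MESSAGE; // never reset for the life of the connection
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        StickyDecoderModel decoder = new StickyDecoderModel(8092);
        System.out.println(decoder.accept(100));   // true: small message before the failure
        System.out.println(decoder.accept(56234)); // false: oversized message
        System.out.println(decoder.accept(2));     // false: a ping-sized message afterwards
    }
}
```

Because the state is only cleared when the connection is replaced, the client has to reconnect before any message can be processed again.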

Solution

  1. The client limits the length of long messages or splits them, so that no single message exceeds the maximum limit.
  2. The server raises the maximum message length through the MqttDecoder constructor that takes maxBytesInMessage (not recommended, because it increases the server's processing time and memory usage).
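The first option could be sketched on the client side roughly as follows. This is a minimal sketch under assumptions: the text is sent as UTF-8, and the class and method names are hypothetical. It splits a string into pieces that each fit a byte budget without cutting a character in half. Since the decoder's check shown earlier applies to the variable header plus the payload, the budget should be set somewhat below the server's limit to leave room for the topic name and packet identifier.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical client-side helper; not part of any MQTT library. */
final class MessageSplitter {
    /** Splits text into pieces of at most maxBytes UTF-8 bytes each, never cutting a code point. */
    static List<String> split(String text, int maxBytes) {
        if (maxBytes < 4) {
            throw new IllegalArgumentException("maxBytes must be at least 4");
        }
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int currentBytes = 0;
        for (int i = 0; i < text.length(); ) {
            int codePoint = text.codePointAt(i);
            int len = utf8Length(codePoint);
            if (currentBytes + len > maxBytes) {
                // The next character would overflow the budget: start a new piece.
                parts.add(current.toString());
                current.setLength(0);
                currentBytes = 0;
            }
            current.appendCodePoint(codePoint);
            currentBytes += len;
            i += Character.charCount(codePoint);
        }
        if (current.length() > 0) {
            parts.add(current.toString());
        }
        return parts;
    }

    /** UTF-8 size of one code point (lone surrogates are over-counted, which is safe). */
    static int utf8Length(int codePoint) {
        if (codePoint < 0x80) return 1;
        if (codePoint < 0x800) return 2;
        if (codePoint < 0x10000) return 3;
        return 4;
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5000; i++) {
            sb.append('\u4e2d'); // 3 bytes each in UTF-8
        }
        for (String part : split(sb.toString(), 8000)) {
            System.out.println(part.getBytes(StandardCharsets.UTF_8).length + " bytes");
        }
    }
}
```

The receiving side still needs some application-level convention, such as a sequence number per piece, to reassemble the original text; that part is left out here.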


Added by le007 on Thu, 25 Jul 2019 11:59:49 +0300