Consumption mode of the Pulsar consumer

Client processing logic

ConnectionPool

The Netty-related configuration is set up in com.yahoo.pulsar.client.impl.ConnectionPool

public ConnectionPool(final PulsarClientImpl client, EventLoopGroup eventLoopGroup) {
        this.eventLoopGroup = eventLoopGroup;
        this.maxConnectionsPerHosts = client.getConfiguration().getConnectionsPerBroker();

        pool = new ConcurrentHashMap<>();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        if (SystemUtils.IS_OS_LINUX && eventLoopGroup instanceof EpollEventLoopGroup) {
            bootstrap.channel(EpollSocketChannel.class);
        } else {
            bootstrap.channel(NioSocketChannel.class);
        }

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
        bootstrap.option(ChannelOption.TCP_NODELAY, client.getConfiguration().isUseTcpNoDelay());
        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            public void initChannel(SocketChannel ch) throws Exception {
                ClientConfiguration clientConfig = client.getConfiguration();
                if (clientConfig.isUseTls()) {
                    SslContextBuilder builder = SslContextBuilder.forClient();
                    if (clientConfig.isTlsAllowInsecureConnection()) {
                        builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
                    } else {
                        if (clientConfig.getTlsTrustCertsFilePath().isEmpty()) {
                            // Use system default
                            builder.trustManager((File) null);
                        } else {
                            File trustCertCollection = new File(clientConfig.getTlsTrustCertsFilePath());
                            builder.trustManager(trustCertCollection);
                        }
                    }

                    // Set client certificate if available
                    AuthenticationDataProvider authData = clientConfig.getAuthentication().getAuthData();
                    if (authData.hasDataForTls()) {
                        builder.keyManager(authData.getTlsPrivateKey(),
                                (X509Certificate[]) authData.getTlsCertificates());
                    }

                    SslContext sslCtx = builder.build();
                    ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
                }
                ch.pipeline().addLast("frameDecoder", new PulsarLengthFieldFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
                ch.pipeline().addLast("handler", new ClientCnx(client));
            }
        });
    }

messageReceived method

void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received message: {}", topic, subscription, messageId);
        }

        MessageMetadata msgMetadata = null;
        ByteBuf payload = headersAndPayload;
        try {
            msgMetadata = Commands.parseMessageMetadata(payload);
        } catch (Throwable t) {
            discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
            return;
        }

        ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, payload, cnx);
        if (uncompressedPayload == null) {
            // Message was discarded on decompression error
            return;
        }

        if (!verifyChecksum(messageId, msgMetadata, uncompressedPayload, cnx)) {
            // Message discarded for checksum error
            return;
        }

        final int numMessages = msgMetadata.getNumMessagesInBatch();

        if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
            final MessageImpl message = new MessageImpl(messageId, msgMetadata, uncompressedPayload,
                    getPartitionIndex(), cnx);
            uncompressedPayload.release();
            msgMetadata.recycle();

            try {
                lock.readLock().lock();
                // Enqueue the message so that it can be retrieved when application calls receive()
                // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
                // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
                boolean asyncReceivedWaiting = !pendingReceives.isEmpty();
                if ((conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) && !asyncReceivedWaiting) {
                    incomingMessages.add(message);
                }
                if (asyncReceivedWaiting) {
                    notifyPendingReceivedCallback(message, null);
                }
            } finally {
                lock.readLock().unlock();
            }
        } else {
            if (conf.getReceiverQueueSize() == 0) {
                log.warn(
                        "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size",
                        subscription, consumerName);
                // close connection
                closeAsync().handle((ok, e) -> {
                    // notify callback with failure result
                    notifyPendingReceivedCallback(null,
                            new PulsarClientException.InvalidMessageException(
                                    format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ",
                                            subscription, consumerName)));
                    return null;
                });
            } else {
                // handle batch message enqueuing; uncompressed payload has all messages in batch
                receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, messageId, cnx);
            }
            uncompressedPayload.release();
            msgMetadata.recycle();
        }

        stats.incrementNumAcksTracker(numMessages);

        if (listener != null) {
            // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
            // thread while the message processing happens
            listenerExecutor.execute(() -> {
                for (int i = 0; i < numMessages; i++) {
                    Message msg;
                    try {
                        msg = internalReceive();
                    } catch (PulsarClientException e) {
                        log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
                        return;
                    }

                    try {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, msg);
                        }
                        listener.received(ConsumerImpl.this, msg);
                    } catch (Throwable t) {
                        log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, msg,
                                t);
                    }
                }
            });
        }
    }
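
The enqueue-or-notify decision in messageReceived can be hard to see among the logging and error handling. Below is a minimal single-threaded sketch of that hand-off, assuming plain strings as messages. The class HandOffSketch and its receiveAsync method are hypothetical stand-ins, not Pulsar code, and the read lock used by the real consumer is left out.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the consumer's hand-off logic; not Pulsar code.
class HandOffSketch {
    final BlockingQueue<String> incomingMessages = new ArrayBlockingQueue<>(4);
    final Queue<CompletableFuture<String>> pendingReceives = new ConcurrentLinkedQueue<>();

    // Called when a message arrives from the network.
    void messageReceived(String message) {
        CompletableFuture<String> waiting = pendingReceives.poll();
        if (waiting != null) {
            // An async receive is already waiting: complete it and bypass the queue.
            waiting.complete(message);
        } else {
            // Nobody is waiting: buffer the message for a later receive.
            incomingMessages.add(message);
        }
    }

    // Called by the application; completes at once if a message is buffered.
    CompletableFuture<String> receiveAsync() {
        String buffered = incomingMessages.poll();
        if (buffered != null) {
            return CompletableFuture.completedFuture(buffered);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingReceives.add(future);
        return future;
    }

    public static void main(String[] args) {
        HandOffSketch consumer = new HandOffSketch();
        CompletableFuture<String> future = consumer.receiveAsync(); // nothing buffered yet
        consumer.messageReceived("m1"); // completes the pending future
        consumer.messageReceived("m2"); // no one waiting, so it is queued
        System.out.println(future.join() + " " + consumer.incomingMessages.size()); // prints "m1 1"
    }
}
```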

A message whose uncompressed size exceeds the limit is discarded directly

private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload,
            ClientCnx currentCnx) {
        CompressionType compressionType = msgMetadata.getCompression();
        CompressionCodec codec = codecProvider.getCodec(compressionType);
        int uncompressedSize = msgMetadata.getUncompressedSize();
        if (uncompressedSize > PulsarDecoder.MaxMessageSize) {
            // Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
            log.error("[{}][{}] Got corrupted uncompressed message size {} at {}", topic, subscription,
                    uncompressedSize, messageId);
            discardCorruptedMessage(messageId, currentCnx, ValidationError.UncompressedSizeCorruption);
            return null;
        }

        try {
            ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
            return uncompressedPayload;
        } catch (IOException e) {
            log.error("[{}][{}] Failed to decompress message with {} at {}: {}", topic, subscription, compressionType,
                    messageId, e.getMessage(), e);
            discardCorruptedMessage(messageId, currentCnx, ValidationError.DecompressionError);
            return null;
        }
    }

discardCorruptedMessage

private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx,
            ValidationError validationError) {
        log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(),
                messageId.getEntryId());
        ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), AckType.Individual,
                validationError);
        currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
        increaseAvailablePermits(currentCnx);
        stats.incrementNumReceiveFailed();
    }

incomingMessages (BlockingQueue<Message>)

The queue size is taken from the consumer configuration (receiverQueueSize); alternatively, a GrowableArrayBlockingQueue is used.

protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
            ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture, boolean useGrowableQueue) {
        super(client, topic);
        this.subscription = subscription;
        this.conf = conf;
        this.consumerName = conf.getConsumerName() == null
                ? DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 5) : conf.getConsumerName();
        this.subscribeFuture = subscribeFuture;
        this.listener = conf.getMessageListener();
        if (conf.getReceiverQueueSize() <= 1) {
            this.incomingMessages = Queues.newArrayBlockingQueue(1);
        } else if (useGrowableQueue) {
            this.incomingMessages = new GrowableArrayBlockingQueue<>();
        } else {
            this.incomingMessages = Queues.newArrayBlockingQueue(conf.getReceiverQueueSize());
        }
        this.listenerExecutor = listenerExecutor;
        this.pendingReceives = Queues.newConcurrentLinkedQueue();
        if (conf.getAckTimeoutMillis() != 0) {
            this.unAckedMessageTracker = new UnAckedMessageTracker();
            this.unAckedMessageTracker.start(client, this, conf.getAckTimeoutMillis());
        } else {
            this.unAckedMessageTracker = null;
        }

    }

Messages are enqueued with add(), so if a bounded queue is full, an exception is thrown.
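
The exception comes from standard java.util.concurrent behavior rather than anything Pulsar-specific. The sketch below uses a plain ArrayBlockingQueue (the class QueueFullSketch is made up for illustration) to show that add() throws IllegalStateException on a full queue, while offer() just returns false.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustration only: demonstrates add() vs offer() on a full bounded queue.
class QueueFullSketch {
    // Fills a queue of the given capacity, then reports whether one more add() throws.
    static boolean addThrowsWhenFull(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add(i);
        }
        try {
            queue.add(capacity);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(addThrowsWhenFull(2)); // prints "true"

        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.add(1);
        System.out.println(queue.offer(2)); // prints "false": offer() does not throw
    }
}
```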

Receiving batch messages

void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf uncompressedPayload,
            MessageIdData messageId, ClientCnx cnx) {
        int batchSize = msgMetadata.getNumMessagesInBatch();

        // create ack tracker for entry aka batch
        BitSet bitSet = new BitSet(batchSize);
        MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
                getPartitionIndex());
        bitSet.set(0, batchSize);
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] added bit set for message {}, cardinality {}, length {}", subscription, consumerName,
                    batchMessage, bitSet.cardinality(), bitSet.length());
        }
        batchMessageAckTracker.put(batchMessage, bitSet);
        try {
            for (int i = 0; i < batchSize; ++i) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, i);
                }
                PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
                        .newBuilder();
                ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
                        singleMessageMetadataBuilder, i, batchSize);
                BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
                        messageId.getEntryId(), getPartitionIndex(), i);
                final MessageImpl message = new MessageImpl(batchMessageIdImpl, msgMetadata,
                        singleMessageMetadataBuilder.build(), singleMessagePayload, cnx);
                lock.readLock().lock();
                if (pendingReceives.isEmpty()) {
                    incomingMessages.add(message);
                } else {
                    notifyPendingReceivedCallback(message, null);
                }
                lock.readLock().unlock();
                singleMessagePayload.release();
                singleMessageMetadataBuilder.recycle();
            }
        } catch (IOException e) {
            //
            log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
            batchMessageAckTracker.remove(batchMessage);
            discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", subscription,
                    consumerName, incomingMessages.size(), incomingMessages.remainingCapacity());
        }
    }
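
The method above creates a BitSet with one bit per message in the batch and sets all of them. A rough sketch of how such a bit set can track acknowledgements follows. The class BatchAckSketch and its ack method are hypothetical, and clearing a bit on ack is an assumption about how the tracker is used, since that part of the client is not shown here.

```java
import java.util.BitSet;

// Hypothetical per-batch ack tracker; not the Pulsar implementation.
class BatchAckSketch {
    private final BitSet pending;

    BatchAckSketch(int batchSize) {
        pending = new BitSet(batchSize);
        pending.set(0, batchSize); // same initialisation as in the method above
    }

    // Marks one message of the batch as acked.
    // Returns true once every message in the batch has been acked.
    boolean ack(int batchIndex) {
        pending.clear(batchIndex);
        return pending.isEmpty();
    }

    // Number of messages in the batch that are still unacked.
    int outstanding() {
        return pending.cardinality();
    }

    public static void main(String[] args) {
        BatchAckSketch batch = new BatchAckSketch(3);
        System.out.println(batch.ack(0));        // prints "false"
        System.out.println(batch.ack(2));        // prints "false"
        System.out.println(batch.outstanding()); // prints "1"
        System.out.println(batch.ack(1));        // prints "true": whole entry can be acked
    }
}
```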

Push and pull mode of messages

Kafka's consumer pull model

Kafka consumers use the pull model. They save their message offsets in ZooKeeper (zk) and pull the data themselves, which prevents a fast producer from overwhelming them. In push mode, a consumer may not be able to keep up with the incoming messages.

Push mode of Pulsar's consumers

As in RabbitMQ, the broker pushes messages to the consumer. If the consumer is offline, the messages are not lost, because Pulsar persists them; when the consumer comes back online, it receives the messages it missed. Because messages are persisted, there is no backlog in memory. But there is a problem: if messages arrive faster than the consumer can process them, will the consumer crash?

receiverQueueSize

Note the following parameter: receiverQueueSize

this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;

When the consumer starts, it establishes a connection with the broker, sends the flow command to the broker, and begins to accept data.

if (!(firstTimeConnect && partitionIndex > -1) && conf.getReceiverQueueSize() != 0) {
    receiveMessages(cnx, conf.getReceiverQueueSize());
}

void receiveMessages(ClientCnx cnx, int numMessages) {
        if (cnx != null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Adding {} additional permits", topic, subscription, numMessages);
            }

            cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages), cnx.ctx().voidPromise());
        }
    }                    

The flow command tells the broker the maximum number of messages it may push. If more than receiverQueueSize messages are available, only receiverQueueSize of them are pushed; if fewer are available, the broker pushes what it has. This is how the problem of the consumer's processing speed is solved. On first start-up, the consumer lets the broker push at most receiverQueueSize messages; once half of them have been consumed, it sends another flow command requesting at most receiverQueueSize/2 more.
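
To make the permit exchange concrete, here is a toy single-threaded simulation. ToyBroker, its flow method, and run are hypothetical names invented for this sketch, and integers stand in for messages. It only models the counting described above, not the wire protocol.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy simulation of permit-based flow control; not Pulsar code.
class FlowControlSketch {
    // Hypothetical broker that pushes at most as many messages as it holds permits for.
    static class ToyBroker {
        int backlog;
        int permits;

        ToyBroker(int backlog) {
            this.backlog = backlog;
        }

        // Models receiving a flow command: add the permits, then push what is allowed.
        void flow(int numMessages, Deque<Integer> consumerQueue) {
            permits += numMessages;
            int pushed = Math.min(permits, backlog);
            for (int i = 0; i < pushed; i++) {
                consumerQueue.add(i);
            }
            permits -= pushed;
            backlog -= pushed;
        }
    }

    // Consumes until the local queue is empty.
    // Returns {messages consumed, largest queue depth observed}.
    static int[] run(int backlog, int receiverQueueSize) {
        ToyBroker broker = new ToyBroker(backlog);
        Deque<Integer> queue = new ArrayDeque<>();
        int threshold = receiverQueueSize / 2;
        int available = 0;
        int consumed = 0;

        broker.flow(receiverQueueSize, queue); // initial flow command
        int maxDepth = queue.size();
        while (!queue.isEmpty()) {
            queue.poll();
            consumed++;
            available++;
            if (available >= threshold) {
                broker.flow(available, queue); // refill once half the queue is consumed
                available = 0;
                maxDepth = Math.max(maxDepth, queue.size());
            }
        }
        return new int[] {consumed, maxDepth};
    }

    public static void main(String[] args) {
        int[] result = run(100, 10);
        System.out.println(result[0] + " " + result[1]); // prints "100 10"
    }
}
```

In this model the local queue never holds more than receiverQueueSize messages, no matter how large the backlog on the broker is.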

consumer.receive()

public Message receive() throws PulsarClientException {
        if (listener != null) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Cannot use receive() when a listener has been set");
        }

        switch (state.get()) {
        case Ready:
        case Connecting:
            break; // Ok
        case Closing:
        case Closed:
            throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
        case Failed:
        case Uninitialized:
            throw new PulsarClientException.NotConnectedException();
        }

        return internalReceive();
    }

internalReceive

protected Message internalReceive() throws PulsarClientException {
        if (conf.getReceiverQueueSize() == 0) {
            return fetchSingleMessageFromBroker();
        }
        Message message;
        try {
            message = incomingMessages.take();
            messageProcessed(message);
            if (unAckedMessageTracker != null) {
                unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
            }
            return message;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            stats.incrementNumReceiveFailed();
            throw new PulsarClientException(e);
        }
    }

messageProcessed

private synchronized void messageProcessed(Message msg) {
        ClientCnx currentCnx = cnx();
        ClientCnx msgCnx = ((MessageImpl) msg).getCnx();

        if (msgCnx != currentCnx) {
            // The processed message did belong to the old queue that was cleared after reconnection.
            return;
        }

        increaseAvailablePermits(currentCnx);
        stats.updateNumMsgsReceived(msg);
    }

increaseAvailablePermits(receiverQueueRefillThreshold)

Note the parameter receiverQueueRefillThreshold, which is half of receiverQueueSize. That is, once half of the queue has been consumed, the consumer sends another flow command to the broker and asks it to push more messages.

private void increaseAvailablePermits(ClientCnx currentCnx) {
        int available = availablePermits.incrementAndGet();

        while (available >= receiverQueueRefillThreshold) {
            if (availablePermits.compareAndSet(available, 0)) {
                receiveMessages(currentCnx, available);
                break;
            } else {
                available = availablePermits.get();
            }
        }
    }

availablePermits: The number of messages this consumer has space for in the client library's listen queue. A value of 0 means the client library's queue is full and receive() isn't being called. A nonzero value means this consumer is ready to be dispatched messages.
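
The effect of the threshold is that permits are returned to the broker in batches rather than one per message. The sketch below reuses the same compare-and-set loop in a standalone class (PermitBatchingSketch is a made-up name) and records the size of each flow command in a list instead of writing to a connection. It is a single-threaded demonstration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone demonstration of permit batching; not Pulsar code.
class PermitBatchingSketch {
    private final AtomicInteger availablePermits = new AtomicInteger();
    private final int refillThreshold;
    // Sizes of the flow commands that would have been sent to the broker.
    final List<Integer> flowCommands = new ArrayList<>();

    PermitBatchingSketch(int receiverQueueSize) {
        this.refillThreshold = receiverQueueSize / 2;
    }

    // Same shape as increaseAvailablePermits above, with the network write replaced by a list.
    void increaseAvailablePermits() {
        int available = availablePermits.incrementAndGet();
        while (available >= refillThreshold) {
            if (availablePermits.compareAndSet(available, 0)) {
                flowCommands.add(available);
                break;
            }
            available = availablePermits.get();
        }
    }

    // Permits accumulated but not yet sent.
    int pending() {
        return availablePermits.get();
    }

    public static void main(String[] args) {
        PermitBatchingSketch consumer = new PermitBatchingSketch(8);
        for (int i = 0; i < 10; i++) {
            consumer.increaseAvailablePermits(); // one call per processed message
        }
        System.out.println(consumer.flowCommands + " " + consumer.pending()); // prints "[4, 4] 2"
    }
}
```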

Deletion of messages

Are persistent messages deleted after consumers consume them? Can consumers rewind to earlier messages?
When a message is sent to a topic on which no consumer has created a subscription yet, a consumer that subscribes later will not receive it. In that case there is no need to keep the message, so there should be deletion logic. In fact, once a message is acked, the broker deletes it.
pulsar/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java

@Override
    public void acknowledgeMessage(PositionImpl position, AckType ackType) {
        if (ackType == AckType.Cumulative) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
            }
            cursor.asyncMarkDelete(position, markDeleteCallback, position);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Individual ack on {}", topicName, subName, position);
            }
            cursor.asyncDelete(position, deleteCallback, position);
        }
    }
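
The difference between the two branches is easier to see on a toy model. The sketch below is not the managed-ledger cursor. ToyCursorSketch and its methods are hypothetical, positions are plain long values, and the rule that the mark-delete position slides over contiguous individually deleted entries is an assumption made for illustration.

```java
import java.util.TreeSet;

// Toy model of cumulative vs individual acknowledgement; not the real cursor.
class ToyCursorSketch {
    private long markDeletePosition = -1; // every position <= this is acked
    private final TreeSet<Long> individuallyDeleted = new TreeSet<>();

    // Cumulative ack: everything up to and including position is done.
    void markDelete(long position) {
        if (position > markDeletePosition) {
            markDeletePosition = position;
            individuallyDeleted.headSet(position, true).clear();
        }
        advance();
    }

    // Individual ack: only this position is done, which may leave holes.
    void delete(long position) {
        if (position > markDeletePosition) {
            individuallyDeleted.add(position);
            advance();
        }
    }

    // Slide the mark-delete position over contiguous individually acked entries.
    private void advance() {
        while (individuallyDeleted.remove(markDeletePosition + 1)) {
            markDeletePosition++;
        }
    }

    long markDeletePosition() {
        return markDeletePosition;
    }

    public static void main(String[] args) {
        ToyCursorSketch cursor = new ToyCursorSketch();
        cursor.delete(2); // hole at 0 and 1, nothing can be trimmed yet
        System.out.println(cursor.markDeletePosition()); // prints "-1"
        cursor.delete(0);
        System.out.println(cursor.markDeletePosition()); // prints "0"
        cursor.markDelete(1); // cumulative ack closes the hole
        System.out.println(cursor.markDeletePosition()); // prints "2"
    }
}
```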

Keywords: Programming kafka codec Java Netty

Added by Nothsa on Wed, 27 May 2020 14:04:46 +0300