Apache Pulsar: Pulsar messaging mechanism

Messaging mechanism

Pulsar follows the publish-subscribe (pub-sub) pattern. Producers publish messages to a topic; consumers subscribe to the topic, process the published messages, and send an acknowledgment once each message has been processed.

Once a subscription is created, Pulsar retains all of its messages even if the consumer disconnects. A message is deleted only after the consumer acknowledges that it has been processed successfully.

Topic

Logically, a topic is a log in which each message has an offset. Pulsar uses cursors to track consumers' offsets in that log.

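Pulsar supports two basic topic types: persistent topics, whose messages are stored durably in BookKeeper, and non-persistent topics, whose messages are kept only in broker memory and can be lost. Topic names follow this format: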

{persistent|non-persistent}://tenant/namespace/topic
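
A minimal, JDK-only sketch of that naming scheme: it splits a full topic name into domain, tenant, namespace and local name. It also expands short names, since the client accepts a bare name such as input-seed-avro-topic (used in the consumer example) as persistent://public/default/input-seed-avro-topic, and a tenant/namespace/topic name as a persistent topic. The class TopicNameSketch is hypothetical and only approximates the client's own parsing.

```java
/** Hypothetical helper that splits {persistent|non-persistent}://tenant/namespace/topic. */
public final class TopicNameSketch {
    final String domain;
    final String tenant;
    final String namespace;
    final String localName;

    private TopicNameSketch(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    static TopicNameSketch parse(String name) {
        if (name.isEmpty()) {
            throw new IllegalArgumentException("Empty topic name");
        }
        String full = name;
        if (!full.contains("://")) {
            String[] parts = full.split("/");
            if (parts.length == 1) {
                full = "persistent://public/default/" + full; // bare name: default tenant/namespace
            } else if (parts.length == 3) {
                full = "persistent://" + full;                // tenant/namespace/topic
            } else {
                throw new IllegalArgumentException("Invalid short topic name: " + name);
            }
        }
        int sep = full.indexOf("://");
        String domain = full.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown domain: " + domain);
        }
        // tenant, namespace, and everything after them as the local topic name
        String[] rest = full.substring(sep + 3).split("/", 3);
        if (rest.length != 3 || rest[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + name);
        }
        return new TopicNameSketch(domain, rest[0], rest[1], rest[2]);
    }

    @Override
    public String toString() {
        return domain + "://" + tenant + "/" + namespace + "/" + localName;
    }

    public static void main(String[] args) {
        System.out.println(parse("input-seed-avro-topic"));
        // prints persistent://public/default/input-seed-avro-topic
        TopicNameSketch t = parse("non-persistent://public/default/sensor");
        System.out.println(t.domain + " " + t.tenant + " " + t.namespace + " " + t.localName);
        // prints non-persistent public default sensor
    }
}
```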

Non-partitioned topics

$ $PULSAR_HOME/bin/pulsar-admin topics \
list public/default
$ $PULSAR_HOME/bin/pulsar-admin topics \
create persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
lookup persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
delete persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
stats persistent://public/default/input-seed-avro-topic
$ curl http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats | python -m json.tool
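
The curl call above goes to the broker's admin REST API, whose path is the topic name with "://" replaced by "/" under /admin/v2/. A JDK-only sketch of that mapping, plus an optional fetch with java.net.http.HttpClient (Java 11+). The host server-101:8080 comes from the example above; the class and method names are hypothetical, and the fetch only works against a reachable broker.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class TopicStatsUrl {
    /** Builds the admin v2 stats URL for a fully qualified topic name (hypothetical helper). */
    static String statsUrl(String adminBase, String fullTopicName) {
        // "persistent://public/default/t" -> "persistent/public/default/t"
        String path = fullTopicName.replace("://", "/");
        return adminBase + "/admin/v2/" + path + "/stats";
    }

    /** Fetches the stats JSON; requires a reachable broker. */
    static String fetch(String url) throws IOException, InterruptedException {
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        return http.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        String url = statsUrl("http://server-101:8080", "persistent://public/default/exclamation-input");
        System.out.println(url);
        // prints http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats
        if (args.length > 0 && args[0].equals("--fetch")) {
            System.out.println(fetch(url));
        }
    }
}
```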

Partitioned topics

$ $PULSAR_HOME/bin/pulsar-admin topics \
create-partitioned-topic persistent://public/default/output-seed-avro-topic \
--partitions 2
$ $PULSAR_HOME/bin/pulsar-admin topics \
list-partitioned-topics public/default
$ $PULSAR_HOME/bin/pulsar-admin topics \
get-partitioned-topic-metadata persistent://public/default/output-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
delete-partitioned-topic persistent://public/default/output-seed-avro-topic
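
Internally, a partitioned topic with N partitions is backed by N ordinary topics named <topic>-partition-0 through <topic>-partition-(N-1). When a message has a key, the Java client's default router hashes the key to choose a partition; messages without a key are spread round-robin. The sketch below reproduces the naming and a key-to-partition mapping, assuming String.hashCode() as the hash. That approximates, but is not guaranteed to match, the client's default hashing, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public final class PartitionSketch {
    /** Names of the internal per-partition topics ("-partition-N" suffix). */
    static List<String> partitionNames(String topic, int numPartitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    /** Approximate key routing: non-negative hash modulo the partition count. */
    static int partitionForKey(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String topic = "persistent://public/default/output-seed-avro-topic";
        partitionNames(topic, 2).forEach(System.out::println);
        // prints persistent://public/default/output-seed-avro-topic-partition-0
        //        persistent://public/default/output-seed-avro-topic-partition-1
        System.out.println(partitionForKey("sensor-temp-key", 2)); // same key, same partition
    }
}
```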

Message

Messages are the basic unit of data in Pulsar. In the Java client, a message is exposed through the Message<T> interface:

public interface Message<T> {
    Map<String, String> getProperties();
    boolean hasProperty(String name);
    String getProperty(String name);
    byte[] getData();
    T getValue();
    MessageId getMessageId();
    long getPublishTime();
    long getEventTime();
    long getSequenceId();
    String getProducerName();
    boolean hasKey();
    String getKey();
    boolean hasBase64EncodedKey();
    byte[] getKeyBytes();
    boolean hasOrderingKey();
    byte[] getOrderingKey();
    String getTopicName();
    Optional<EncryptionContext> getEncryptionCtx();
    int getRedeliveryCount();
    byte[] getSchemaVersion();
    boolean isReplicated();
    String getReplicatedFrom();
}

Producer

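import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public void send() throws PulsarClientException {
    final String serviceUrl = "pulsar://server-100:6650";
    // final String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";
    // http://pulsar.apache.org/docs/en/client-libraries-java/#client
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .connectionTimeout(10000, TimeUnit.MILLISECONDS)
            .build();
    final String topic = "persistent://public/default/topic-sensor-temp";
    // http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
    // Batching and chunking cannot be enabled together; enableChunking(true)
    // would require enableBatching(false), so chunking is left off here.
    final Producer<byte[]> producer = client.newProducer()
            .producerName("sensor-temp")
            .topic(topic)
            .compressionType(CompressionType.LZ4)
            .enableBatching(true)
            // A batch is sent as soon as any of these three limits is reached.
            .batchingMaxBytes(1024)
            .batchingMaxMessages(10)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            // Block send() instead of failing when the pending-message queue is full.
            .blockIfQueueFull(true)
            .maxPendingMessages(512)
            .sendTimeout(1, TimeUnit.SECONDS)
            .create();
    // Simple send: payload only.
    MessageId mid = producer.send("sensor-temp".getBytes(StandardCharsets.UTF_8));
    System.out.printf("message with ID %s successfully sent%n", mid);
    // Send with a key and custom properties.
    mid = producer.newMessage()
            .key("sensor-temp-key")
            .value("sensor-temp-key".getBytes(StandardCharsets.UTF_8))
            .property("my-key", "my-value")
            .property("my-other-key", "my-other-value")
            .send();
    System.out.printf("message-key with ID %s successfully sent%n", mid);
    producer.close();
    client.close();
}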
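
The batching settings above interact: the producer groups messages into a batch and sends it when the batch reaches batchingMaxMessages (10) or batchingMaxBytes (1024), or when batchingMaxPublishDelay (10 ms) has passed. A simplified, JDK-only model of that rule follows; the class is hypothetical, not the Pulsar client's implementation, and because the real client drives the delay with a periodic timer, the timing here is only approximate.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of producer batching limits; not the Pulsar client's implementation. */
public final class BatchingSketch {
    private final int maxMessages;
    private final int maxBytes;
    private final long maxDelayMillis;
    private final List<byte[]> pending = new ArrayList<>();
    private final List<Integer> flushedBatchSizes = new ArrayList<>();
    private int pendingBytes = 0;
    private long firstAddedAt = -1;

    public BatchingSketch(int maxMessages, int maxBytes, long maxDelayMillis) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Adds a payload; flushes first if it would push the batch past maxBytes. */
    public void add(byte[] payload, long nowMillis) {
        if (!pending.isEmpty() && pendingBytes + payload.length > maxBytes) {
            flush();
        }
        if (pending.isEmpty()) {
            firstAddedAt = nowMillis;
        }
        pending.add(payload);
        pendingBytes += payload.length;
        if (pending.size() >= maxMessages || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Timer hook: flushes if the oldest pending message has waited maxDelayMillis. */
    public void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - firstAddedAt >= maxDelayMillis) {
            flush();
        }
    }

    private void flush() {
        // A real producer would send the batch to the broker here.
        flushedBatchSizes.add(pending.size());
        pending.clear();
        pendingBytes = 0;
        firstAddedAt = -1;
    }

    public List<Integer> flushedBatchSizes() { return flushedBatchSizes; }

    public int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        // Same limits as the producer above: 10 messages, 1024 bytes, 10 ms.
        BatchingSketch batcher = new BatchingSketch(10, 1024, 10);
        for (int i = 0; i < 25; i++) {
            batcher.add(new byte[16], 0); // 25 small messages at t = 0 ms
        }
        batcher.tick(10); // the last 5 messages have now waited 10 ms
        System.out.println(batcher.flushedBatchSizes()); // prints [10, 10, 5]
    }
}
```

With small messages the count limit fires first; with large ones (for example two 600-byte payloads) the byte limit closes a batch after a single message.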

Consumer

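import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.AvroSchema;

// LOG (an SLF4J logger) and SeedEvent (the Avro-mapped event class) are defined elsewhere.
public void consume() throws PulsarClientException {
    final String serviceUrl = "pulsar://server-101:6650";
    // Short name; resolves to persistent://public/default/input-seed-avro-topic.
    final String topic = "input-seed-avro-topic";
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .enableTcpNoDelay(true)
            .build();
    final Consumer<byte[]> consumer = client
            .newConsumer()
            .consumerName("seed-avro-consumer")
            .subscriptionName("seed-avro-subscription")
            .subscriptionType(SubscriptionType.Exclusive)
            // Start from the oldest message when the subscription is first created.
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .topic(topic)
            .receiverQueueSize(10)
            .subscribe();
    // Payloads arrive as raw bytes and are decoded with the Avro schema by hand.
    final AvroSchema<SeedEvent> schema = AvroSchema.of(SeedEvent.class);
    while (true) {
        final Message<byte[]> msg;
        try {
            // Blocks until a message is available.
            msg = consumer.receive();
        } catch (final PulsarClientException e) {
            LOG.error("receive:" + e.getLocalizedMessage(), e);
            continue;
        }
        try {
            LOG.info("receive messages:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
                    Thread.currentThread().getId(),
                    msg.getTopicName(),
                    msg.getMessageId(),
                    msg.getSequenceId(),
                    msg.getEventTime(),
                    msg.getPublishTime(),
                    msg.getProducerName(),
                    msg.getKey(), schema.decode(msg.getValue()));
            // Processing succeeded: acknowledge so the message is not redelivered.
            consumer.acknowledge(msg);
        } catch (final Exception e) {
            // Decoding or acknowledgment failed: ask the broker to redeliver the message later.
            consumer.negativeAcknowledge(msg);
            LOG.error("process:" + e.getLocalizedMessage(), e);
        }
    }
}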

Subscriptions

A consumer reads messages from a topic through a subscription. A subscription is a logical entity built around a cursor that tracks the consumption offset. A topic can have multiple subscriptions, and a subscription holds no message data, only metadata and its cursor.

Each subscription has one cursor, which records its current offset in the log. Pulsar persists the cursor in a BookKeeper ledger, so cursor tracking scales out in the same way the topic itself does.
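
A minimal in-memory model of the idea, assuming a single topic log shared by several subscriptions, each holding its own cursor (the index of the next unacknowledged message). Messages before the slowest cursor have been acknowledged by every subscription, which is when they become eligible for deletion if no retention policy applies. Everything here is hypothetical and ignores individual, out-of-order acknowledgments, which real Pulsar cursors also track.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class CursorSketch {
    private final List<String> log = new ArrayList<>();           // topic log; index = offset
    private final Map<String, Integer> cursors = new HashMap<>(); // subscription -> next offset

    int publish(String payload) {
        log.add(payload);
        return log.size() - 1; // offset of the new message
    }

    void subscribe(String subscription) {
        // In this sketch a new subscription starts at offset 0 (like Earliest).
        cursors.putIfAbsent(subscription, 0);
    }

    /** Cumulatively acknowledges everything up to and including offset; never moves back. */
    void acknowledgeUpTo(String subscription, int offset) {
        cursors.merge(subscription, offset + 1, Integer::max);
    }

    /** Number of leading messages that every subscription has acknowledged. */
    int deletablePrefix() {
        return cursors.values().stream().mapToInt(Integer::intValue).min().orElse(0);
    }

    int backlog(String subscription) {
        return log.size() - cursors.get(subscription);
    }

    public static void main(String[] args) {
        CursorSketch topic = new CursorSketch();
        topic.subscribe("sub-a");
        topic.subscribe("sub-b");
        for (int i = 0; i < 5; i++) {
            topic.publish("m" + i);
        }
        topic.acknowledgeUpTo("sub-a", 3); // sub-a has processed m0..m3
        topic.acknowledgeUpTo("sub-b", 0); // sub-b has processed only m0
        System.out.println(topic.deletablePrefix()); // prints 1
        System.out.println(topic.backlog("sub-a"));  // prints 1
        System.out.println(topic.backlog("sub-b"));  // prints 4
    }
}
```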

Subscription type

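  • Exclusive

    Only one consumer can attach to the subscription. Any other consumer that tries to attach to it receives an error.

  • Failover

    Multiple consumers can attach to the subscription, but only one of them (the primary) receives messages at a time; the others stand by. If the primary consumer disconnects, a standby consumer takes over, so there are never two active consumers at once.

  • Shared

    Multiple consumers can attach to the subscription at the same time, and the topic's messages are distributed among them, so each message is delivered to only one consumer.

  • Key_Shared

    Multiple consumers can attach to the subscription at the same time, but all messages with the same key are delivered to the same consumer.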

Ordering guarantee

If ordering matters, use the Exclusive or Failover subscription type. Only one consumer of the subscription receives the topic's messages at any time, so messages are processed in order.

With the Shared subscription type, multiple consumers consume the same topic concurrently. Adding consumers dynamically speeds up consumption and reduces the message backlog on the server, but message ordering is not guaranteed.

Key_Shared combines the two: like Shared, it allows multiple concurrent consumers, but all messages with the same key go to the same consumer, so ordering is preserved per key.
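
A simplified sketch of how Key_Shared can preserve per-key ordering while spreading load: each key is hashed into a fixed hash space, the space is split into contiguous ranges, and each consumer owns one range. The 65,536-slot hash space is an assumption modeled on Pulsar's auto-split hash ranges; this sketch uses String.hashCode() and equal-size ranges, so it will not reproduce the broker's actual assignment. Class and consumer names are hypothetical.

```java
import java.util.List;

public final class KeySharedSketch {
    static final int HASH_RANGE = 65536; // size of the hash space (assumption)

    /** Returns the index of the consumer whose hash range contains this key. */
    static int consumerForKey(String key, int numConsumers) {
        int slot = Math.floorMod(key.hashCode(), HASH_RANGE);
        int rangeSize = (HASH_RANGE + numConsumers - 1) / numConsumers; // ceiling division
        return slot / rangeSize;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("consumer-1", "consumer-2", "consumer-3");
        for (String key : List.of("sensor-a", "sensor-b", "sensor-a")) {
            int idx = consumerForKey(key, consumers.size());
            System.out.println(key + " -> " + consumers.get(idx));
        }
        // "sensor-a" appears twice and is routed to the same consumer both times.
    }
}
```

Range ownership (rather than a plain modulo) means that when a consumer joins or leaves, only the keys in the affected range move to a different consumer.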

Multi topic subscriptions

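A consumer can subscribe to several topics at once, either by listing them explicitly or by giving a regular expression that topic names must match, for example: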

  • persistent://public/default/.*
  • persistent://public/default/foo.*
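
Such patterns are Java regular expressions matched against full topic names, and the Java client's ConsumerBuilder accepts them through topicsPattern(...). Locally, java.util.regex is enough to check which names a pattern would pick up; the topic names below are made up for illustration, and the class name is hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public final class TopicPatternSketch {
    /** Returns the topic names that fully match the regular expression. */
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "persistent://public/default/foo-1",
                "persistent://public/default/foobar",
                "persistent://public/default/bar");
        System.out.println(matching("persistent://public/default/foo.*", topics));
        // prints [persistent://public/default/foo-1, persistent://public/default/foobar]
    }
}
```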

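Reader

A reader lets the application choose where to start reading (for example the earliest or the latest message) and then reads the topic sequentially. It manages its own position instead of acknowledging messages through a durable subscription.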

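import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;

public void read() throws IOException {
    final String serviceUrl = "pulsar://server-101:6650";
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .build();
    // http://pulsar.apache.org/docs/en/client-libraries-java/#reader
    final Reader<byte[]> reader = client.newReader()
            .topic("my-topic")
            // Start from the oldest available message; use MessageId.latest to read only new ones.
            .startMessageId(MessageId.earliest)
            .create();
    while (true) {
        // Blocks until the next message is available.
        final Message<byte[]> message = reader.readNext();
        System.out.println(new String(message.getData(), StandardCharsets.UTF_8));
    }
}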

Partitioned topics

Message retention and expiration

If no retention policy is set for a topic, a message is deleted automatically once the cursors of all the topic's subscriptions have moved past it, that is, once every subscription has acknowledged it.

If a retention policy is set, acknowledged messages are kept until the topic exceeds one of the policy's thresholds (the total message storage size of the topic, or the retention time of its messages); the oldest messages beyond the threshold are then deleted.
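
A simplified model of that rule, assuming a list of acknowledged messages ordered oldest first: the newest messages are kept as long as both the time limit and the cumulative size limit hold, and everything older is deleted. A limit of -1 means unlimited, as in Pulsar's retention settings. Real brokers trim storage a whole ledger at a time, so actual deletion is coarser than this; all names here are hypothetical.

```java
import java.util.List;

public final class RetentionSketch {
    /** An acknowledged message: publish time and payload size (hypothetical type). */
    static final class AckedMessage {
        final long publishTimeMillis;
        final long sizeBytes;

        AckedMessage(long publishTimeMillis, long sizeBytes) {
            this.publishTimeMillis = publishTimeMillis;
            this.sizeBytes = sizeBytes;
        }
    }

    /** Counts how many of the oldest acknowledged messages would be deleted. */
    static int deletedCount(List<AckedMessage> oldestFirst, long nowMillis,
                            long retentionMinutes, long retentionSizeMB) {
        long cumulativeBytes = 0;
        int retained = 0;
        // Walk from newest to oldest, keeping messages while both limits still hold.
        for (int i = oldestFirst.size() - 1; i >= 0; i--) {
            AckedMessage m = oldestFirst.get(i);
            cumulativeBytes += m.sizeBytes;
            boolean withinTime = retentionMinutes < 0
                    || nowMillis - m.publishTimeMillis <= retentionMinutes * 60_000L;
            boolean withinSize = retentionSizeMB < 0
                    || cumulativeBytes <= retentionSizeMB * 1024L * 1024L;
            if (!withinTime || !withinSize) {
                break; // this message and everything older exceed the policy
            }
            retained++;
        }
        return oldestFirst.size() - retained;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long mb = 1024L * 1024L;
        List<AckedMessage> acked = List.of(
                new AckedMessage(0, mb),            // published at t = 0 min
                new AckedMessage(6 * minute, mb),   // t = 6 min
                new AckedMessage(9 * minute, mb));  // t = 9 min
        long now = 10 * minute;
        System.out.println(deletedCount(acked, now, 5, 1024)); // prints 1 (older than 5 min)
        System.out.println(deletedCount(acked, now, 5, 1));    // prints 2 (1 MB size limit)
        System.out.println(deletedCount(acked, now, 0, 0));    // prints 3 (no retention)
    }
}
```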

conf/broker.conf

# Default message retention time
# The default is 0; changed here to 3 days (60 * 24 * 3 = 4320 minutes)
defaultRetentionTimeInMinutes=4320
# Default retention size
# The default is 0; changed here to 10 GB (10 * 1024 = 10240 MB)
defaultRetentionSizeInMB=10240
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0

retention policy (for a namespace)

$ $PULSAR_HOME/bin/pulsar-admin namespaces \
get-retention public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/retention | python -m json.tool
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
set-retention public/default \
--size 1024M \
--time 5m
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/retention \
--header "Content-Type:application/json" \
--data '{
  "retentionTimeInMinutes" : 5,
  "retentionSizeInMB" : 1024
}'
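
The CLI call and the REST call above set the same policy: --size 1024M becomes retentionSizeInMB 1024, and --time 5m becomes retentionTimeInMinutes 5. The hypothetical helper below performs that conversion, assuming size suffixes M/G/T and time suffixes m/h/d/w with -1 meaning unlimited and 0 meaning no retention; check pulsar-admin's help output for the exact forms your version accepts.

```java
public final class RetentionArgs {
    /** Converts a size such as "1024M", "16G" or "3T" to megabytes. */
    static long sizeToMB(String size) {
        if (size.equals("-1") || size.equals("0")) {
            return Long.parseLong(size); // unlimited / no retention
        }
        long n = Long.parseLong(size.substring(0, size.length() - 1));
        switch (Character.toUpperCase(size.charAt(size.length() - 1))) {
            case 'M': return n;
            case 'G': return n * 1024;
            case 'T': return n * 1024 * 1024;
            default: throw new IllegalArgumentException("Unsupported size unit: " + size);
        }
    }

    /** Converts a time such as "5m", "13h", "2d" or "5w" to minutes. */
    static long timeToMinutes(String time) {
        if (time.equals("-1") || time.equals("0")) {
            return Long.parseLong(time); // unlimited / no retention
        }
        long n = Long.parseLong(time.substring(0, time.length() - 1));
        switch (time.charAt(time.length() - 1)) {
            case 'm': return n;
            case 'h': return n * 60;
            case 'd': return n * 60 * 24;
            case 'w': return n * 60 * 24 * 7;
            default: throw new IllegalArgumentException("Unsupported time unit: " + time);
        }
    }

    public static void main(String[] args) {
        // Same values as the set-retention example above.
        System.out.printf("{\"retentionTimeInMinutes\" : %d, \"retentionSizeInMB\" : %d}%n",
                timeToMinutes("5m"), sizeToMB("1024M"));
        // prints {"retentionTimeInMinutes" : 5, "retentionSizeInMB" : 1024}
        System.out.println(timeToMinutes("3d")); // prints 4320, as in broker.conf above
    }
}
```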

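message expiry / message-ttl

Message TTL limits how long unacknowledged messages are kept: once a message is older than the TTL (in seconds), Pulsar automatically acknowledges it on behalf of the subscriptions, so it no longer counts toward their backlog. The commands below read and set the TTL for public/default; 1800 seconds is 30 minutes.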

$ $PULSAR_HOME/bin/pulsar-admin namespaces \
get-message-ttl public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/messageTTL
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
set-message-ttl public/default \
--messageTTL 1800
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/messageTTL \
--header "Content-Type:application/json" \
--data '1800'
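
As code, the TTL check is a comparison between a message's age and the TTL. A minimal sketch, assuming age is measured from the message's publish time; the broker actually runs expiry as a periodic background task, so messages can outlive the TTL slightly. The class name is hypothetical.

```java
import java.util.concurrent.TimeUnit;

public final class TtlSketch {
    /** True if an unacknowledged message published at publishTimeMillis has outlived the TTL. */
    static boolean isExpired(long publishTimeMillis, long nowMillis, long ttlSeconds) {
        if (ttlSeconds <= 0) {
            return false; // treat 0 as "TTL disabled", like ttlDurationDefaultInSeconds=0
        }
        return nowMillis - publishTimeMillis > TimeUnit.SECONDS.toMillis(ttlSeconds);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long ttl = 1800; // value used with set-message-ttl above (30 minutes)
        System.out.println(isExpired(now - TimeUnit.MINUTES.toMillis(20), now, ttl)); // prints false
        System.out.println(isExpired(now - TimeUnit.MINUTES.toMillis(31), now, ttl)); // prints true
    }
}
```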


Series reading

Apache Pulsar (1): Pulsar vs Kafka

Keywords: Java Apache message queue Cloud Native pulsar

Added by DjSiXpAcK14 on Thu, 20 Jan 2022 11:00:10 +0200