Message mechanism
Pulsar uses the publish-subscribe (pub-sub) design pattern. A producer publishes messages to a topic; a consumer subscribes to the topic, processes the published messages, and sends an acknowledgment after processing each one.
Once a subscription is created, Pulsar retains all messages for it, even while the consumer is disconnected. A message is deleted only after the consumer acknowledges that it has been processed successfully.
Topic
Logically, a Topic is a log structure in which each message has an offset. Apache Pulsar uses cursors to track offsets.
Pulsar supports two basic topic types: persistent and non-persistent. A full topic name has the following form:
{persistent|non-persistent}://tenant/namespace/topic
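To make the four parts of that name concrete, here is a minimal sketch that splits a full topic name with a regular expression. The class `TopicNameParts` and its fields are hypothetical names chosen for illustration; they are not part of the Pulsar client, which has its own topic-name handling.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Pulsar class) that splits a full topic name. */
final class TopicNameParts {
    // domain://tenant/namespace/topic, where domain is persistent or non-persistent
    private static final Pattern FORMAT =
            Pattern.compile("^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$");

    final String domain;
    final String tenant;
    final String namespace;
    final String localName;

    private TopicNameParts(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    /** Parses a full topic name, or throws if it does not follow the format above. */
    static TopicNameParts parse(String fullName) {
        Matcher m = FORMAT.matcher(fullName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a full topic name: " + fullName);
        }
        return new TopicNameParts(m.group(1), m.group(2), m.group(3), m.group(4));
    }

    public static void main(String[] args) {
        TopicNameParts t = parse("persistent://public/default/input-seed-avro-topic");
        System.out.println(t.domain + " | " + t.tenant + " | " + t.namespace + " | " + t.localName);
    }
}
```

In the admin commands used throughout this article, `public` is the tenant and `default` is the namespace.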
Non-partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics \
    list public/default
$ $PULSAR_HOME/bin/pulsar-admin topics \
    create persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
    lookup persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
    delete persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
    stats persistent://public/default/input-seed-avro-topic
$ curl http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats | python -m json.tool
Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics \
    create-partitioned-topic persistent://public/default/output-seed-avro-topic \
    --partitions 2
$ $PULSAR_HOME/bin/pulsar-admin topics \
    list-partitioned-topics public/default
$ $PULSAR_HOME/bin/pulsar-admin topics \
    get-partitioned-topic-metadata persistent://public/default/output-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
    delete-partitioned-topic persistent://public/default/output-seed-avro-topic
Message
A message is the basic unit of data in Pulsar. The Java client exposes it through the Message interface:
public interface Message<T> {
    Map<String, String> getProperties();
    boolean hasProperty(String var1);
    String getProperty(String var1);

    byte[] getData();
    T getValue();

    MessageId getMessageId();
    long getPublishTime();
    long getEventTime();
    long getSequenceId();
    String getProducerName();

    boolean hasKey();
    String getKey();
    boolean hasBase64EncodedKey();
    byte[] getKeyBytes();
    boolean hasOrderingKey();
    byte[] getOrderingKey();

    String getTopicName();
    Optional<EncryptionContext> getEncryptionCtx();
    int getRedeliveryCount();
    byte[] getSchemaVersion();
    boolean isReplicated();
    String getReplicatedFrom();
}
Producer
// Needs org.apache.pulsar.client.api.* and java.util.concurrent.TimeUnit
public void send() throws PulsarClientException {
    final String serviceUrl = "pulsar://server-100:6650";
    // final String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";

    // http://pulsar.apache.org/docs/en/client-libraries-java/#client
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .connectionTimeout(10000, TimeUnit.MILLISECONDS)
            .build();

    final String topic = "persistent://public/default/topic-sensor-temp";

    // http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
    final Producer<byte[]> producer = client.newProducer()
            .producerName("sensor-temp")
            .topic(topic)
            .compressionType(CompressionType.LZ4)
            // Batching and chunking are documented as mutually exclusive,
            // so enableChunking(true) is left out while batching is on.
            .enableBatching(true)
            .batchingMaxBytes(1024)
            .batchingMaxMessages(10)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .blockIfQueueFull(true)
            .maxPendingMessages(512)
            .sendTimeout(1, TimeUnit.SECONDS)
            .create();

    // Send a plain payload and wait for the broker to return its message ID.
    MessageId mid = producer.send("sensor-temp".getBytes());
    System.out.printf("message with ID %s successfully sent%n", mid);

    // Send a keyed message with custom properties.
    mid = producer.newMessage()
            .key("sensor-temp-key")
            .value("sensor-temp-key".getBytes())
            .property("my-key", "my-value")
            .property("my-other-key", "my-other-value")
            .send();
    System.out.printf("message-key with ID %s successfully sent%n", mid);

    producer.close();
    client.close();
}
Consumer
public void consume() throws PulsarClientException {
    final String serviceUrl = "pulsar://server-101:6650";
    final String topic = "input-seed-avro-topic";

    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .enableTcpNoDelay(true)
            .build();

    final Consumer<byte[]> consumer = client
            .newConsumer()
            .consumerName("seed-avro-consumer")
            .subscriptionName("seed-avro-subscription")
            .subscriptionType(SubscriptionType.Exclusive)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .topic(topic)
            .receiverQueueSize(10)
            .subscribe();

    final AvroSchema<SeedEvent> schema = AvroSchema.of(SeedEvent.class);

    while (true) {
        try {
            final Message<byte[]> msg = consumer.receive();
            LOG.info("receive messages:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
                    Thread.currentThread().getId(),
                    msg.getTopicName(),
                    msg.getMessageId(),
                    msg.getSequenceId(),
                    msg.getEventTime(),
                    msg.getPublishTime(),
                    msg.getProducerName(),
                    msg.getKey(),
                    schema.decode(msg.getValue()));
            try {
                consumer.acknowledge(msg);
            } catch (final PulsarClientException e) {
                // Ask the broker to redeliver the message if the ack failed.
                consumer.negativeAcknowledge(msg);
                LOG.error("acknowledge:" + e.getLocalizedMessage(), e);
            }
        } catch (final PulsarClientException e) {
            LOG.error("receive:" + e.getLocalizedMessage(), e);
        }
    }
}
Subscriptions
Consumers read the messages in a topic through a subscription. A subscription is a logical entity that owns a cursor, which tracks an offset. A topic can have multiple subscriptions. A subscription holds no message data, only metadata and its cursor.
Each subscription stores one cursor, which is its current offset in the log. The subscription persists its cursor in a BookKeeper ledger, so cursor tracking scales in the same way as topics do.
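The relationship between one shared log and one cursor per subscription can be sketched with a toy in-memory model. This is an illustration only: `TopicLogModel` and its methods are hypothetical names, offsets are plain list indexes, and nothing here reflects how the broker or BookKeeper actually stores data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Pulsar code): one append-only log, one cursor per subscription. */
final class TopicLogModel {
    private final List<String> log = new ArrayList<>();            // list index == offset
    private final Map<String, Integer> cursors = new HashMap<>();  // subscription -> next offset

    /** Appends a message and returns its offset. */
    int publish(String message) {
        log.add(message);
        return log.size() - 1;
    }

    /** Creates a subscription whose cursor starts at the beginning of the log. */
    void subscribe(String subscription) {
        cursors.putIfAbsent(subscription, 0);
    }

    /** Returns the next message and advances the cursor, or null when caught up. */
    String receiveAndAck(String subscription) {
        int offset = cursorOf(subscription);
        if (offset >= log.size()) {
            return null;
        }
        cursors.put(subscription, offset + 1);
        return log.get(offset);
    }

    /** Returns the subscription's current cursor position. */
    int cursorOf(String subscription) {
        Integer offset = cursors.get(subscription);
        if (offset == null) {
            throw new IllegalArgumentException("Unknown subscription: " + subscription);
        }
        return offset;
    }
}
```

Two subscriptions on the same model advance independently: reading through one leaves the other's cursor where it was, and the log itself holds the only copy of each message.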
Subscription type
Exclusive
A subscription can only have one message consumer.
Failover
Only one consumer in the subscription is active at a time, and any number of standby consumers can be attached. If the active consumer fails, a standby consumer takes over. Two consumers are never active at the same time.
Shared
There can be multiple consumers in a subscription at the same time, and multiple consumers share the messages in the Topic.
Key_Shared
Ordering guarantee
If ordering is required, use the Exclusive or Failover subscription type. Only one consumer then reads from the topic at a time, which preserves message order.
With the Shared subscription type, multiple consumers read from the same topic concurrently. Adding consumers dynamically speeds up consumption and reduces the backlog of messages on the server, but ordering across consumers is not guaranteed.
The Key_Shared type also allows multiple consumers, but it delivers all messages with the same key to the same consumer. This gives concurrency across keys while preserving order within each key.
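The difference between Shared and Key_Shared dispatch can be illustrated with a small sketch. It assumes a simplified picture: Shared is modeled as plain round-robin, and Key_Shared as a hash of the key modulo the number of consumers. The broker's real Key_Shared dispatch works on hash ranges and handles consumers joining and leaving, so `DispatchModel` is a hypothetical stand-in, not the broker's algorithm.

```java
/** Toy dispatch model (not broker code) contrasting Shared and Key_Shared. */
final class DispatchModel {
    private int next = 0;

    /** Shared: hand messages to consumers in turn, regardless of key. */
    int pickShared(int consumerCount) {
        int chosen = next % consumerCount;
        next++;
        return chosen;
    }

    /** Key_Shared: the same key always maps to the same consumer index. */
    static int pickKeyShared(String key, int consumerCount) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), consumerCount);
    }

    public static void main(String[] args) {
        DispatchModel shared = new DispatchModel();
        for (int i = 0; i < 4; i++) {
            System.out.println("shared message " + i + " -> consumer " + shared.pickShared(3));
        }
        System.out.println("key sensor-1 -> consumer " + pickKeyShared("sensor-1", 3));
        System.out.println("key sensor-1 -> consumer " + pickKeyShared("sensor-1", 3));
    }
}
```

In the Shared case two consecutive messages with the same key can land on different consumers, which is why per-key order is lost; in the Key_Shared case they cannot, as long as the set of consumers stays the same.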
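Multi-topic subscriptions
A single subscription can cover several topics at once by matching topic names against a regular-expression pattern, for example: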
- persistent://public/default/.*
- persistent://public/default/foo.*
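To see which topics each of these patterns would select, the sketch below applies them to a list of topic names with `java.util.regex`. The topic names and the `TopicPatternDemo` class are made up for illustration. In the Java client such a pattern is passed to the consumer builder through `topicsPattern`; how the client and broker discover and filter topics is not modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustration only: plain regex matching over hypothetical topic names. */
final class TopicPatternDemo {
    /** Returns the topic names that the pattern matches in full. */
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        List<String> selected = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                selected.add(topic);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList(
                "persistent://public/default/foo-1",
                "persistent://public/default/bar",
                "persistent://public/other/foo-2");
        System.out.println(matching("persistent://public/default/.*", topics));
        System.out.println(matching("persistent://public/default/foo.*", topics));
    }
}
```

The first pattern selects every topic in the `public/default` namespace, while the second narrows that to topics whose name starts with `foo`.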
Reader
public void read() throws IOException {
    final String serviceUrl = "pulsar://server-101:6650";

    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .build();

    // http://pulsar.apache.org/docs/en/client-libraries-java/#reader
    final Reader<byte[]> reader = client.newReader()
            .topic("my-topic")
            .startMessageId(MessageId.earliest) // or MessageId.latest
            .create();

    // A reader has no subscription; it simply reads from the chosen position onward.
    while (true) {
        final Message<byte[]> message = reader.readNext();
        System.out.println(new String(message.getData()));
    }
}
Partitioned topics
Message retention and expiration
If no retention policy is set for a topic, a message is deleted automatically once every subscription's cursor has moved past its offset, that is, once all subscriptions have acknowledged it.
If a retention policy is set, messages that have been consumed and acknowledged are kept until they exceed the policy's thresholds (the retained storage size for the topic and the retention time of its messages), and are deleted after that.
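For the case without a retention policy, the deletion boundary is simply the slowest subscription's cursor. The sketch below computes it under simplifying assumptions: offsets are plain numbers, each cursor holds the next offset its subscription has yet to acknowledge, and a topic without subscriptions pins nothing. `BacklogTrimModel` is a hypothetical name, and the real broker trims storage in larger units rather than message by message.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model (not broker code) of the no-retention deletion boundary. */
final class BacklogTrimModel {
    /**
     * Returns the first offset that must be kept. Every offset below it has been
     * acknowledged by all subscriptions. With no subscriptions nothing is pinned,
     * so the whole log up to logEnd is deletable (an assumption of this model).
     */
    static long firstRetainedOffset(Map<String, Long> cursors, long logEnd) {
        if (cursors.isEmpty()) {
            return logEnd;
        }
        return Math.min(logEnd, Collections.min(cursors.values()));
    }

    public static void main(String[] args) {
        Map<String, Long> cursors = new HashMap<>();
        cursors.put("fast-subscription", 5L);
        cursors.put("slow-subscription", 2L);
        System.out.println("first retained offset: " + firstRetainedOffset(cursors, 10L));
    }
}
```

One slow or disconnected subscription therefore holds back deletion for the whole topic, which is one reason to consider the message TTL setting.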
conf/broker.conf
# Default message retention time
# Default 0, modified to 3 days = 60 * 24 * 3
defaultRetentionTimeInMinutes=4320

# Default retention size
# It defaults to 0 and is modified to 10G
defaultRetentionSizeInMB=10240

# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0
Retention policy (for a namespace)
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
    get-retention public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/retention | python -m json.tool
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
    set-retention public/default \
    --size 1024M \
    --time 5m
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/retention \
    --header "Content-Type:application/json" \
    --data '{ "retentionTimeInMinutes" : 5, "retentionSizeInMB" : 1024 }'
Message expiry (message TTL)
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
    get-message-ttl public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/messageTTL
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
    set-message-ttl public/default \
    --messageTTL 1800
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/messageTTL \
    --header "Content-Type:application/json" \
    --data '1800'
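The TTL applies to messages that have not been acknowledged: once such a message is older than the TTL, it is treated as expired and no longer held in the backlog. A minimal sketch of that age check follows. `TtlModel` is a hypothetical name, and the strict "older than" comparison and the idea of a single check are assumptions of the sketch; the broker runs its own periodic expiry task.

```java
/** Toy model (not broker code) of the message TTL age check. */
final class TtlModel {
    /** True when an unacknowledged message has outlived the TTL. */
    static boolean isExpired(long publishTimeMillis, long nowMillis, int ttlSeconds) {
        if (ttlSeconds <= 0) {
            return false; // 0 disables TTL, as with ttlDurationDefaultInSeconds=0
        }
        return nowMillis - publishTimeMillis > ttlSeconds * 1000L;
    }

    public static void main(String[] args) {
        long published = 0L;
        long fortyMinutesLater = 40L * 60L * 1000L;
        // With the 1800-second TTL set above, a 40-minute-old message has expired.
        System.out.println(isExpired(published, fortyMinutesLater, 1800));
    }
}
```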