Hello, I'm brother Jun.
Pulsar, an excellent messaging and streaming platform, is being used more and more widely. This article explains how to use Pulsar's Java client.
Deploy Pulsar
Pulsar can be deployed in three ways: local binary installation, Docker, and Kubernetes.
This article uses Docker to deploy a single-node Pulsar cluster. The test environment has a 2-core CPU and 4 GB of memory.
The deployment command is as follows:
docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone
The following error may occur during installation:
unknown flag: --mount
See 'docker run --help'.
This happens because the Docker version is too old to support the --mount flag for docker run. Upgrade Docker to 17.06 or later.
The deployment may also fail because of network problems; retrying a few times usually succeeds. The following log indicates that Pulsar started successfully:
2022-01-08T22:27:58,726+0000 [main] INFO org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone
After the local single-node cluster starts, a namespace named public/default is created automatically.
Pulsar client
At present, Pulsar supports clients in multiple languages, including:
Java client
Go client
Python client
C++ client
Node.js client
WebSocket client
C# client
Spring Boot configuration
To integrate the Pulsar client with Spring Boot, first add the Pulsar client dependency:
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.9.1</version>
</dependency>
Then add the following configuration to the properties file:
# Pulsar address
pulsar.url=pulsar://192.168.59.155:6650
# topic
pulsar.topic=testTopic
# consumer group
pulsar.subscription=topicGroup
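Outside of Spring, the same three keys can be read with plain java.util.Properties. The sketch below is only an illustration: the PulsarSettings class is hypothetical and is not part of Spring or Pulsar. It also shows why the layout of the file matters: Properties treats any line whose first non-blank character is # as a comment, so each key=value pair has to start on its own line.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the three keys defined in the properties file above.
final class PulsarSettings {
    final String url;
    final String topic;
    final String subscription;

    private PulsarSettings(String url, String topic, String subscription) {
        this.url = url;
        this.topic = topic;
        this.subscription = subscription;
    }

    // java.util.Properties treats a line whose first non-blank character is '#'
    // as a comment, so every key=value pair must start on its own line.
    static PulsarSettings parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader does no real I/O, but load() declares IOException
            throw new UncheckedIOException(e);
        }
        return new PulsarSettings(
                props.getProperty("pulsar.url"),
                props.getProperty("pulsar.topic"),
                props.getProperty("pulsar.subscription"));
    }
}
```

In a Spring Boot application you would normally inject these values (for example with @Value) rather than parse the file yourself.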
Create Client
Creating a client is very simple. The code is as follows:
client = PulsarClient.builder()
        .serviceUrl(url)
        .build();
The url above is the value of pulsar.url defined in the properties file.
Creating the Client does not report an error even if the cluster has not started successfully, because no connection to the cluster is made at this point.
Create Producer
producer = client.newProducer()
        .topic(topic)
        .compressionType(CompressionType.LZ4)
        .sendTimeout(0, TimeUnit.SECONDS)
        .enableBatching(true)
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
        .batchingMaxMessages(1000)
        .maxPendingMessages(1000)
        .blockIfQueueFull(true)
        .roundRobinRouterBatchingPartitionSwitchFrequency(10)
        .batcherBuilder(BatcherBuilder.DEFAULT)
        .create();
Creating a Producer actually connects to the cluster, so if there is a problem with the cluster, a connection error is reported at this point.
The following explains the parameters for creating Producer:
topic: the topic the Producer writes to.
compressionType: the compression codec. Supported values include NONE, LZ4, ZLIB and ZSTD (recent versions also offer SNAPPY). ZSTD is available starting with Pulsar 2.3, and consumers must also run version 2.3 or later to read ZSTD-compressed messages.
sendTimeout: the send timeout. If the broker does not acknowledge a message within this time, the send fails with a timeout error. A value of 0, as used above, disables the timeout.
enableBatching: whether to batch messages. It is set to true here, which is also the default. Batching is only effective with asynchronous sends (sendAsync): a synchronous send waits for its own acknowledgement before the next message is sent, so batches never accumulate more than one message.
batchingMaxPublishDelay: the maximum time messages are held for batching, 10 ms here. A batch is sent as soon as this delay expires or another batch limit such as batchingMaxMessages is reached, whichever comes first. Batching packs multiple messages into a single network request, which reduces network I/O and greatly improves sending throughput.
batchingMaxMessages: the maximum number of messages in a single batch.
maxPendingMessages: the maximum size of the queue of messages waiting for an acknowledgement from the broker. When the queue is full, all send and sendAsync calls on the Producer fail unless blockIfQueueFull is set to true.
blockIfQueueFull: the Producer first places each outgoing message in this local pending queue. If this option is true and the queue is full, send calls block until space frees up instead of failing.
roundRobinRouterBatchingPartitionSwitchFrequency: if no key is specified when sending a message to a partitioned topic, messages are routed to partitions in round-robin fashion by default. With batching enabled, the Producer switches to the next partition every (frequency × batchingMaxPublishDelay), which here is 10 × 10 ms = 100 ms.
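To make the interaction between the batching and routing parameters concrete, here is a simplified model in plain Java. It is an assumption-based sketch, not Pulsar's source code: the class name is hypothetical, and the byte-size batch limit the real client also applies is left out.

```java
// Simplified model (an assumption, not Pulsar's implementation) of when a batch
// is flushed and which partition a non-keyed message is routed to.
final class ProducerBatchingModel {
    private final int batchingMaxMessages;
    private final long batchingMaxPublishDelayMs;
    private final int partitionSwitchFrequency;

    ProducerBatchingModel(int batchingMaxMessages, long batchingMaxPublishDelayMs,
                          int partitionSwitchFrequency) {
        this.batchingMaxMessages = batchingMaxMessages;
        this.batchingMaxPublishDelayMs = batchingMaxPublishDelayMs;
        this.partitionSwitchFrequency = partitionSwitchFrequency;
    }

    // A batch goes out when either limit is reached, whichever happens first.
    boolean shouldFlush(int messagesInBatch, long msSinceFirstMessage) {
        return messagesInBatch >= batchingMaxMessages
                || msSinceFirstMessage >= batchingMaxPublishDelayMs;
    }

    // Non-keyed messages stay on one partition for a window of
    // (frequency * batchingMaxPublishDelay) ms, then move to the next partition.
    int partitionFor(long nowMs, int startPartition, int numPartitions) {
        long switchWindowMs = Math.max(1L, partitionSwitchFrequency * batchingMaxPublishDelayMs);
        return (int) Math.floorMod(startPartition + nowMs / switchWindowMs, (long) numPartitions);
    }
}
```

With the values from the Producer above (1000 messages, 10 ms, frequency 10) the switch window is 100 ms, so on a 4-partition topic all non-keyed messages sent between t=0 and t=99 ms land on the same partition, which lets them share batches.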
Create Consumer
Pulsar's consumption model is shown below:
As the figure shows, a Consumer must attach to a subscription in order to consume messages.
consumer = client.newConsumer()
        .topic(topic)
        .subscriptionName(subscription)
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
        .receiverQueueSize(1000)
        .subscribe();
The following explains the parameters for creating a Consumer:
topic: the topic that the Consumer subscribes to.
subscriptionName: the name of the subscription to associate with the consumer.
subscriptionType: the subscription type. Pulsar supports four types of subscriptions:
Exclusive: exclusive mode. Only one consumer can attach to the subscription; if another consumer tries to attach, it receives an error.
Failover: failover mode. Multiple consumers can attach to the same subscription, but only one of them consumes at a time; the others stand by as backups. If the active consumer fails, one of the backup consumers takes over. As shown below:
Shared: shared mode. Multiple consumers can attach to the same subscription. Messages are distributed among them in round-robin fashion, and each message is delivered to only one consumer. When a consumer disconnects, the messages sent to it but not yet acknowledged are redelivered to the remaining consumers. As shown below:
Key_Shared: multiple consumers attach to the same subscription, and messages with the same key are always delivered to the same consumer. When a consumer connects or disconnects, the assignment of some keys to consumers changes. Compared with Shared mode, the advantage of Key_Shared is that consumers can still process messages concurrently while the order of messages with the same key is preserved. As shown below:
subscriptionInitialPosition: where a newly created subscription starts consuming. There are two options:
Latest: start from the latest message
Earliest: start from the earliest message
negativeAckRedeliveryDelay: how long to wait before a message that the consumer negatively acknowledged (because consumption failed) is redelivered; 60 seconds here.
receiverQueueSize: the size of the consumer's receive queue, that is, the maximum number of messages that can be prefetched before receive is called. It can be set to 0 so that messages are pulled from the broker one at a time. In Shared mode, setting receiverQueueSize to 0 prevents a large number of messages from being pushed to one Consumer while the other consumers sit idle.
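The difference between Shared and Key_Shared delivery can be illustrated with a toy dispatcher. This is a deliberately simplified sketch: the DispatchModel class is hypothetical, and its Key_Shared side hashes keys with String.hashCode modulo the consumer count, which is not the hash-range assignment the Pulsar broker actually uses. It only demonstrates the property described above: in Key_Shared, every message with the same key goes to the same consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model contrasting Shared and Key_Shared delivery (not Pulsar's algorithm).
final class DispatchModel {
    // Shared: round robin over consumers, ignoring message keys.
    static List<Integer> shared(List<String> keys, int consumers) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            assigned.add(i % consumers);
        }
        return assigned;
    }

    // Key_Shared (simplified): the consumer is derived from the key, so all
    // messages with one key reach one consumer and keep their relative order.
    static List<Integer> keyShared(List<String> keys, int consumers) {
        List<Integer> assigned = new ArrayList<>();
        for (String key : keys) {
            assigned.add(Math.floorMod(key.hashCode(), consumers));
        }
        return assigned;
    }
}
```

For the key sequence a, b, a, c, a with two consumers, Shared alternates 0, 1, 0, 1, 0, so the three "a" messages happen to split by position, while Key_Shared sends all three "a" messages to the same consumer.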
Consumers can receive messages in four ways: synchronous single message, asynchronous single message, synchronous batch, and asynchronous batch. The code is as follows:
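// synchronous, single message
Message<byte[]> msg = consumer.receive();
// asynchronous, single message
CompletableFuture<Message<byte[]>> msgFuture = consumer.receiveAsync();
// synchronous, batch
Messages<byte[]> msgs = consumer.batchReceive();
// asynchronous, batch
CompletableFuture<Messages<byte[]>> msgsFuture = consumer.batchReceiveAsync();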
For batch reception, you can also set the batch reception policy. The code is as follows:
consumer = client.newConsumer()
        .topic(topic)
        .subscriptionName(subscription)
        .batchReceivePolicy(BatchReceivePolicy.builder()
                .maxNumMessages(100)
                .maxNumBytes(1024 * 1024)
                .timeout(200, TimeUnit.MILLISECONDS)
                .build())
        .subscribe();
The parameters in the code are described as follows:
maxNumMessages: the maximum number of messages received in batch.
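maxNumBytes: the maximum total size of a batch, 1 MB here.
timeout: the maximum time to wait for a batch to fill, 200 ms here. A batch receive completes as soon as any one of these three limits is reached.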
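A batch receive completes when the message count, the byte size, or the timeout limit is hit, whichever comes first. The sketch below models that rule with a standard BlockingQueue standing in for the consumer's receive queue. It is an illustration under stated assumptions, not the Pulsar client's implementation: the BatchCollector class is hypothetical, and in this model the byte limit is checked after each message is added, so a batch may slightly exceed maxNumBytes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a batch receive policy: stop at maxNumMessages,
// maxNumBytes, or the timeout, whichever is reached first.
final class BatchCollector {
    static List<byte[]> collect(BlockingQueue<byte[]> queue, int maxNumMessages,
                                long maxNumBytes, long timeoutMs) {
        List<byte[]> batch = new ArrayList<>();
        long bytes = 0;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (batch.size() < maxNumMessages && bytes < maxNumBytes) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    break; // timeout reached: return whatever has arrived
                }
                byte[] msg = queue.poll(remainingNanos, TimeUnit.NANOSECONDS);
                if (msg == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(msg);
                bytes += msg.length;
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and return the partial batch
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

With maxNumMessages(100), maxNumBytes(1024 * 1024), and a 200 ms timeout as in the policy above, a quiet topic would typically end each batch on the timeout, while a busy one would hit the message or byte limit first.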
Test
First, write the code for the Producer to send messages:
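public void sendMsg(String key, String data) {
    // sendAsync() returns immediately with a future for the broker's MessageId
    CompletableFuture<MessageId> future = producer.newMessage()
            .key(key)
            .value(data.getBytes())
            .sendAsync();
    // Log the outcome once the broker acknowledges the message or the send fails
    future.handle((v, ex) -> {
        if (ex == null) {
            logger.info("Message sent successfully, key:{}, msg: {}", key, data);
        } else {
            logger.error("Failed to send message, key:{}, msg: {}", key, data, ex);
        }
        return null;
    });
    // join() waits for the send, making each call effectively synchronous;
    // it rethrows a send failure as a CompletionException
    future.join();
    logger.info("Send message complete, key:{}, msg: {}", key, data);
}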
Then write the code for the Consumer to consume messages:
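public void start() throws Exception {
    while (true) {
        // Blocks until a message is available
        Message<byte[]> message = consumer.receive();
        String key = message.getKey();
        String data = new String(message.getData());
        String topic = message.getTopicName();
        // StringUtils is assumed to be org.apache.commons.lang3.StringUtils
        if (StringUtils.isNotEmpty(data)) {
            try {
                // Business processing goes here; this example only logs the message
                logger.info("Received message, topic:{}, key:{}, data:{}", topic, key, data);
            } catch (Exception e) {
                logger.error("Abnormal message received, topic:{}, key:{}, data:{}", topic, key, data, e);
            }
        }
        // Acknowledge so the broker does not redeliver this message
        consumer.acknowledge(message);
    }
}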
Finally, write a Controller class that calls the Producer to send messages. The code is as follows:
@RequestMapping("/send")
@ResponseBody
public String send(@RequestParam String key, @RequestParam String data) {
    logger.info("Message sending request received, key:{}, value:{}", key, data);
    pulsarProducer.sendMsg(key, data);
    return "success";
}
To have the Producer send a message with key=key1 and data=data1, enter the following URL in a browser and press Enter:
http://192.168.157.1:8083/pulsar/send?key=key1&data=data1
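When key or data contain spaces, &, or other reserved characters, they need to be percent-encoded before being put into the URL above. A minimal sketch follows; the SendUrl helper is hypothetical, the base URL is the one from this article's test setup, and URLEncoder applies form encoding, so a space becomes +.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds the /pulsar/send request URL,
// form-encoding key and data so reserved characters survive the query string.
final class SendUrl {
    static URI build(String baseUrl, String key, String data) {
        String query = "key=" + URLEncoder.encode(key, StandardCharsets.UTF_8)
                + "&data=" + URLEncoder.encode(data, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/pulsar/send?" + query);
    }
}
```

The resulting URI can then be requested from Java 11+ with java.net.http.HttpClient, for example HttpClient.newHttpClient().send(HttpRequest.newBuilder(uri).GET().build(), HttpResponse.BodyHandlers.ofString()).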
The console prints the following log:
2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - Message sent successfully, key:key1, msg: data1
2022-01-08 22:42:33,200 [http-nio-8083-exec-1] [INFO] boot.pulsar.PulsarProducer - Send message complete, key:key1, msg: data1
2022-01-08 22:42:33,232 [Thread-22] [INFO] boot.pulsar.PulsarConsumer - Received message, topic:persistent://public/default/testTopic, key:key1, data:data1
2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 69.000 ms - 95pct: 69.000 ms - 99pct: 69.000 ms - 99.9pct: 69.000 ms - max: 69.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0
From the log, we can see that the topic persistent://public/default/testTopic lives in the public/default namespace that was created when the cluster started.
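A short topic name such as testTopic is resolved by Pulsar to persistent://public/default/testTopic, which is why the log shows the public/default namespace. The sketch below splits a fully qualified name into its parts. The TopicNameParts class is a hypothetical illustration; it only handles the short form, the tenant/namespace/topic form, and the fully qualified form, and it does not cover every naming rule Pulsar enforces.

```java
// Hypothetical parser for names of the form domain://tenant/namespace/topic.
final class TopicNameParts {
    final String domain;
    final String tenant;
    final String namespace;
    final String localName;

    private TopicNameParts(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    // Short names get the persistent domain and the public/default namespace;
    // tenant/namespace/topic names get only the persistent domain.
    static String expand(String name) {
        if (name.contains("://")) {
            return name;
        }
        String[] parts = name.split("/");
        if (parts.length == 1) {
            return "persistent://public/default/" + name;
        }
        if (parts.length == 3) {
            return "persistent://" + name;
        }
        throw new IllegalArgumentException("Unsupported topic name: " + name);
    }

    static TopicNameParts parse(String name) {
        String full = expand(name);
        int sep = full.indexOf("://");
        String[] parts = full.substring(sep + 3).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected domain://tenant/namespace/topic: " + full);
        }
        return new TopicNameParts(full.substring(0, sep), parts[0], parts[1], parts[2]);
    }
}
```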
Summary
Judging from this Spring Boot integration, Pulsar's Java client API is friendly and easy to use. Most of the design effort goes into the consumer side, including batch receiving, asynchronous receiving, and the choice of subscription type.