Spring Boot Kafka integration: an in-depth exploration of spring-kafka

Preface

Kafka is a message queue product. Thanks to its topic/partition design, it can achieve very high throughput for sending and processing messages. Spring created the Spring Kafka project, which wraps Apache's kafka-clients library so that Kafka can be integrated into Spring projects quickly. Beyond simple send and receive, Spring Kafka also provides many advanced features. Let's explore these usages.

Project address: https://github.com/spring-projects/spring-kafka

Simple integration

Introducing dependencies

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

Add Configuration

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

Test sending and receiving

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {

    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        this.template.send("topic_input", input);
    }
    @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}" , input);
    }
}

After starting the application, open http://localhost:8080/send/kl in the browser. You will see the log output in the console: input value: "kl". Basic usage is as simple as that: inject a KafkaTemplate to send a message and add an @KafkaListener annotation to receive one.

Spring-kafka-test embedded Kafka Server

However, the code above only starts successfully if you already have a running Kafka server. Kafka is built with Scala and ZooKeeper, and you can download the deployment package from the official website and deploy it locally. To simplify verifying Kafka-related features during development, though, spring-kafka-test wraps an annotation that starts a Kafka server with a single switch, and it is also extremely simple to use. All the test cases later in this article use this embedded service to provide Kafka.

Introducing dependencies

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka-test</artifactId>
   <version>2.2.6.RELEASE</version>
   <scope>test</scope>
</dependency>

Start up service

The following uses a JUnit test case to directly start a Kafka server consisting of four broker nodes.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
    @Test
    public void contextLoads()throws IOException {
        System.in.read();
    }
}

As above: a single @EmbeddedKafka annotation is all it takes to start a fully functional Kafka service. Cool, isn't it? Without any parameters it creates one broker on a random port by default, and it prints the specific port and the default configuration items in the startup log. All the items that are configurable in the Kafka installation package's configuration file can also be configured through the annotation's parameters. The configurable parameters of @EmbeddedKafka are described in detail below, followed by a combined example:

  • value: the number of broker nodes
  • count: same as value, also the number of broker nodes
  • controlledShutdown: controlled-shutdown switch, mainly used to reduce the time partitions on a broker are unavailable when that broker is shut down unexpectedly

Kafka is a highly available, multi-broker service. A topic maps to multiple partitions, and a partition can have multiple replicas; these replicas are stored on multiple brokers for high availability. However, although a partition has a whole replica set, only one replica does the work at any time. By default the first assigned replica (the preferred replica) acts as the leader and handles all writes and reads. When we upgrade a broker or update its configuration, we need to restart it, and partition leadership then has to be transferred to an available broker. There are three scenarios:

  1. Shut down the broker directly: when the broker is shut down, the cluster re-elects a new broker as partition leader, and the partitions on that broker are briefly unavailable during the election.
  2. Turn on controlledShutdown: when the broker shuts down, it first tries to transfer the leader role to other available brokers itself.
  3. Use the command-line tool: manually trigger a partition leader transfer with bin/kafka-preferred-replica-election.sh.
  • ports: a list of ports, as an array. It corresponds to the count parameter: as many brokers as are configured, that many port numbers are expected.
  • brokerProperties: broker parameter settings, an array structure that supports setting broker parameters like this:
@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})
  • brokerPropertiesLocation: broker parameter file setting

Its function is the same as brokerProperties above, but Kafka brokers have as many as 182 settable parameters, so configuring them one by one in the annotation is certainly not the best solution. For that reason the annotation also supports loading a local configuration file, for example:

@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")
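
The parameters above can also be combined. Below is a minimal sketch of a test class that ties several of them together; the class name and property values are purely illustrative:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(
        count = 2,                          // two broker nodes
        ports = {9092, 9093},               // one port per broker, matching count
        controlledShutdown = true,          // let a broker hand over partition leadership before it stops
        brokerProperties = {"log.index.interval.bytes=4096", "num.io.threads=8"})
public class EmbeddedKafkaParamsTests {

    @Test
    public void contextLoads() throws IOException {
        System.in.read();
    }
}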

Create a new Topic

By default, if the topic does not exist when a message is sent with KafkaTemplate, a new topic is created automatically. Its default number of partitions and replicas is set by the following broker parameters:

num.partitions = 1              # default number of partitions for a new topic
default.replication.factor = 1  # default number of replicas for a new topic

Create Topic at Program Start

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/31
 */
@Configuration
public class KafkaConfig {
    @Bean
    public KafkaAdmin admin(KafkaProperties properties){
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        admin.setFatalIfBrokerNotAvailable(true);
        return admin;
    }
    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short) 1);
    }
}

If the Kafka broker supports it (version 1.0.0 or later), and the number of partitions in an existing topic is found to be less than the number set in NewTopic, new partitions are added. Several common usages of KafkaAdmin are as follows:

setFatalIfBrokerNotAvailable(true): the default value is false, meaning that an unavailable broker does not affect the initialization of the Spring context. Set this value to true if an unavailable broker should be treated as something that affects normal business.

setAutoCreate(false): the default value is true, meaning that when KafkaAdmin is initialized it automatically creates the NewTopic objects registered as beans.

initialize(): when setAutoCreate is false, our program needs to call admin's initialize() method explicitly to create the registered NewTopic objects.
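
Putting these options together, a hedged sketch of disabling automatic creation and initializing explicitly might look like this (bean and class names are only illustrative):

@Configuration
public class ManualTopicInitConfig {

    @Bean
    public KafkaAdmin admin(KafkaProperties properties) {
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        admin.setFatalIfBrokerNotAvailable(true); // fail startup if the broker cannot be reached
        admin.setAutoCreate(false);               // do not create the NewTopic beans during context startup
        return admin;
    }

    @Bean
    public NewTopic topicKl() {
        return new NewTopic("topic-kl", 3, (short) 1);
    }
}

// Later, once the broker is known to be available, create the registered topics explicitly:
//     admin.initialize();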

Create in code logic

Sometimes we don't know how many partitions a topic needs when the program starts, yet we cannot simply use the broker's default settings. In that case we need the AdminClient that ships with kafka-clients; the KafkaAdmin wrapper used above is also built on AdminClient. For example:

    @Autowired
    private KafkaProperties properties;
    @Test
    public void testCreateTopic(){
        AdminClient client = AdminClient.create(properties.buildAdminProperties());
        if(client !=null){
            try {
                Collection<NewTopic> newTopics = new ArrayList<>(1);
                newTopics.add(new NewTopic("topic-kl",1,(short) 1));
                client.createTopics(newTopics);
            }catch (Throwable e){
                e.printStackTrace();
            }finally {
                client.close();
            }
        }
    }
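
Since brokers from 1.0.0 onward can also add partitions to an existing topic, the same AdminClient can be used to grow one later. A hedged sketch, reusing the injected KafkaProperties from the test above (topic name and partition count are only illustrative; it needs org.apache.kafka.clients.admin.NewPartitions and java.util.Collections):

    @Test
    public void testIncreasePartitions() throws Exception {
        AdminClient client = AdminClient.create(properties.buildAdminProperties());
        try {
            // Kafka can only add partitions to a topic, never remove them
            client.createPartitions(
                    Collections.singletonMap("topic-kl", NewPartitions.increaseTo(3)))
                  .all()
                  .get(); // block until the broker has applied the change
        } finally {
            client.close();
        }
    }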

PS: other ways to create a topic

The ways of creating a topic above assume that your Spring Boot version is 2.x, because spring-kafka 2.x only supports Spring Boot 2.x, and these APIs are not available in the 1.x line. Below is a way to create a topic from a program through the kafka_2.10 artifact.

Introducing dependencies

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>

API mode creation

    @Test
    public void testCreateTopic()throws Exception{
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$);
        String topicName = "topic-kl";
        int partitions = 1;
        int replication = 1;
        AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());
    }

Note that the last constructor parameter of ZkClient is an implementation of a serialization/deserialization interface. In my tests, if it is not supplied, the topic data created on ZooKeeper is problematic. The default implementation in Kafka is actually simple: it just UTF-8 encodes the string. ZKStringSerializer$ is an already implemented instance of that interface inside Kafka; it is a Scala companion object, and in Java it can be obtained by referencing MODULE$ directly.

Command mode creation

    @Test
    public void testCreateTopic(){
        String [] options= new String[]{
                "--create",
                "--zookeeper","127.0.0.1:2181",
                "--replication-factor", "3",
                "--partitions", "3",
                "--topic", "topic-kl"
        };
        TopicCommand.main(options);
    }

KafkaTemplate for message sending

Get the sending result

Asynchronous acquisition

        template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                ......
            }

            @Override
            public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
                ....
            }
        });

Synchronous acquisition

        ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl");
        try {
            SendResult<Object,Object> result = future.get();
        }catch (Throwable e){
            e.printStackTrace();
        }
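
Blocking on future.get() without a limit can hang the caller if the broker is slow, so a bounded wait is usually safer. A small variation with a purely illustrative 10-second timeout (it needs java.util.concurrent.TimeUnit and TimeoutException):

        ListenableFuture<SendResult<Object, Object>> future = template.send("topic-kl", "kl");
        try {
            // wait at most 10 seconds for the broker acknowledgement
            SendResult<Object, Object> result = future.get(10, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // the send did not complete in time; decide whether to retry or give up
            e.printStackTrace();
        } catch (Throwable e) {
            e.printStackTrace();
        }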

Kafka transactional messages

By default, the KafkaTemplate instance auto-configured by Spring Kafka does not have the ability to send transactional messages. The following configuration is required to activate the transaction feature. After transactions are activated, all messages can only be sent inside a method running in a transaction, otherwise an exception about the missing transaction is thrown.

spring.kafka.producer.transaction-id-prefix=kafka_tx.

Transactional sending is for cases where a group of messages must either all be sent successfully or not become visible at all. Consider the following example: assume that after the first message is sent, an exception occurs before the second message is sent; then the first message, although already sent, is also rolled back. Under normal circumstances, if we sleep for a while after sending the first message, the consumer only receives the messages after the transactional method has finished executing.

    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        template.executeInTransaction(t ->{
            t.send("topic_input","kl");
            if("error".equals(input)){
                throw new RuntimeException("failed");
            }
            t.send("topic_input","ckl");
            return true;
        });
    }

Similarly, when the transaction feature is activated, adding the @Transactional annotation to the method will also take effect.

    @GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) {
        template.send("topic_input", "kl");
        if ("error".equals(input)) {
            throw new RuntimeException("failed");
        }
        template.send("topic_input", "ckl");
    }

Spring Kafka's transactional messages are based on the transactional message feature provided by Kafka itself, and the default broker configuration is aimed at a highly available service of three or more brokers. Because this article uses the embedded service to build a single-broker Kafka service for simplicity, some problems arise, such as:

1. If the transaction log replication factor is greater than the number of brokers, the following exception is thrown:

Number of alive brokers '1' does not meet the required replication factor '3' 
for the transactions state topic (configured via 'transaction.state.log.replication.factor').
This error can be ignored if the cluster is starting up and not all brokers are up yet.

The default broker configuration is transaction.state.log.replication.factor=3; on a single node it has to be lowered to 1.

2. If the number of replicas is less than the required minimum number of in-sync replicas, the following exception is thrown:

Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]

The default broker configuration is transaction.state.log.min.isr=2; on a single node it has to be lowered to 1.
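
For the single-broker embedded service used in this article, both values therefore need to be lowered to 1. One way to do that, sketched with the @EmbeddedKafka annotation from spring-kafka-test (the property values simply follow the two error messages above; the class name is illustrative):

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 1, ports = {9092},
        brokerProperties = {
                // a single broker can only hold one replica of the transaction state log
                "transaction.state.log.replication.factor=1",
                // lower the in-sync replica minimum as well, otherwise transactional sends fail
                "transaction.state.log.min.isr=1"})
public class SingleBrokerTransactionTests {

    @Test
    public void contextLoads() throws IOException {
        System.in.read();
    }
}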

ReplyingKafkaTemplate: getting a message reply

ReplyingKafkaTemplate is a subclass of KafkaTemplate. In addition to inheriting the parent's methods, it adds a new method, sendAndReceive, which implements request/reply semantics for messages.

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

That is, when I send a message, I can get back the result the consumer returned to me, much like a traditional RPC interaction. This API is very suitable when the sender of a message needs to know how the consumer actually processed it, for example when a message carries a batch of data and the sender needs to know which records the consumer processed successfully. The following code demonstrates how to wire up and use ReplyingKafkaTemplate.

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {
    private final Logger logger = LoggerFactory.getLogger(Application.class);
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Autowired
    private ReplyingKafkaTemplate template;

    @GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
        RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        System.err.println("Return value: " + consumerRecord.value());
    }

    @KafkaListener(id = "webGroup", topics = "topic-kl")
    @SendTo
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}

Exploring Spring Kafka Message Consumption

Use of @KafkaListener

The ability of @KafkaListener to receive messages was already demonstrated in the simple integration above, but @KafkaListener can do more than that. Other commonly used features and scenarios are as follows:

  • explicitly specify which topics and partitions to consume,
  • set the initial offset for each topic and partition,
  • set the concurrency of consumer threads,
  • set a message exception handler.
    @KafkaListener(id = "webGroup", topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
                    @TopicPartition(topic = "topic2", partitions = "0",
                            partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
            },concurrency = "6",errorHandler = "myErrorHandler")
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }

The other annotation parameters are easy to understand. errorHandler needs some explanation: setting this parameter requires implementing an interface, KafkaListenerErrorHandler, and the value configured in the annotation is the bean name of your custom implementation in the Spring context. For example, the configuration above is errorHandler = "myErrorHandler", so there should be a bean with that name in the Spring context:

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/31
 */
@Service("myErrorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
    Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        logger.info(message.getPayload().toString());
        return null;
    }
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        logger.info(message.getPayload().toString());
        return null;
    }
}

Manual Ack mode

In manual ack mode, the business logic controls when the offset is committed. For example, a program may need the semantics, while consuming, of not acknowledging (that is, not committing the offset) under exceptional circumstances; this can only be achieved with manual ack mode. To enable it, first turn off automatic commit, then set the consumer's ack mode:

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

With the above settings in place, when consuming, you only need to add an Acknowledgment parameter to the @KafkaListener method; calling ack.acknowledge() commits the offset.

    @KafkaListener(id = "webGroup", topics = "topic-kl")
    public String listen(String input, Acknowledgment ack) {
        logger.info("input value: {}", input);
        if ("kl".equals(input)) {
            ack.acknowledge();
        }
        return "successful";
    }

Lifecycle of @KafkaListener Annotated Listeners

The lifecycle of a listener annotated with @KafkaListener is controllable. By default the @KafkaListener parameter autoStartup = true, that is, consumption starts automatically, but you can also use KafkaListenerEndpointRegistry to intervene in its lifecycle. The listener containers obtained from KafkaListenerEndpointRegistry expose three action methods: start(), pause(), and resume(), for starting, pausing, and resuming consumption respectively. The following code demonstrates this feature in detail.

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {
    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate template;

    @GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
        template.send(record);
    }

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @GetMapping("/stop/{listenerID}")
    public void stop(@PathVariable String listenerID){
        registry.getListenerContainer(listenerID).pause();
    }
    @GetMapping("/resume/{listenerID}")
    public void resume(@PathVariable String listenerID){
        registry.getListenerContainer(listenerID).resume();
    }
    @GetMapping("/start/{listenerID}")
    public void start(@PathVariable String listenerID){
        registry.getListenerContainer(listenerID).start();
    }
    @KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}

In the code above, listenerID is the id value "webGroup" from @KafkaListener. After the project starts, you can see the effect by calling the following URLs in turn.

Send a message first: http://localhost:8081/send/ckl. Because autoStartup = false, you will not see the message arrive at the listener.

Then start the listener: http://localhost:8081/start/webGroup. You can see a message come in.

The effect of pausing and continuing consumption can be tested in a similar way.

@SendTo message forwarding

Besides the send-and-reply semantics shown earlier, the @SendTo annotation can also take a parameter that specifies a topic to forward to. A common scenario is a message that needs several rounds of processing whose steps have very different resource costs (CPU and so on); this can be solved by using different topics with consumers deployed on different hosts. For example:

    @KafkaListener(id = "webGroup", topics = "topic-kl")
    @SendTo("topic-ckl")
    public String listen(String input) {
        logger.info("input value: {}", input);
        return input + "hello!";
    }

    @KafkaListener(id = "webGroup1", topics = "topic-ckl")
    public void listen2(String input) {
        logger.info("input value: {}", input);
    }

Application of Message Retry and Dead Letter Queue

In addition to the manual ack mode above for controlling message offsets, Spring Kafka also encapsulates semantics for retryable message consumption, that is, a message can be retried when consuming it throws an exception. Moreover, you can set the number of retries after which the message is published to a predetermined topic: the dead letter queue. The following code demonstrates this effect:

    @Autowired
    private KafkaTemplate template;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // at most three failed delivery attempts before the record is published to the dead letter topic
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
        return factory;
    }

    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        template.send("topic-kl", input);
    }

    @KafkaListener(id = "webGroup", topics = "topic-kl")
    public String listen(String input) {
        logger.info("input value: {}", input);
        throw new RuntimeException("dlt");
    }

    @KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
    public void dltListen(String input) {
        logger.info("Received from DLT: " + input);
    }

The application above throws a runtime exception whenever it receives a message from topic-kl; the listener then retries the call until the maximum number of attempts (three) is reached, after which the message is dropped into the dead letter queue. The topic naming rule for the dead letter queue is the business topic name plus ".DLT". Since the business topic above is named "topic-kl", the corresponding dead letter topic is "topic-kl.DLT".

Concluding remarks

Recently, Kafka has been used in our business, so I systematically explored the various usages of Spring Kafka and discovered many interesting and cool features, such as starting an embedded Kafka service with one annotation, RPC-like send/reply semantic calls, and transactional messages. I hope this post helps those who are using, or are about to use, Spring Kafka to take fewer detours and step into fewer pits.
