Let's talk about Spring Cloud Stream and Kafka.

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

In plainer terms: Spring Cloud Stream is a suitor who intends to unify the messaging middleware in the imperial harem. He is versatile, and behind him stands Spring, master of all eighteen kinds of weaponry (pub/sub messaging, consumer groups, stateful partitions, and so on). At present, Kafka and RabbitMQ live in the harem.

Gossip time: today let's dig into the relationship between Spring Cloud Stream and Kafka. RabbitMQ will have to stay in the cold palace for now.

1. Kafka

Apache Kafka® is a distributed streaming platform. What exactly does that mean?

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

In plain words, she is a stream-processing platform who can do a lot of work:

  • Handle publish/subscribe messaging
  • Store messages in a stable, fault-tolerant way
  • Process records the moment they arrive, and really fast

In conclusion: fast, stable, and accurate.
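To make "publish and subscribe" concrete before Spring wraps it for us, here is a minimal sketch of publishing one record with the bare kafka-clients API (my own illustration, not code we use later; the topic name test1 matches the one we configure below):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainProducerDemo {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("key.serializer", StringSerializer.class.getName());
		props.put("value.serializer", StringSerializer.class.getName());
		// publish a single record to the "test1" topic; close() flushes it
		try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
			producer.send(new ProducerRecord<>("test1", "hello kafka"));
		}
	}
}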

Getting Kafka running is very simple. Download it, then start ZooKeeper first; the latest Kafka download package ships with a ZooKeeper that can be used directly. After ZooKeeper starts, configure its IP and port in Kafka's configuration file, config/server.properties.

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

Then run the startup script from the Kafka directory to start the broker:

bin/kafka-server-start.sh -daemon config/server.properties
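To confirm the broker is actually up and reachable, one quick option is to list its topics with the AdminClient from the kafka-clients library; a minimal sketch:

import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerCheck {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		// listTopics() fails with a timeout exception if the broker is unreachable
		try (AdminClient admin = AdminClient.create(props)) {
			System.out.println(admin.listTopics().names().get());
		}
	}
}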

2. Kafka Manager

Although Kafka has started, we still need to know what she is saying, so we need a manager to report on her. I use Kafka Manager here; the download address is here. It's a pity that only the source code is offered, with no runnable build, so you have to compile it yourself, and compilation is quite slow. I'll provide a compiled version; click here.

Kafka Manager also needs to be told where Kafka is. In the conf/application.conf file, you configure not Kafka itself but Kafka's attached ZooKeeper:

kafka-manager.zkhosts="localhost:2181"

Then start bin/kafka-manager (on Windows you can run kafka-manager.bat instead).

There is a pitfall here: if you run it under Windows, it may fail to start with the prompt "The input line is too long."

This happens because the path is too long. Shorten the kafka-manager-2.0.0.2 directory name and it will run normally.

After startup, go to Add Cluster and fill in the ZooKeeper host address and port. The Kafka version you select must match the version you are actually running; otherwise you may not see Kafka's content.

Then we can see Kafka's brokers, topics, consumers, partitions, and other information.

3. Spring Cloud Stream

The starting point of everything is still start.spring.io.

That black interface is Spring dressed up for Halloween. What matters to us are the two dependencies on the right, which correspond to these entries in pom.xml:

<dependencies>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-streams</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream-test-support</artifactId>
		<scope>test</scope>
	</dependency>
</dependencies>
<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-dependencies</artifactId>
			<version>${spring-cloud.version}</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>

But these alone are not enough. If you run the app directly, you will be prompted:

Caused by: java.lang.IllegalStateException: Unknown binder configuration: kafka

You also need to add the Kafka binder dependency:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

4. Send a message, biubiu

With the Spring Cloud Stream project skeleton set up, we split it into two parts: one for sending and one for receiving. Let's look at the message-sending part first, starting with the configuration file, application.yml.

spring:
  cloud:
    stream:
      default-binder: kafka # default binder
      kafka: # if you use RabbitMQ, put rabbit here
        binder:
          brokers: # address of the Kafka broker
          - localhost:9092
      bindings:
        output: # channel name
          binder: kafka
          destination: test1 # destination the message is sent to, i.e. the Kafka topic
          group: output-group-1 # consumer group in Kafka
          content-type: text/plain # format of the message

Note that output here indicates message publishing; it pairs with the message subscription we'll set up later. This output is the message channel name and can be customized, as will be shown shortly.

Then we need to create a publisher:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
public class Producer {
	private Source mySource;

	public Producer(Source mySource) {
		super();
		this.mySource = mySource;
	}

	public Source getMysource() {
		return mySource;
	}

	public void setMysource(Source mySource) {
		this.mySource = mySource;
	}
}

@EnableBinding does exactly what the name suggests: it binds a channel. Source.class is provided by Spring and declares a binding for a publishing channel whose name is output, matching the output entry in application.yml.

This is clear from the source code:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Bindable interface with one output channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Source {

	/**
	 * Name of the output channel.
	 */
	String OUTPUT = "output";

	/**
	 * @return output channel
	 */
	@Output(Source.OUTPUT)
	MessageChannel output();

}

If we need to define our own channel, we can write an interface ourselves. For example, here the output channel name is changed to my-out:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MySource {
	String INPUT = "my-in";
	String OUTPUT = "my-out";

	@Input(INPUT)
	SubscribableChannel myInput();

	@Output(OUTPUT)
	MessageChannel myOutput();
}

In this case, application.yml changes accordingly; under bindings we add:

        my-out:
          binder: kafka
          destination: mytest # destination the message is sent to, i.e. the Kafka topic
          group: output-group-2 # consumer group in Kafka
          content-type: text/plain # format of the message

The @EnableBinding on the Producer class also needs to change. To keep things separate, I wrote another class, MyProducer:

import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(MySource.class)
public class MyProducer {
	private MySource mySource;

	public MyProducer(MySource mySource) {
		super();
		this.mySource = mySource;
	}

	public MySource getMysource() {
		return mySource;
	}

	public void setMysource(MySource mySource) {
		this.mySource = mySource;
	}
}

With that, the message-publishing part is finished. Let's write a controller to send messages:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.wphmoon.kscs.service.ChatMessage;
import com.wphmoon.kscs.service.MyProducer;
import com.wphmoon.kscs.service.Producer;

@RestController
public class MyController {
	@Autowired
	private Producer producer;
	@Autowired
	private MyProducer myProducer;

	// receive a String via HTTP and publish it to the broker through Spring Cloud Stream
	@RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
	public String publishMessageString(@RequestBody String payload) {
		// send the message to the "output" channel
		producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
		return "success";
	}

	// same idea, but publish through the custom "my-out" channel
	@RequestMapping(value = "/sendMyMessage/string", method = RequestMethod.POST)
	public String publishMyMessageString(@RequestBody String payload) {
		myProducer.getMysource().myOutput().send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
		return "success";
	}
}
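Notice the "type" header we set on every message: the receiving side below has a conditional listener that only fires when type equals chat. To exercise it, a hypothetical extra endpoint along these lines could be added to MyController (ChatMessage is shown later; for POJO payloads you would also switch the binding's content-type to application/json):

	// hypothetical extra endpoint: sends a ChatMessage with the "type" header
	// set to "chat", so the conditional @StreamListener below will match it
	@RequestMapping(value = "/sendMessage/chat", method = RequestMethod.POST)
	public String publishMessageChat(@RequestBody ChatMessage payload) {
		producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "chat").build());
		return "success";
	}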

We simply call the producer and send a string. I use Postman to initiate this action.
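If you prefer code to clicking, the same POST can be issued with Java 11's built-in HttpClient (assuming the sender application runs on the default port 8080):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SendDemo {
	public static void main(String[] args) throws Exception {
		HttpClient client = HttpClient.newHttpClient();
		// POST a plain-text payload to the controller's /sendMessage/string endpoint
		HttpRequest request = HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8080/sendMessage/string"))
				.header("Content-Type", "text/plain")
				.POST(HttpRequest.BodyPublishers.ofString("What do you look at?"))
				.build();
		HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
		System.out.println(response.body()); // expect "success"
	}
}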

The message has been sent. How can we receive it? Look down.

5. Receive the message. Come here.

Similarly, we use the same Spring Cloud Stream project skeleton for the message-receiving part. First, the application.yml file:

server:
  port: 8081
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:9092
      bindings:
        input:
          binder: kafka
          destination: test1
          content-type: text/plain
          group: input-group-1
        my-in:
          binder: kafka
          destination: mytest
          content-type: text/plain
          group: input-group-2

The focus is on input and my-in, which correspond one-to-one to the earlier output and my-out.

The default counterpart of the Source class is Sink, which is also officially provided. The code is as follows:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Bindable interface with one input channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Sink {

	/**
	 * Input channel name.
	 */
	String INPUT = "input";

	/**
	 * @return input channel.
	 */
	@Input(Sink.INPUT)
	SubscribableChannel input();

}

The Consumer class that uses it to receive messages is as follows:

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
public class Consumer {
	private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

	@StreamListener(target = Sink.INPUT)
	public void consume(String message) {
		logger.info("recieved a string message : " + message);
	}

	@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chat'")
	public void handle(@Payload ChatMessage message) {
		final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
				.withZone(ZoneId.systemDefault());
		final String time = df.format(Instant.ofEpochMilli(message.getTime()));
		logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
	}
}
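By the way, the ChatMessage class itself never appears in this post. A minimal sketch that matches the getTime() and getContents() calls above would look like this (the field names are assumptions):

// minimal sketch of the DTO used above; only the shape implied by
// getTime() (epoch millis) and getContents() is certain, the rest is assumed
public class ChatMessage {
	private long time;
	private String contents;

	public long getTime() {
		return time;
	}

	public void setTime(long time) {
		this.time = time;
	}

	public String getContents() {
		return contents;
	}

	public void setContents(String contents) {
		this.contents = contents;
	}
}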

The code for MySink and MyConsumer is as follows:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {
	String INPUT = "my-in";

	@Input(INPUT)
	SubscribableChannel myInput();
}

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(MySink.class)
public class MyConsumer {
	private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class);

	@StreamListener(target = MySink.INPUT)
	public void consume(String message) {
		logger.info("recieved a string message : " + message);
	}

	@StreamListener(target = MySink.INPUT, condition = "headers['type']=='chat'")
	public void handle(@Payload ChatMessage message) {
		final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
				.withZone(ZoneId.systemDefault());
		final String time = df.format(Instant.ofEpochMilli(message.getTime()));
		logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
	}
}

And that's it. When we send a message with Postman, we can see it in the log directly:

2019-10-29 18:42:39.455  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.MyConsumer        : received a string message : What do you look at?
2019-10-29 18:43:17.017  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.Consumer          : received a string message : What do you look at?

6. Take a look in Kafka Manager

The destination defined in application.yml is the Kafka topic, and it can be seen in the Kafka Manager topic list.

The consumers receiving the messages can be seen there as well.

Such is the love between Spring Cloud Stream and Kafka. But how could their political marriage be that simple? We will talk about the complicated parts later. Please look forward to The Return of the King.

Source code address
