Spring Cloud Stream (message-driven)
Architecture diagram from the official website
[Figure: image-20210812224707690.png]
Component | Description |
---|---|
Middleware | The message middleware. At the time of writing, only RabbitMQ and Kafka are supported |
Binder | The Binder is the layer between the application and the message middleware. It makes it easy to connect to the middleware and to change the destination type (a Kafka topic or a RabbitMQ exchange) dynamically. All of this is done through configuration |
@Input | Marks an input channel, through which received messages enter the application |
@Output | Marks an output channel, through which published messages leave the application |
@StreamListener | Listens on a queue; used on the consumer side to receive messages |
@EnableBinding | Binds a channel to an exchange |
What is it?
Official definition: Spring Cloud Stream is a framework for building message-driven microservices (its role is comparable to that of JDBC in Java, which hides the differences between databases).
The application talks to Spring Cloud Stream through the inputs, outputs and binder that we configure, and the binder in turn talks to the message middleware. As a result, we only need to learn how to work with Spring Cloud Stream itself.
What can it do?
Because the Binder encapsulates the message middleware, we can switch between middleware products in different environments without the application code noticing. This keeps microservices highly decoupled, lets us concentrate on business logic, and lowers the learning curve for developers.
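The decoupling described above can be illustrated without any real middleware. The following is a minimal plain-Java sketch, not Spring Cloud Stream's actual API: `SimpleBinder`, `InMemoryBinder`, `FakeRabbitBinder`, `FakeKafkaBinder`, `MessageProviderSketch` and `BinderDemo` are hypothetical names invented for this illustration, and both "binders" only store messages in memory. The point it tries to show is that the business class depends on one interface, so swapping the middleware is a matter of choosing a different implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the binder idea: application code sees only this interface.
interface SimpleBinder {
    void send(String destination, String payload);
    List<String> received(String destination);
    String describe(String destination);
}

// Shared in-memory storage so the two fakes differ only in how they name a destination.
abstract class InMemoryBinder implements SimpleBinder {
    private final Map<String, List<String>> store = new HashMap<>();

    @Override
    public void send(String destination, String payload) {
        store.computeIfAbsent(destination, d -> new ArrayList<>()).add(payload);
    }

    @Override
    public List<String> received(String destination) {
        return store.getOrDefault(destination, new ArrayList<>());
    }
}

// Fake binder that calls a destination an exchange, as RabbitMQ does.
class FakeRabbitBinder extends InMemoryBinder {
    @Override
    public String describe(String destination) {
        return "exchange " + destination;
    }
}

// Fake binder that calls a destination a topic, as Kafka does.
class FakeKafkaBinder extends InMemoryBinder {
    @Override
    public String describe(String destination) {
        return "topic " + destination;
    }
}

// Business code: it never mentions RabbitMQ or Kafka.
class MessageProviderSketch {
    private final SimpleBinder binder;

    MessageProviderSketch(SimpleBinder binder) {
        this.binder = binder;
    }

    void send(String payload) {
        binder.send("studyExchange", payload);
    }
}

class BinderDemo {
    // Chooses a fake binder by name, loosely mirroring a binder "type" setting in configuration.
    static SimpleBinder binderFor(String type) {
        switch (type) {
            case "rabbit": return new FakeRabbitBinder();
            case "kafka": return new FakeKafkaBinder();
            default: throw new IllegalArgumentException("Unknown binder type: " + type);
        }
    }

    public static void main(String[] args) {
        for (String type : new String[] {"rabbit", "kafka"}) {
            SimpleBinder binder = binderFor(type);
            new MessageProviderSketch(binder).send("hello");
            System.out.println(binder.describe("studyExchange")
                    + " received " + binder.received("studyExchange"));
        }
    }
}
```

`MessageProviderSketch` is identical in both loop iterations; only the string passed to `binderFor` changes, which is the role the binder configuration plays in a real application.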
Standard flow chart of Spring Cloud Stream
[Figure: image-20210816161720275.png]
The code below implements the flow shown in the figure above.
Server:
<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

# application.yml
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider8801
  cloud:
    stream:
      binders: # RabbitMQ services available for binding
        defaultRabbit: # Name of this binder definition, used when wiring bindings
          type: rabbit # Middleware type
          environment: # Connection settings for RabbitMQ
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Wiring between channels and destinations
        output: # Name of the output channel
          destination: studyExchange # Name of the exchange to use
          content-type: application/json # Message type; JSON here, use "text/plain" for plain text

eureka:
  client:
    register-with-eureka: true # Register this service with the Eureka server
    fetch-registry: true # Fetch registry information from the Eureka server
    service-url:
      # Cluster version
      # defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
      defaultZone: http://eureka7001.com:7001/eureka/
  instance:
    lease-renewal-interval-in-seconds: 1 # Interval between heartbeats sent to the Eureka server, in seconds (default 30)
    lease-expiration-duration-in-seconds: 2 # How long the Eureka server waits after the last heartbeat, in seconds (default 90), before removing the instance
    instance-id: send-8801.com # Name shown in the instance list
    prefer-ip-address: true # Show the IP address in the access path

// Service interface
public interface MessageProvider {
    String send();
}

// Implementation class
import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class) // Defines the message push channel
@Slf4j
public class MessageProviderImpl implements MessageProvider {

    @Autowired
    private MessageChannel output; // The field name must match the output channel name defined in the yml

    @Override
    public String send() {
        String uuid = IdUtil.simpleUUID(); // Randomly generated UUID
        boolean sent = output.send(MessageBuilder.withPayload(uuid).build()); // Send the payload to the output channel
        log.info("****** Send successful: " + sent);
        log.info("****** Sent uuid: " + uuid);
        return null;
    }
}

// Controller that triggers sending a message
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController {

    @Resource
    private MessageProvider messageProvider;

    @GetMapping(value = "/send")
    public void send() {
        messageProvider.send();
    }
}
Client:
<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

# application.yml
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer8802
  cloud:
    stream:
      binders: # RabbitMQ services available for binding
        defaultRabbit: # Name of this binder definition, used when wiring bindings
          type: rabbit # Middleware type
          environment: # Connection settings for RabbitMQ
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Wiring between channels and destinations
        input: # Name of the input channel
          destination: studyExchange # Name of the exchange to use
          content-type: application/json # Message type; JSON here, use "text/plain" for plain text

eureka:
  client:
    register-with-eureka: true # Register this service with the Eureka server
    fetch-registry: true # Fetch registry information from the Eureka server
    service-url:
      # Cluster version
      # defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
      defaultZone: http://eureka7001.com:7001/eureka/
  instance:
    lease-renewal-interval-in-seconds: 1 # Interval between heartbeats sent to the Eureka server, in seconds (default 30)
    lease-expiration-duration-in-seconds: 2 # How long the Eureka server waits after the last heartbeat, in seconds (default 90), before removing the instance
    instance-id: receive-8802.com # Name shown in the instance list
    prefer-ip-address: true # Show the IP address in the access path

// Controller that receives messages
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableBinding(Sink.class) // Defines the message receiving channel
@Slf4j
public class ReceiveMessageController8802 {

    @Value("${server.port}")
    private String port;

    @StreamListener(Sink.INPUT) // Listens for messages arriving on the input channel
    public void input(Message<String> message) { // The payload is read from the Message wrapper
        log.info("Consumer 8802--->The message received is " + message.getPayload());
    }
}

Multiple clients can be defined to receive messages; their configuration and code are the same apart from the port and names.
Once the server starts sending messages, the client listening on the queue receives them.
[Figure: image-20210817100622574.png]
Duplicate message consumption
When the clients are in different groups
[Figure: image-20210817142037769.png]
[Figure: image-20210817142210502.png]
We put the two clients into two different groups, then start the server and send messages.
[Figure: image-20210817142609226.png]
[Figure: image-20210817142622791.png]
When the groups are different, every group receives its own copy of each message, so the same message is consumed more than once.
Now set the group of both clients' input binding (the spring.cloud.stream.bindings.input.group property) to the same value, receive, and look at the result. Send the request twice: messages are handed to the members of a group in turn, so if only one request is sent, one of the clients receives nothing.
[Figure: image-20210817142948541.png]
[Figure: image-20210817143006496.png]
Messages are no longer consumed more than once. (Whether duplicate consumption is a problem that needs solving depends on the actual business requirements.)
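The two experiments above can be modelled in a few lines of plain Java. This is only a sketch of the delivery rule, not of how Spring Cloud Stream or RabbitMQ is implemented: `GroupedDestination` and `GroupDemo` are hypothetical names, the group names `groupA` and `groupB` are placeholders (only `receive` comes from the text), and strict turn-taking inside a group is assumed for simplicity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a single destination such as studyExchange.
class GroupedDestination {
    // Group name -> inboxes of the consumers that joined that group.
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // Group name -> number of messages already handed to that group.
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    // Registers a consumer in a group and returns the list that collects its messages.
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        delivered.putIfAbsent(group, 0);
        return inbox;
    }

    // Every group gets one copy of the message; inside a group the members take turns.
    void publish(String payload) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> members = entry.getValue();
            int count = delivered.get(entry.getKey());
            members.get(count % members.size()).add(payload);
            delivered.put(entry.getKey(), count + 1);
        }
    }
}

class GroupDemo {
    public static void main(String[] args) {
        // Case 1: the two clients are in different groups.
        GroupedDestination differentGroups = new GroupedDestination();
        List<String> a8802 = differentGroups.subscribe("groupA");
        List<String> a8803 = differentGroups.subscribe("groupB");
        differentGroups.publish("m1");
        differentGroups.publish("m2");
        System.out.println("different groups: 8802=" + a8802 + " 8803=" + a8803);

        // Case 2: both clients share the group "receive".
        GroupedDestination sameGroup = new GroupedDestination();
        List<String> b8802 = sameGroup.subscribe("receive");
        List<String> b8803 = sameGroup.subscribe("receive");
        sameGroup.publish("m1");
        sameGroup.publish("m2");
        System.out.println("same group: 8802=" + b8802 + " 8803=" + b8803);
    }
}
```

In the first case both inboxes end up with `m1` and `m2`; in the second case each client ends up with one of the two messages, which matches the behaviour observed with the real clients.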
Persistence of Stream
Groups also affect persistence. Remove the group setting from the yml of the 8802 client and keep it in the yml of the 8803 client. Stop all clients, restart the server and send four requests. Then start 8802: it receives no messages. Then start 8803: during startup it receives all four messages sent by the server. The likely reason is that a consumer without a group is given an anonymous queue that exists only while the consumer is connected, whereas a consumer with a group reads from a named queue that keeps collecting messages while the consumer is down. So the group setting not only prevents duplicate consumption but also provides message persistence.
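The experiment above can be replayed with a small in-memory model. This is a hedged sketch of the queue lifetimes only, under the assumption that a grouped consumer uses a named queue that outlives it and an ungrouped consumer uses a queue that is deleted when it disconnects. `BrokerSketch`, `PersistenceDemo` and the queue name `anonymous-8802` are hypothetical and do not correspond to real broker or Spring Cloud Stream APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical broker model: only queues that currently exist are present in the map.
class BrokerSketch {
    private final Map<String, List<String>> queues = new HashMap<>();

    // A grouped consumer uses a named queue that stays around after the consumer stops.
    void declareDurableQueue(String group) {
        queues.putIfAbsent(group, new ArrayList<>());
    }

    // An ungrouped consumer gets a fresh queue that exists only while it is connected.
    void connectAnonymous(String queueName) {
        queues.put(queueName, new ArrayList<>());
    }

    void disconnectAnonymous(String queueName) {
        queues.remove(queueName);
    }

    // Copies the message into every queue that exists at this moment.
    void publish(String payload) {
        for (List<String> queue : queues.values()) {
            queue.add(payload);
        }
    }

    // Returns and clears the backlog of a queue; an unknown queue yields an empty list.
    List<String> drain(String queueName) {
        List<String> queue = queues.get(queueName);
        if (queue == null) {
            return new ArrayList<>();
        }
        List<String> backlog = new ArrayList<>(queue);
        queue.clear();
        return backlog;
    }
}

class PersistenceDemo {
    public static void main(String[] args) {
        BrokerSketch broker = new BrokerSketch();
        // Earlier run: 8802 has no group, 8803 uses the group "receive".
        broker.connectAnonymous("anonymous-8802");
        broker.declareDurableQueue("receive");
        // Both clients stop: the anonymous queue disappears, the group queue stays.
        broker.disconnectAnonymous("anonymous-8802");
        // The server sends four messages while no client is running.
        for (int i = 1; i <= 4; i++) {
            broker.publish("msg-" + i);
        }
        // 8802 starts again and gets a new, empty anonymous queue.
        broker.connectAnonymous("anonymous-8802");
        System.out.println("8802 received: " + broker.drain("anonymous-8802"));
        // 8803 starts again and reads the backlog from the group queue.
        broker.declareDurableQueue("receive");
        System.out.println("8803 received: " + broker.drain("receive"));
    }
}
```

Under these assumptions 8802 starts with nothing to read, while 8803 finds all four messages waiting, which is the outcome described in the experiment.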