Spring Cloud Stream (message driven)

Official website structure

[Figure not available: structure diagram from the official website]

Component | Description
--- | ---
Middleware | The message-oriented middleware. At present, only RabbitMQ and Kafka are supported.
Binder | The layer between the application and the message-oriented middleware. A binder makes it easy to connect to the middleware and to change the message destination dynamically (a topic in Kafka, an exchange in RabbitMQ). All of this is done through configuration.
@Input | Marks an input channel, through which received messages enter the application.
@Output | Marks an output channel, through which published messages leave the application.
@StreamListener | Listens on a queue; used on the consumer side to receive messages.
@EnableBinding | Binds the channels to the exchange.

What is it?

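Official definition: Spring Cloud Stream is a framework for building message-driven microservices. Its role is similar to JDBC in Java: application code talks to one abstraction instead of to a specific vendor.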

Our application interacts with Spring Cloud Stream through inputs, outputs and a binder, all set up in configuration, and the binder in turn interacts with the message-oriented middleware. So we only need to learn how to work with Spring Cloud Stream, not with each middleware.

What can it do?

Across different environments and different message-oriented middleware we can switch transparently, because the Binder of Spring Cloud Stream encapsulates the middleware. This keeps microservices highly decoupled from the broker, lets us focus on business development, and lowers the learning cost for programmers.
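To make the decoupling idea concrete, here is a minimal plain-Java sketch. It is not the Spring Cloud Stream API: the names Binder, RabbitLikeBinder, KafkaLikeBinder and publish are made up for illustration. The only point is that the business method depends on the abstraction, so swapping the middleware does not touch it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this only illustrates the binder idea in plain Java.
class BinderSketch {

    /** The only type the business code sees, much like a JDBC Connection. */
    interface Binder {
        void send(String destination, String payload);
        List<String> sent();
    }

    /** Stand-in for a RabbitMQ binder: a destination maps to an exchange. */
    static class RabbitLikeBinder implements Binder {
        private final List<String> sent = new ArrayList<>();
        public void send(String destination, String payload) {
            sent.add("exchange=" + destination + " payload=" + payload);
        }
        public List<String> sent() { return sent; }
    }

    /** Stand-in for a Kafka binder: a destination maps to a topic. */
    static class KafkaLikeBinder implements Binder {
        private final List<String> sent = new ArrayList<>();
        public void send(String destination, String payload) {
            sent.add("topic=" + destination + " payload=" + payload);
        }
        public List<String> sent() { return sent; }
    }

    /** Business code: knows the destination name, not the middleware behind it. */
    static void publish(Binder binder, String payload) {
        binder.send("studyExchange", payload);
    }

    public static void main(String[] args) {
        Binder rabbit = new RabbitLikeBinder();
        Binder kafka = new KafkaLikeBinder();
        publish(rabbit, "hello");
        publish(kafka, "hello");
        System.out.println(rabbit.sent()); // [exchange=studyExchange payload=hello]
        System.out.println(kafka.sent());  // [topic=studyExchange payload=hello]
    }
}
```

In the real framework the choice of binder is made by the dependency and the yml configuration rather than by constructing an object, but the shape of the dependency is the same.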

Standard flow chart of Spring Cloud Stream

[Figure not available: standard flow chart of Spring Cloud Stream]

The code below implements the flow shown in the figure above.

Server (message producer):

//pom.xml
<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
</dependencies>

// application.yml
server:
  port: 8801
spring:
  application:
    name: cloud-stream-provider8801
  cloud:
    stream:
      binders: # RabbitMQ service(s) to bind to
        defaultRabbit:  # Binder name, used when wiring bindings to this binder
          type: rabbit  # Message middleware type
          environment:  # Environment settings for this binder
            spring:
              rabbitmq: # RabbitMQ connection settings
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:  # Channel bindings
        output:  # Output channel name
          destination: studyExchange # Name of the exchange to publish to
          content-type: application/json # Message content type; JSON here, use "text/plain" for plain text
eureka:
  client:
    # Whether to register this instance with the Eureka server
    register-with-eureka: true
    # Whether to fetch registry information from the Eureka server
    fetch-registry: true
    service-url:
      # Cluster version
      #defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
      defaultZone: http://eureka7001.com:7001/eureka/
  instance:
    # Interval, in seconds, between heartbeats sent by the Eureka client to the server (default 30)
    lease-renewal-interval-in-seconds: 1
    # Maximum time, in seconds, the Eureka server waits after the last heartbeat before evicting the instance (default 90)
    lease-expiration-duration-in-seconds: 2
    instance-id: send-8801.com # Instance name shown in the Eureka instance list
    prefer-ip-address: true  # Register with the IP address instead of the host name

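//service interface
public interface MessageProvider {
    String send();
}
//Implementation class
import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

//Note: @EnableBinding and @StreamListener are deprecated since Spring Cloud Stream 3.1 in favor of the functional model
@EnableBinding(Source.class)  //Bind the output (message push) channel declared by Source
@Slf4j
public class MessageProviderImpl implements MessageProvider {

    @Autowired
    private MessageChannel output;  //Field name must match the output channel name defined in yml

    @Override
    public String send() {
        String uuid = IdUtil.simpleUUID();  //Randomly generated UUID used as the payload
        boolean sent = output.send(MessageBuilder.withPayload(uuid).build()); //Publish the payload through the output channel
        log.info("****** Send successful: " + sent);
        log.info("****** Sent uuid: " + uuid);
        return uuid;  //Return the payload instead of null so the caller can see what was sent
    }
}
//controller for sending messages
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController {
    @Resource
    private MessageProvider messageProvider;

    @GetMapping(value="/send")
    public String send(){
        return messageProvider.send();
    }
}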

Client (message consumer):

//pom.xml
<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
</dependencies>
    
// application.yml
server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer8802
  cloud:
    stream:
      binders: # RabbitMQ service(s) to bind to
        defaultRabbit:  # Binder name, used when wiring bindings to this binder
          type: rabbit  # Message middleware type
          environment:  # Environment settings for this binder
            spring:
              rabbitmq: # RabbitMQ connection settings
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:  # Channel bindings
        input:  # Input channel name
          destination: studyExchange # Name of the exchange to consume from
          content-type: application/json # Message content type; JSON here, use "text/plain" for plain text
eureka:
  client:
    # Whether to register this instance with the Eureka server
    register-with-eureka: true
    # Whether to fetch registry information from the Eureka server
    fetch-registry: true
    service-url:
      # Cluster version
      #defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
      defaultZone: http://eureka7001.com:7001/eureka/
  instance:
    # Interval, in seconds, between heartbeats sent by the Eureka client to the server (default 30)
    lease-renewal-interval-in-seconds: 1
    # Maximum time, in seconds, the Eureka server waits after the last heartbeat before evicting the instance (default 90)
    lease-expiration-duration-in-seconds: 2
    instance-id: receive-8802.com # Instance name shown in the Eureka instance list
    prefer-ip-address: true  # Register with the IP address instead of the host name

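//controller receiving message
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableBinding(Sink.class)  //Bind the input (message receiving) channel declared by Sink
@Slf4j
public class ReceiveMessageController8802 {

    @Value("${server.port}")
    private String port;  //Used in the log line to tell the consumers apart

    @StreamListener(Sink.INPUT) //Invoked for every message that arrives on the input channel
    public void input(Message<String> message){ //The payload is the UUID sent by the server
        log.info("Consumer " + port + " ---> received message: " + message.getPayload());
    }
}
//More clients (such as the 8803 client used later) can be defined the same way; the configuration and code are the same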

Once the server starts sending messages, the client, which is listening on the queue, receives them.

The duplicate message consumption problem

When the clients are in different groups

Give the two clients different group names in their yml configuration (the group property of the input binding, spring.cloud.stream.bindings.input.group), then start the server and send messages.

[Screenshots not available: group configuration and console output of the two clients]

When the groups are different, each group receives its own copy of every message, so the same message is consumed more than once.

Now set the group of both clients to the same value, receive, and look at the result. Send two requests: with a single request only one of the clients receives the message, because the clients in a group take turns (polling, that is, round robin).

[Screenshots not available: console output of the two clients in the same group]

Messages are no longer consumed more than once. Whether duplicate consumption needs to be prevented at all depends on the actual business requirements.
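The two experiments above can be mimicked with a small in-memory model. This is a sketch, not the framework: the Exchange class and the group names groupA and groupB are hypothetical, and strict turn-taking inside a group is a simplifying assumption. It shows that every group gets each message once, while consumers inside one group share the messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of consumer groups; not the Spring Cloud Stream API.
class ConsumerGroupSketch {

    static class Exchange {
        // group name -> inboxes of the consumers that joined this group
        private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        // group name -> number of messages delivered so far, used for turn-taking
        private final Map<String, Integer> delivered = new LinkedHashMap<>();

        /** Registers one consumer in a group and returns its inbox. */
        List<String> subscribe(String group) {
            List<String> inbox = new ArrayList<>();
            groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
            delivered.putIfAbsent(group, 0);
            return inbox;
        }

        /** Each group receives the message once; inside a group the consumers take turns. */
        void publish(String message) {
            for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
                List<List<String>> members = entry.getValue();
                int turn = delivered.get(entry.getKey());
                members.get(turn % members.size()).add(message);
                delivered.put(entry.getKey(), turn + 1);
            }
        }
    }

    public static void main(String[] args) {
        Exchange differentGroups = new Exchange();
        List<String> c8802 = differentGroups.subscribe("groupA");
        List<String> c8803 = differentGroups.subscribe("groupB");
        differentGroups.publish("m1");
        differentGroups.publish("m2");
        System.out.println(c8802 + " " + c8803); // [m1, m2] [m1, m2]

        Exchange sameGroup = new Exchange();
        List<String> s8802 = sameGroup.subscribe("receive");
        List<String> s8803 = sameGroup.subscribe("receive");
        sameGroup.publish("m1");
        sameGroup.publish("m2");
        System.out.println(s8802 + " " + s8803); // [m1] [m2]
    }
}
```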

Persistence of Stream

Groups also affect persistence. Remove the group setting from the yml of the 8802 client and keep it in the yml of the 8803 client. Stop all clients, restart the server, and send four requests. Start 8802: it receives nothing. Start 8803: as it starts up it receives all four messages sent by the server. So a group not only solves the problem of duplicate consumption but also gives message persistence.
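One way to picture why this happens, as a hedged plain-Java sketch: assume a client with a group is attached to a named queue that keeps existing while the client is down, and a client without a group gets a fresh anonymous queue each time it starts. The class and method names below are hypothetical and only model that assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of durable group queues versus anonymous queues.
class DurableGroupSketch {

    static class Exchange {
        private final Map<String, List<String>> queues = new HashMap<>();
        private int anonymousCounter = 0;

        /** Named group: the queue is created once and survives client restarts. */
        String bindGroup(String group) {
            queues.putIfAbsent(group, new ArrayList<>());
            return group;
        }

        /** No group: a new, empty anonymous queue is created on every start. */
        String bindAnonymous() {
            String name = "anonymous." + (anonymousCounter++);
            queues.put(name, new ArrayList<>());
            return name;
        }

        /** Client shutdown: only anonymous queues disappear. */
        void stop(String queue) {
            if (queue.startsWith("anonymous.")) {
                queues.remove(queue);
            }
        }

        /** Copies the message into every queue that currently exists. */
        void publish(String message) {
            for (List<String> queue : queues.values()) {
                queue.add(message);
            }
        }

        /** Drains and returns everything waiting in the given queue. */
        List<String> receive(String queue) {
            List<String> waiting = queues.get(queue);
            List<String> out = new ArrayList<>(waiting);
            waiting.clear();
            return out;
        }
    }

    /** Replays the experiment; returns what 8802 and 8803 receive after restarting. */
    static List<List<String>> replayScenario() {
        Exchange exchange = new Exchange();
        String q8802 = exchange.bindAnonymous();      // 8802: group removed
        String q8803 = exchange.bindGroup("receive"); // 8803: group kept
        exchange.stop(q8802);                         // stop all clients
        exchange.stop(q8803);
        for (int i = 1; i <= 4; i++) {
            exchange.publish("m" + i);                // four requests while both are down
        }
        List<String> got8802 = exchange.receive(exchange.bindAnonymous());
        List<String> got8803 = exchange.receive(exchange.bindGroup("receive"));
        return List.of(got8802, got8803);
    }

    public static void main(String[] args) {
        System.out.println(replayScenario()); // [[], [m1, m2, m3, m4]]
    }
}
```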

Added by Timsoft on Tue, 21 Dec 2021 22:49:53 +0200