Integrating Redis with Spring Boot to implement publish and subscribe

1, Project structure

I first create a simple demo with Spring Initializr, which is the most convenient way to start, and then modify that demo. The project structure is shown in the figure below:

The project structure is very simple:

  • PrintMessageListener processes subscribed messages. Here it simply prints the Redis messages it receives;
  • AdminController lets you subscribe, unsubscribe, and publish dynamically by entering URLs in the browser;
  • RedisConfiguration is probably the most important class. It registers the following beans in the Spring container:
    • RedisTemplate: publishes messages through its convertAndSend(String channel, Object message) method;
    • RedisMessageListenerContainer: subscribes to messages through its addMessageListener(MessageListener listener, Topic topic) method and, conversely, unsubscribes through its removeMessageListener(MessageListener listener, Topic topic) method;
  • PubsubApplication is the Spring Boot startup class;
  • logback.xml configures log output. For its content, refer to this article (the complete file is also listed in section 6).
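
The three roles above (subscribe, unsubscribe, publish) can be modeled without Redis at all. The following is a minimal JDK-only sketch, not how Spring Data Redis is implemented: the class InMemoryPubSub and its method names are hypothetical and only mirror the roles of addMessageListener, removeMessageListener, and convertAndSend. It also shows a property of Redis publish/subscribe that is easy to forget: a message only reaches listeners that are subscribed at the moment it is published.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical in-memory model of channel-based publish/subscribe.
// It is NOT Spring's implementation; it only mirrors the roles described above.
class InMemoryPubSub {

    // channel name -> listeners currently subscribed to that channel
    private final Map<String, List<BiConsumer<String, String>>> listeners = new ConcurrentHashMap<>();

    // Plays the role of addMessageListener(listener, topic).
    void subscribe(String channel, BiConsumer<String, String> listener) {
        listeners.computeIfAbsent(channel, ch -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Plays the role of removeMessageListener(listener, topic).
    void unsubscribe(String channel, BiConsumer<String, String> listener) {
        List<BiConsumer<String, String>> subscribed = listeners.get(channel);
        if (subscribed != null) {
            subscribed.remove(listener);
        }
    }

    // Plays the role of convertAndSend(channel, message).
    // Returns how many listeners received the message.
    int publish(String channel, String body) {
        List<BiConsumer<String, String>> subscribed = listeners.get(channel);
        if (subscribed == null) {
            return 0;
        }
        int delivered = 0;
        for (BiConsumer<String, String> listener : subscribed) {
            listener.accept(channel, body);
            delivered++;
        }
        return delivered;
    }
}

class InMemoryPubSubDemo {
    public static void main(String[] args) {
        InMemoryPubSub bus = new InMemoryPubSub();
        List<String> received = new ArrayList<>();
        BiConsumer<String, String> collector = (channel, body) -> received.add(channel + ":" + body);

        bus.publish("dream", "too early");  // nobody is subscribed yet, so this is dropped
        bus.subscribe("dream", collector);
        bus.publish("dream", "engineer");   // delivered
        bus.unsubscribe("dream", collector);
        bus.publish("dream", "too late");   // dropped again

        System.out.println(received);       // prints [dream:engineer]
    }
}
```

In the real project, Redis itself keeps track of subscribers and the container dispatches incoming messages to the registered listeners, so none of this bookkeeping has to be written by hand.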

PS: as a Maven project, it must also have a pom.xml. That file is not shown in the figure, so I mention it here.

2, Maven dependency

Dependencies to be introduced into the project include:

  • spring-boot-starter-web: starts a web server for us;
  • spring-boot-starter-data-redis: integrates Redis for us;
  • lombok: lets us use @Slf4j/@Data to simplify the code;
  • slf4j-api: lets us use classes such as Logger and LoggerFactory;
  • logback-classic: actually prints the logs.

The complete pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.3</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <groupId>com.example.demo</groupId>
  <artifactId>pubsub</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>pubsub</name>
  <description>Demo project for Spring Boot</description>

  <properties>
    <java.version>1.8</java.version>
    <slf4j.version>1.7.32</slf4j.version>
    <logback.version>1.2.6</logback.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>${logback.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

3, Message listening

After a published message is received, it has to be processed. That logic lives in PrintMessageListener:

package com.example.demo.pubsub.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Function Description: print the received Redis messages
 *
 * @author geekziyu
 * @version 1.0.0
 */
@Slf4j
public class PrintMessageListener implements MessageListener {

    private StringRedisSerializer stringRedisSerializer;

    public PrintMessageListener(StringRedisSerializer stringRedisSerializer) {
        this.stringRedisSerializer = stringRedisSerializer;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = stringRedisSerializer.deserialize(message.getChannel());
        String body = stringRedisSerializer.deserialize(message.getBody());
        handleMessage(channel, body);
    }

    private void handleMessage(String channel, String body) {
        log.info("Consumed Redis message\n channel:{}\n body:{}", channel, body);
    }
}
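
Redis hands both the channel name and the body to the listener as raw bytes, which is why onMessage deserializes them before logging. As a rough JDK-only illustration of what that step amounts to when StringRedisSerializer uses its default UTF-8 charset, here is a small sketch. The class Utf8Decoder and its decode method are hypothetical names used for illustration, not part of Spring Data Redis.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mirrors the decoding step in PrintMessageListener.
class Utf8Decoder {

    // Turns the raw bytes of a channel name or message body into a String.
    // A null payload stays null instead of causing a NullPointerException.
    static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] channel = "dream".getBytes(StandardCharsets.UTF_8);
        byte[] body = "engineer".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(channel) + " -> " + decode(body)); // prints dream -> engineer
    }
}
```

If publishers write the body with a different serializer (JSON or JDK serialization, for example), the listener needs the matching deserializer, otherwise the printed body will be unreadable.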

4, Redis configuration

As mentioned earlier, to publish and subscribe to Redis messages with the API provided by spring-boot-starter-data-redis, we need RedisTemplate and RedisMessageListenerContainer. Now let's register them as beans in the Spring container:

package com.example.demo.pubsub.config;

import com.example.demo.pubsub.listener.PrintMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Function Description: Redis configuration
 *
 * @author geekziyu
 * @version 1.0.0
 */
@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer result = new RedisMessageListenerContainer();
        result.setConnectionFactory(redisConnectionFactory);

        return result;
    }

    @Bean("redisTemplate")
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, String> result = new RedisTemplate<>();
        result.setConnectionFactory(factory);

        result.setKeySerializer(stringRedisSerializer());
        result.setHashKeySerializer(stringRedisSerializer());

        result.setValueSerializer(stringRedisSerializer());
        result.setHashValueSerializer(stringRedisSerializer());
        return result;
    }

    @Bean
    public PrintMessageListener printMessageListener() {
        return new PrintMessageListener(stringRedisSerializer());
    }

    @Bean
    public StringRedisSerializer stringRedisSerializer() {
        return new StringRedisSerializer();
    }
}

Two points need attention:
First, if setConnectionFactory(RedisConnectionFactory) is not called to give RedisMessageListenerContainer a connection factory, a NullPointerException occurs when addMessageListener is called to subscribe. The specific location of the exception is shown in the following figure:

Second, if setConnectionFactory is not called on RedisTemplate to set the Redis connection factory, an exception occurs at startup, as shown below:

// Note: RedisTemplate requires a RedisConnectionFactory!
Caused by: java.lang.IllegalStateException: RedisConnectionFactory is required
	at org.springframework.util.Assert.state(Assert.java:76)
	at org.springframework.data.redis.core.RedisAccessor.afterPropertiesSet(RedisAccessor.java:38)
	at org.springframework.data.redis.core.RedisTemplate.afterPropertiesSet(RedisTemplate.java:128)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)

5, Publish and subscribe via HTTP requests

Here, AdminController accepts publish, subscribe, and unsubscribe requests. The source code is as follows:

package com.example.demo.pubsub.controller;

import com.example.demo.pubsub.listener.PrintMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Function Description: background controller
 *
 * @author geekziyu
 * @version 1.0.0
 */
@RestController
@RequestMapping("/admin")
public class AdminController {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private RedisMessageListenerContainer container;

    // Channel name -> listener. A ConcurrentHashMap is used because requests may arrive on several threads at once.
    private final Map<String, MessageListener> registeredListener = new ConcurrentHashMap<>();

    @Autowired
    private StringRedisSerializer stringRedisSerializer;


    @GetMapping("/pub")
    public String publish(String channel, String body) {
        redisTemplate.convertAndSend(channel, body);
        return "ok";
    }

    @GetMapping("/sub")
    public String subscribe(String channel) {
        MessageListener listener = registeredListener.computeIfAbsent(channel, ch -> new PrintMessageListener(stringRedisSerializer));
        container.addMessageListener(listener, new ChannelTopic(channel));
        return "ok";
    }

    @GetMapping("/unsub")
    public String unsubscribe(String channel) {
        MessageListener messageListener = registeredListener.get(channel);
        if (messageListener != null) {
            container.removeMessageListener(messageListener, new ChannelTopic(channel));
        }
        return "ok";
    }

}
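
One detail of the controller is worth isolating: it remembers one listener per channel, so that unsubscribing can hand the very same listener instance back to the container. The JDK-only sketch below shows just that bookkeeping. ListenerRegistry is a hypothetical name, and unlike the controller it also forgets the listener on removal, which is one possible design choice rather than what the demo does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-channel registry; L stands for any listener type.
class ListenerRegistry<L> {

    private final Map<String, L> byChannel = new ConcurrentHashMap<>();

    // Returns the listener already registered for the channel,
    // or creates and remembers a new one on first use.
    L getOrCreate(String channel, Function<String, L> factory) {
        return byChannel.computeIfAbsent(channel, factory);
    }

    // Removes and returns the listener for the channel, or null if none exists.
    L remove(String channel) {
        return byChannel.remove(channel);
    }

    // Number of channels that currently have a listener.
    int size() {
        return byChannel.size();
    }

    public static void main(String[] args) {
        ListenerRegistry<Object> registry = new ListenerRegistry<>();
        Object first = registry.getOrCreate("dream", ch -> new Object());
        Object second = registry.getOrCreate("dream", ch -> new Object());
        System.out.println(first == second);  // prints true: the same instance is reused
        registry.remove("dream");
        System.out.println(registry.size());  // prints 0
    }
}
```

Reusing the instance matters because the container is asked to remove a specific listener. A freshly created listener object would presumably not match the one that was registered earlier.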

6, Print log

To print logs to the console, you may need a logback.xml. Its complete content is:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="info">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>

Summary

In this way, we can implement publish and subscribe.

First subscribe:

http://localhost:8080/admin/sub?channel=dream

Then publish:

http://localhost:8080/admin/pub?channel=dream&body=engineer
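
If the channel name or the body contains spaces or other special characters, the query parameters have to be URL-encoded before the request is sent. A minimal sketch of building the two URLs above in code follows. AdminUrls and its methods are hypothetical helpers, and the base URL assumes the demo runs on localhost:8080 as in this article.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for building request URLs for the demo's /admin endpoints.
class AdminUrls {

    // Assumption: the demo application listens on localhost:8080.
    private static final String BASE = "http://localhost:8080/admin";

    static String subscribeUrl(String channel) {
        return BASE + "/sub?channel=" + encode(channel);
    }

    static String publishUrl(String channel, String body) {
        return BASE + "/pub?channel=" + encode(channel) + "&body=" + encode(body);
    }

    // Form-encodes one query parameter value (this overload also works on Java 8).
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is available on every JVM, so this branch should never run.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(subscribeUrl("dream"));
        // prints http://localhost:8080/admin/sub?channel=dream
        System.out.println(publishUrl("dream", "software engineer"));
        // prints http://localhost:8080/admin/pub?channel=dream&body=software+engineer
    }
}
```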

Check the console to confirm that the Redis message was consumed successfully:

Note that the default Redis connection in application.properties is localhost:6379:

spring.redis.host=localhost
spring.redis.port=6379

You need to make sure that Redis is running locally and listening on port 6379. If you cannot set up Redis locally, change these properties so that they point to an available Redis service.


Keywords: Redis Spring

Added by DeadEvil on Sat, 15 Jan 2022 17:30:49 +0200