Spring data redis dynamically switches data sources

Recently, we encountered a troublesome demand. We need a microservice application to access two different Redis clusters at the same time. Generally, we don't use Redis in this way, but the two Redis were originally different business clusters, and now a microservice needs to be accessed at the same time.

In fact, we may encounter similar scenarios during actual business development. For example, Redis read-write separation, which is also a function not provided by spring data Redis. The underlying connection pool, such as lattice or Jedis, provides API s for obtaining read-only connections, but there are two defects:

  • The upper spring data redis does not encapsulate this interface
  • Based on the redis architecture, sentinel mode needs to be configured with sentinel address, and cluster mode needs to be aware of cluster topology. In the cloud native environment, these are hidden by the cloud provider by default, and only one dynamic VIP domain name is exposed.

Therefore, we need to implement a mechanism to dynamically switch Redis connections on the basis of spring data Redis.

Mechanism introduction

The configuration class of spring data Redis is: org springframework. boot. autoconfigure. data. Redis. Redisproperties, you can configure the connection configuration of a single Redis instance or Redis cluster. According to these configurations, a unified RedisConnectionFactory redisconnectionfactory will be generated

The abstract relationship between the spring data redis core interface and the connection behind it is as follows:

From this figure, we can know that we can implement a RedisConnectionFactory that can dynamically return different Redis connections. According to the automatic loading source code of spring data Redis, we can know that all redisconnectionfactories in the framework are @ ConditionalOnMissingBean, That is, we can replace it with our own RedisConnectionFactory.

Realize dynamic data source switching

We can encapsulate a configuration of multiple Redis connections in the outer layer of RedisProperties configuration, that is, MultiRedisProperties:

@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "spring.redis")
public class MultiRedisProperties {
    /**
     * The default connection must be configured. The configured key is default
     */
    public static final String DEFAULT = "default";

    private boolean enableMulti = false;
    private Map<String, RedisProperties> multi;
}

Copy code

This configuration is based on the original configuration, that is, users can use the original configuration or use this multi Redis configuration, that is, spring needs to be configured Redis. enable-multi=true. The key placed in the multi Map is the data source name. Users can specify which Redis to use through this data source name before using RedisTemplate or ReactiveRedisTemplate.

Next, let's implement the multiredisletticconnectionfactory, that is, the RedisConnectionFactory that can dynamically switch Redis connections. The Redis client used in our project is lattice:

public class MultiRedisLettuceConnectionFactory
        implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
    private final Map<String, LettuceConnectionFactory> connectionFactoryMap;

    private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();

    public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
        this.connectionFactoryMap = connectionFactoryMap;
    }

    public void setCurrentRedis(String currentRedis) {
        if (!connectionFactoryMap.containsKey(currentRedis)) {
            throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exists in configuration");
        }
        MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
    }

    @Override
    public void destroy() throws Exception {
        connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
    }

    private LettuceConnectionFactory currentLettuceConnectionFactory() {
        String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
        if (StringUtils.isNotBlank(currentRedis)) {
            MultiRedisLettuceConnectionFactory.currentRedis.remove();
            return connectionFactoryMap.get(currentRedis);
        }
        return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
    }

    @Override
    public ReactiveRedisConnection getReactiveConnection() {
        return currentLettuceConnectionFactory().getReactiveConnection();
    }

    @Override
    public ReactiveRedisClusterConnection getReactiveClusterConnection() {
        return currentLettuceConnectionFactory().getReactiveClusterConnection();
    }

    @Override
    public RedisConnection getConnection() {
        return currentLettuceConnectionFactory().getConnection();
    }

    @Override
    public RedisClusterConnection getClusterConnection() {
        return currentLettuceConnectionFactory().getClusterConnection();
    }

    @Override
    public boolean getConvertPipelineAndTxResults() {
        return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
    }

    @Override
    public RedisSentinelConnection getSentinelConnection() {
        return currentLettuceConnectionFactory().getSentinelConnection();
    }

    @Override
    public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
        return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
    }
}
Copy code

The logic is very simple, that is, it provides an interface for setting Redis data sources, java training It is put into ThreadLocal and is only valid for the current time. It is cleared after reading.

Then, register the multiredisletticconnectionfactory as a Bean in our ApplicationContext:

@ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
@Configuration(proxyBeanMethods = false)
public class RedisCustomizedConfiguration {

    /**
     * @param builderCustomizers
     * @param clientResources
     * @param multiRedisProperties
     * @return
     * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
     */
    @Bean
    public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
            ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
            ClientResources clientResources,
            MultiRedisProperties multiRedisProperties,
            ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
            ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider
    ) {
        //Read configuration
        Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
        Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
        multi.forEach((k, v) -> {
            //This is actually the way that the original source code in the framework uses RedisProperties. In fact, we just wrap a layer outside RedisProperties
            LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(
                    v,
                    sentinelConfigurationProvider,
                    clusterConfigurationProvider
            );
            LettuceConnectionFactory lettuceConnectionFactory = lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
            connectionFactoryMap.put(k, lettuceConnectionFactory);
        });
        return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
    }

}
Copy code

unit testing

Let's test and use embedded Redis to start local Redis to realize unit testing. We start two Redis, put different keys in the two Redis, verify whether they exist, test the synchronous interface, multi-threaded call synchronous interface, and multiple asynchronous interfaces without waiting for subscriptions, so as to test the effectiveness:

import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Mono;
import redis.embedded.RedisServer;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        "spring.redis.enable-multi=true",
        "spring.redis.multi.default.host=127.0.0.1",
        "spring.redis.multi.default.port=6379",
        "spring.redis.multi.test.host=127.0.0.1",
        "spring.redis.multi.test.port=6380",
})
public class MultiRedisTest {
    //Start two redis
    private static RedisServer redisServer;
    private static RedisServer redisServer2;

    @BeforeAll
    public static void setUp() throws Exception {
        System.out.println("start redis");
        redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
        redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
        redisServer.start();
        redisServer2.start();
        System.out.println("redis started");
    }

    @AfterAll
    public static void tearDown() throws Exception {
        System.out.println("stop redis");
        redisServer.stop();
        redisServer2.stop();
        System.out.println("redis stopped");
    }

    @EnableAutoConfiguration
    @Configuration
    public static class App {
    }

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private ReactiveStringRedisTemplate reactiveRedisTemplate;
    @Autowired
    private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;

    private void testMulti(String suffix) {
        //Use the default connection, set "testDefault" + suffix, "testDefault" key value pair
        redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
        //Use the test connection to set the "testsecond" + suffix "and" testdefault "key value pairs
        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
        redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
        //Use the default connection to verify that "testDefault" + suffix exists and "testSecond" + suffix does not exist
        Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
        Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
        //Use the test connection to verify that "testDefault" + suffix does not exist and that "testSecond" + suffix exists
        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
        Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
        Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
    }

    //Single verification
    @Test
    public void testMultiBlock() {
        testMulti("");
    }

    //Multithreaded authentication
    @Test
    public void testMultiBlockMultiThread() throws InterruptedException {
        Thread thread[] = new Thread[50];
        AtomicBoolean result = new AtomicBoolean(true);
        for (int i = 0; i < thread.length; i++) {
            int finalI = i;
            thread[i] = new Thread(() -> {
                try {
                    testMulti("" + finalI);
                } catch (Exception e) {
                    e.printStackTrace();
                    result.set(false);
                }
            });
        }
        for (int i = 0; i < thread.length; i++) {
            thread[i].start();
        }
        for (int i = 0; i < thread.length; i++) {
            thread[i].join();
        }
        Assertions.assertTrue(result.get());
    }

    //reactive interface verification
    private Mono<Boolean> reactiveMulti(String suffix) {
        return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
                .flatMap(b -> {
                    multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                    return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
                }).flatMap(b -> {
                    return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                }).map(b -> {
                    Assertions.assertTrue(b);
                    System.out.println(Thread.currentThread().getName());
                    return b;
                }).flatMap(b -> {
                    return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                }).map(b -> {
                    Assertions.assertFalse(b);
                    System.out.println(Thread.currentThread().getName());
                    return b;
                }).flatMap(b -> {
                    multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                    return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                }).map(b -> {
                    Assertions.assertFalse(b);
                    System.out.println(Thread.currentThread().getName());
                    return b;
                }).flatMap(b -> {
                    multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                    return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                }).map(b -> {
                    Assertions.assertTrue(b);
                    return b;
                });
    }

    //reactive authentication is called multiple times and subscribe is executed, which is multi-threaded in itself
    @Test
    public void testMultiReactive() throws InterruptedException {
        for (int i = 0; i < 10000; i++) {
            reactiveMulti("" + i).subscribe(System.out::println);
        }
        TimeUnit.SECONDS.sleep(10);
    }
}

Keywords: Java Redis Spring Cache

Added by MilesStandish on Mon, 27 Dec 2021 10:25:17 +0200