Recently we ran into a tricky requirement: a single microservice needed to access two different Redis clusters at the same time. We don't normally use Redis this way, but the two clusters originally belonged to different businesses, and now one microservice has to access both.
Similar scenarios come up in day-to-day business development. One example is Redis read-write separation, which Spring Data Redis does not provide either. The underlying client, such as Lettuce or Jedis, provides APIs for obtaining read-only connections, but there are two problems:
- The upper layer, Spring Data Redis, does not expose this interface.
- Depending on the Redis architecture, sentinel mode requires the sentinel addresses to be configured, and cluster mode requires awareness of the cluster topology. In a cloud-native environment, the cloud provider hides these by default and exposes only a single dynamic VIP domain name.
Therefore, we need a mechanism that dynamically switches Redis connections on top of Spring Data Redis.
Mechanism introduction
The configuration class for Spring Data Redis is org.springframework.boot.autoconfigure.data.redis.RedisProperties, which can hold the connection settings for a single Redis instance or a Redis cluster. From these settings, a single RedisConnectionFactory is created.
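The relationship between the core Spring Data Redis interfaces and the connections behind them is as follows: RedisTemplate and ReactiveRedisTemplate obtain every connection from a RedisConnectionFactory (or ReactiveRedisConnectionFactory).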
From this, we can see that it is enough to implement a RedisConnectionFactory that dynamically returns different Redis connections. The auto-configuration source code of Spring Data Redis shows that every RedisConnectionFactory the framework registers is annotated with @ConditionalOnMissingBean, so we can replace it with our own RedisConnectionFactory.
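The delegation idea can be sketched without Spring at all. In the minimal example below, ConnectionFactory, RoutingConnectionFactory and the keyResolver parameter are hypothetical stand-ins invented for illustration (they are not Spring Data Redis types); the point is only that one factory can wrap several named factories and pick one at call time.

```java
import java.util.Map;
import java.util.function.Supplier;

class RoutingDemo {
    public static void main(String[] args) {
        String[] selected = {null};
        Map<String, ConnectionFactory> targets = Map.of(
                "default", () -> "conn-to-default",
                "test", () -> "conn-to-test");
        ConnectionFactory routing = new RoutingConnectionFactory(targets, () -> selected[0]);

        System.out.println(routing.getConnection()); // conn-to-default
        selected[0] = "test";
        System.out.println(routing.getConnection()); // conn-to-test
    }
}

// Hypothetical stand-in for RedisConnectionFactory: a "connection" is just a label here.
interface ConnectionFactory {
    String getConnection();
}

// Delegates every call to one of several named factories, chosen at call time.
final class RoutingConnectionFactory implements ConnectionFactory {
    static final String DEFAULT = "default";

    private final Map<String, ConnectionFactory> targets;
    private final Supplier<String> keyResolver; // hypothetical: decides which target to use

    RoutingConnectionFactory(Map<String, ConnectionFactory> targets, Supplier<String> keyResolver) {
        if (!targets.containsKey(DEFAULT)) {
            throw new IllegalArgumentException("a \"default\" target is required");
        }
        this.targets = Map.copyOf(targets);
        this.keyResolver = keyResolver;
    }

    @Override
    public String getConnection() {
        String key = keyResolver.get();
        // Fall back to the default target when nothing is selected or the name is unknown.
        ConnectionFactory target = (key == null) ? null : targets.get(key);
        if (target == null) {
            target = targets.get(DEFAULT);
        }
        return target.getConnection();
    }
}
```

Callers keep depending on the single ConnectionFactory interface, which is why a template built on top of it would not need to change.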
Implementing dynamic data source switching
We can wrap the RedisProperties configuration in an outer class that holds multiple Redis connections, MultiRedisProperties:
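    import java.util.Map;

    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.boot.context.properties.ConfigurationProperties;

    @Data
    @NoArgsConstructor
    @ConfigurationProperties(prefix = "spring.redis")
    public class MultiRedisProperties {
        /**
         * The default connection must be configured; its key is "default".
         */
        public static final String DEFAULT = "default";

        // Switches the multi-Redis configuration on (spring.redis.enable-multi)
        private boolean enableMulti = false;
        // Data source name -> connection settings for that Redis
        private Map<String, RedisProperties> multi;
    }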
This configuration builds on the original one: users can keep using the original configuration, or switch to the multi-Redis configuration by setting spring.redis.enable-multi=true. Each key in the multi map is a data source name. Before using RedisTemplate or ReactiveRedisTemplate, users can specify which Redis to use by that name.
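To make the key layout concrete, here is a small plain-Java illustration of how keys of the form spring.redis.multi.<name>.<field> group by data source name. It is only a sketch: the real binding is done by Spring Boot's configuration properties binder, and MultiRedisConfigDemo, groupBySource and sample are hypothetical names. The host and port values are the ones used in the unit test of this article.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class MultiRedisConfigDemo {
    static final String PREFIX = "spring.redis.multi.";

    // Groups "spring.redis.multi.<name>.<field>" keys by <name>.
    static Map<String, Map<String, String>> groupBySource(Properties props) {
        Map<String, Map<String, String>> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) {
                continue; // e.g. spring.redis.enable-multi is not a data source entry
            }
            String rest = key.substring(PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot <= 0) {
                continue;
            }
            result.computeIfAbsent(rest.substring(0, dot), k -> new TreeMap<>())
                  .put(rest.substring(dot + 1), props.getProperty(key));
        }
        return result;
    }

    // Sample application.properties content with a "default" and a "test" data source.
    static Properties sample() {
        String text = "spring.redis.enable-multi=true\n"
                + "spring.redis.multi.default.host=127.0.0.1\n"
                + "spring.redis.multi.default.port=6379\n"
                + "spring.redis.multi.test.host=127.0.0.1\n"
                + "spring.redis.multi.test.port=6380\n";
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Prints {default={host=127.0.0.1, port=6379}, test={host=127.0.0.1, port=6380}}
        System.out.println(groupBySource(sample()));
    }
}
```

The names "default" and "test" are exactly the strings a caller would later pass when choosing a data source.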
Next, let's implement MultiRedisLettuceConnectionFactory, the RedisConnectionFactory that can dynamically switch Redis connections. The Redis client used in our project is Lettuce:
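    import java.util.Map;

    // Assumption: StringUtils.isNotBlank comes from Apache Commons Lang 3
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.dao.DataAccessException;
    import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
    import org.springframework.data.redis.connection.ReactiveRedisConnection;
    import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisClusterConnection;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisSentinelConnection;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

    public class MultiRedisLettuceConnectionFactory
            implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
        // Data source name -> connection factory for that Redis
        private final Map<String, LettuceConnectionFactory> connectionFactoryMap;
        // Data source selected for the next call on the current thread
        private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();

        public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
            this.connectionFactoryMap = connectionFactoryMap;
        }

        public void setCurrentRedis(String currentRedis) {
            if (!connectionFactoryMap.containsKey(currentRedis)) {
                // RedisRelatedException is the project's own exception type
                throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exist in configuration");
            }
            MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
        }

        @Override
        public void destroy() throws Exception {
            connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
        }

        private LettuceConnectionFactory currentLettuceConnectionFactory() {
            String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
            if (StringUtils.isNotBlank(currentRedis)) {
                // The selection is one-shot: clear it as soon as it has been read
                MultiRedisLettuceConnectionFactory.currentRedis.remove();
                return connectionFactoryMap.get(currentRedis);
            }
            // Nothing selected: fall back to the default data source
            return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
        }

        @Override
        public ReactiveRedisConnection getReactiveConnection() {
            return currentLettuceConnectionFactory().getReactiveConnection();
        }

        @Override
        public ReactiveRedisClusterConnection getReactiveClusterConnection() {
            return currentLettuceConnectionFactory().getReactiveClusterConnection();
        }

        @Override
        public RedisConnection getConnection() {
            return currentLettuceConnectionFactory().getConnection();
        }

        @Override
        public RedisClusterConnection getClusterConnection() {
            return currentLettuceConnectionFactory().getClusterConnection();
        }

        @Override
        public boolean getConvertPipelineAndTxResults() {
            return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
        }

        @Override
        public RedisSentinelConnection getSentinelConnection() {
            return currentLettuceConnectionFactory().getSentinelConnection();
        }

        @Override
        public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
            return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
        }
    }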
The logic is simple: the factory exposes a method for selecting the Redis data source. The selection is stored in a ThreadLocal and applies only to the next call on the current thread, because it is cleared as soon as it is read.
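The read-once behavior is easy to miss, so here is a minimal plain-Java sketch of it. OneShotSelector is a hypothetical class written for this illustration, not part of the project; it uses a per-instance ThreadLocal where the factory above uses a static one, and plain strings in place of connection factories.

```java
import java.util.Map;

class OneShotSelectorDemo {
    public static void main(String[] args) throws InterruptedException {
        OneShotSelector<String> selector =
                new OneShotSelector<>(Map.of("default", "redis-A", "test", "redis-B"));

        selector.select("test");
        System.out.println(selector.current()); // redis-B: the selection is consumed here
        System.out.println(selector.current()); // redis-A: back to the default

        // A selection made on one thread is invisible to other threads.
        selector.select("test");
        String[] seenByOther = new String[1];
        Thread other = new Thread(() -> seenByOther[0] = selector.current());
        other.start();
        other.join();
        System.out.println(seenByOther[0]);     // redis-A
        System.out.println(selector.current()); // redis-B: still pending on this thread
    }
}

// Picks a named target for exactly one lookup on the calling thread.
final class OneShotSelector<T> {
    private final Map<String, T> targets;
    private final ThreadLocal<String> selected = new ThreadLocal<>();

    OneShotSelector(Map<String, T> targets) {
        this.targets = Map.copyOf(targets);
    }

    void select(String name) {
        if (!targets.containsKey(name)) {
            throw new IllegalArgumentException("unknown target: " + name);
        }
        selected.set(name);
    }

    T current() {
        String name = selected.get();
        if (name != null) {
            selected.remove(); // cleared after reading, so it only affects one call
            return targets.get(name);
        }
        return targets.get("default");
    }
}
```

A practical consequence is that the data source has to be selected again before every operation that should not go to the default Redis, and the selection has to happen on the thread that actually obtains the connection.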
Then, register MultiRedisLettuceConnectionFactory as a bean in our ApplicationContext:
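    import java.util.Map;

    import com.google.common.collect.Maps;
    import io.lettuce.core.resource.ClientResources;
    import org.springframework.beans.factory.ObjectProvider;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.boot.autoconfigure.data.redis.LettuceClientConfigurationBuilderCustomizer;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisClusterConfiguration;
    import org.springframework.data.redis.connection.RedisSentinelConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

    // Assumption: LettuceConnectionConfiguration appears to be package-private in Spring Boot 2.x,
    // so this class may need to live in org.springframework.boot.autoconfigure.data.redis to compile.
    // MultiRedisProperties is assumed to be registered elsewhere as a configuration properties bean.
    @ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
    @Configuration(proxyBeanMethods = false)
    public class RedisCustomizedConfiguration {

        /**
         * Builds one LettuceConnectionFactory per configured data source.
         *
         * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
         */
        @Bean
        public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
                ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
                ClientResources clientResources,
                MultiRedisProperties multiRedisProperties,
                ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
                ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider
        ) {
            // Read the configuration
            Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
            Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
            multi.forEach((k, v) -> {
                // This is how the framework's own source code consumes RedisProperties;
                // we only wrap one more layer around RedisProperties
                LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(
                        v,
                        sentinelConfigurationProvider,
                        clusterConfigurationProvider
                );
                LettuceConnectionFactory lettuceConnectionFactory =
                        lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
                connectionFactoryMap.put(k, lettuceConnectionFactory);
            });
            return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
        }
    }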
Unit testing
Let's test this by using embedded Redis to start local Redis instances. We start two Redis servers, write a different key to each, and verify where each key exists. We test the synchronous interface, multi-threaded calls to the synchronous interface, and many asynchronous calls that subscribe without waiting for the result:
    import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.test.context.junit.jupiter.SpringExtension;
    import reactor.core.publisher.Mono;
    import redis.embedded.RedisServer;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    @ExtendWith(SpringExtension.class)
    @SpringBootTest(properties = {
            "spring.redis.enable-multi=true",
            "spring.redis.multi.default.host=127.0.0.1",
            "spring.redis.multi.default.port=6379",
            "spring.redis.multi.test.host=127.0.0.1",
            "spring.redis.multi.test.port=6380",
    })
    public class MultiRedisTest {
        // Start two Redis servers
        private static RedisServer redisServer;
        private static RedisServer redisServer2;

        @BeforeAll
        public static void setUp() throws Exception {
            System.out.println("start redis");
            redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
            redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
            redisServer.start();
            redisServer2.start();
            System.out.println("redis started");
        }

        @AfterAll
        public static void tearDown() throws Exception {
            System.out.println("stop redis");
            redisServer.stop();
            redisServer2.stop();
            System.out.println("redis stopped");
        }

        @EnableAutoConfiguration
        @Configuration
        public static class App {
        }

        @Autowired
        private StringRedisTemplate redisTemplate;
        @Autowired
        private ReactiveStringRedisTemplate reactiveRedisTemplate;
        @Autowired
        private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;

        private void testMulti(String suffix) {
            // Use the default connection to set the "testDefault" + suffix -> "testDefault" key-value pair
            redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
            // Use the test connection to set the "testSecond" + suffix -> "testSecond" key-value pair
            multiRedisLettuceConnectionFactory.setCurrentRedis("test");
            redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
            // Use the default connection to verify that "testDefault" + suffix exists
            // and "testSecond" + suffix does not
            Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
            Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
            // Use the test connection to verify that "testDefault" + suffix does not exist
            // and "testSecond" + suffix does; the selection must be repeated before each call
            multiRedisLettuceConnectionFactory.setCurrentRedis("test");
            Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
            multiRedisLettuceConnectionFactory.setCurrentRedis("test");
            Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
        }

        // Single-threaded verification
        @Test
        public void testMultiBlock() {
            testMulti("");
        }

        // Multi-threaded verification
        @Test
        public void testMultiBlockMultiThread() throws InterruptedException {
            Thread[] threads = new Thread[50];
            AtomicBoolean result = new AtomicBoolean(true);
            for (int i = 0; i < threads.length; i++) {
                int finalI = i;
                threads[i] = new Thread(() -> {
                    try {
                        testMulti("" + finalI);
                    } catch (Throwable e) {
                        // Catch Throwable: JUnit assertion failures are Errors, not Exceptions
                        e.printStackTrace();
                        result.set(false);
                    }
                });
            }
            for (Thread thread : threads) {
                thread.start();
            }
            for (Thread thread : threads) {
                thread.join();
            }
            Assertions.assertTrue(result.get());
        }

        // Reactive interface verification
        private Mono<Boolean> reactiveMulti(String suffix) {
            return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
                    .flatMap(b -> {
                        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                        return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
                    }).flatMap(b -> {
                        return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                    }).map(b -> {
                        Assertions.assertTrue(b);
                        System.out.println(Thread.currentThread().getName());
                        return b;
                    }).flatMap(b -> {
                        return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                    }).map(b -> {
                        Assertions.assertFalse(b);
                        System.out.println(Thread.currentThread().getName());
                        return b;
                    }).flatMap(b -> {
                        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                        return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                    }).map(b -> {
                        Assertions.assertFalse(b);
                        System.out.println(Thread.currentThread().getName());
                        return b;
                    }).flatMap(b -> {
                        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                        return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                    }).map(b -> {
                        Assertions.assertTrue(b);
                        return b;
                    });
        }

        // Reactive verification: subscribe many times without waiting, which is multi-threaded in itself
        @Test
        public void testMultiReactive() throws InterruptedException {
            AtomicBoolean failed = new AtomicBoolean(false);
            for (int i = 0; i < 10000; i++) {
                // Record errors so that an assertion failing inside the chain fails the test
                reactiveMulti("" + i).subscribe(System.out::println, e -> {
                    e.printStackTrace();
                    failed.set(true);
                });
            }
            TimeUnit.SECONDS.sleep(10);
            Assertions.assertFalse(failed.get());
        }
    }