Using websocket (redis subscription) in nginx distributed cluster

Preface

1.1 principle

The core of websocket is to store the session information of successful handshake into static variables. However, there are many servers in the distributed system, Tomcat, which use nginx to forward to different tomcat, so it can not guarantee that this Tomcat stores the session information of successful handshake. Therefore, you can use the redis subscription function. When sending messages, you can use redis subscription to send them to the channel that has already subscribed to To find out whether there is session information of the receiver from each server, send the message if there is one.
Spring boot has been encapsulated well. This article uses the common development framework to deal with it

Be careful

The implementation scenario is nginx distributed environment, and the session sharing of the system has been realized.
session sharing is not described in this article. The code is not elegant enough, but the principle is

2. Subscription channel, config for receiving messages

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.MultiServerUserRegistry;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.msunsoft.common.utils.Constants;
import com.msunsoft.websocket.oaNotify.hand.MessageReceiver;
import com.msunsoft.websocket.oaNotify.hand.MessageReceiver2;
import com.msunsoft.websocket.oaNotify.interceptor.WebSocketInterceptor;

@Configuration
public class RedisObserverConfig {
	//Channels subscribed to notification announcement
//	public static final String TOPIC_OA_NOTIFY = "websocket:oa_notify";

//    @Autowired
//    private JedisConnectionFactory jedisConnectionFactory;
	
  @Autowired
  private LettuceConnectionFactory jedisConnectionFactory;
    //Thread call
    @Resource(name = "taskExecutor")
    private TaskExecutor          taskExecutor;
 
   /**
    * Add listening, execute when receiving channel message (write by default)
    * @return
    */
   
//    @Bean
//    MessageListenerAdapter messageListener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
//        return new MessageListenerAdapter(oaNotifyListener());
//    }
    
//    public SimpUserRegistry userRegistry() {
//    	SimpUserRegistry userRegistry = new MultiServerUserRegistry(userRegistry);
//    }
    /**
     * Using Jackson to serialize objects
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(objectMapper);

        return serializer;
    }
    /**
     * RedisTemplate
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory jedisConnectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(jedisConnectionFactory);

        //Serialize KEY by string
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);

        //VALUE serialization in JSON mode
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
    /**
     * Message listener, message received
     */
    @Bean
    MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
        //Message receiver and corresponding default processing method
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
        //How to deserialize a message
        messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);

        return messageListenerAdapter;
    }
    //Subscribed Channels
    @Bean
    RedisMessageListenerContainer redisContainer( MessageListenerAdapter messageListenerAdapter) {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(jedisConnectionFactory);
        container.addMessageListener(messageListenerAdapter, new PatternTopic(Constants.TOPIC_OA_NOTIFY));
//        container.setTaskExecutor(Executors.newFixedThreadPool(4));
        container.setTaskExecutor(taskExecutor);
        return container;
    }
    //For sending messages
//    @Bean
//    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
//    	return new StringRedisTemplate(connectionFactory);
//    }
//    @Bean
//    SimpMessagingTemplate messageTemp(MessageChannel messageChannel) {
//    	return new SimpMessagingTemplate(messageChannel);
//    }
/**
 * Subscribed Channels
 * @return
 */
//    @Bean
//    ChannelTopic orderFoodTopic() {
//        return new ChannelTopic(Constants.TOPIC_OA_NOTIFY);
//    }

  
}

3. Method of execution after receiving message

import org.springframework.stereotype.Component;

import com.msunsoft.websocket.oaNotify.entity.RedisWebsocketMsg;

@Component
public interface  MessageReceiver {
	public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg);
}

Four websocket handshake and the interface of receiving message

import java.io.IOException;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggerFactoryBinder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import com.msunsoft.common.utils.Constants;
import com.msunsoft.common.utils.JsonUtils;
import com.msunsoft.common.utils.SpringUtils;
import com.msunsoft.modules.sys.entity.User;
import com.msunsoft.modules.sys.utils.UserUtils;
import com.msunsoft.websocket.oaNotify.entity.RedisWebsocketMsg;
import com.msunsoft.websocket.oaNotify.hand.MessageReceiver;
import com.msunsoft.websocket.oaNotify.service.RedisService;

import net.sf.json.JSONArray;

//The function of ServerEndpoint is to define the current class as a websocket server. The value of the annotation will be used to listen for the terminal access URL address of the user's connection.
@ServerEndpoint(value = "/socket/{userId}")
@Component
//public class WebSocketServer {
public class WebSocketServer implements MessageReceiver{ //implements WebSocketConfigurer
//Log using slf4j
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);

//Used to record the current number of online connections
private static int onLineCount = 0;
/**
 * Because @ ServerEndpoint does not support injection, use SpringUtils to get IOC instance
 */

// private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
private RedisService redisService = SpringUtils.getBean(RedisService.class);

private ThreadPoolTaskExecutor threadPool =  SpringUtils.getBean("taskExecutor")  ;
//Used to store WebSocketServer objects corresponding to each client
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<String, WebSocketServer>();

//The connection session of a client, through which data needs to be sent to the client
private Session session;

//id address of the client
private String userId;


/**
 * The connection is established successfully. The method called corresponds to onOpen on the foreground page
 * @param ip ip address
 * @param session Conversation
 */
@OnOpen
public void onOpen(@PathParam("userId")String userId,Session session){
    //Custom logic implementation according to business
    //If this user information is added in other tomcat, it will not be added again
    if(null != webSocketMap && null !=webSocketMap.get(userId)) {//If the user information added by this tomcat is no longer added to redis
    	
    }else if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, userId) &&(null == webSocketMap || null == webSocketMap.get(userId))) {
    	onMessage("close", session);
    	//Put session in this server
    	this.session = session;
        this.userId = userId;
        webSocketMap.put(userId,this);  //Put the current object in the map
    }else {
    	this.session = session;
        this.userId = userId;
        webSocketMap.put(userId,this);  //Put the current object in the map
        addOnLineCount();  //Number of people online plus one
        LOGGER.info("New connection joined,ip:{}!Current online population:{}",userId,getOnLineCount());
        //Save information to redis
        redisService.addToSet(Constants.REDIS_WEBSOCKET_USER_SET, userId);
    }
    
    
}
/**
 * The method of the connection close call corresponds to the onClose of the foreground page
 * @param ip
 */
@OnClose
public void onClose(@PathParam("userId")String userId){
	//Remove users from Redis
    webSocketMap.remove(userId);  //Remove WebSocketServer object according to ip(key)
    redisService.removeFromSet(Constants.REDIS_WEBSOCKET_USER_SET, userId);
    subOnLineCount();
    LOGGER.info("WebSocket Close,ip:{},Current online population:{}",userId,getOnLineCount());
}

/**
 * The method called when the server receives the message sent by the client corresponds to the onMessage of the foreground page
 * @param message
 * @param session
 */
@OnMessage
public void onMessage(String message,Session session){
    //Custom logic implementation according to business
    LOGGER.info("Message received from client:{}",message);
    sendMessage(message,session);//Retain
}

/**
 * Called when an error occurs, corresponding to the onError of the foreground page
 * @param session
 * @param error
 */
@OnError
public void onError(Session session,Throwable error){
    LOGGER.error("WebSocket Error occurred");
    error.printStackTrace();
}


/**
 * Send message to current user
 * @param message
 */
public void sendMessage(String message,Session session){
    try{
        //getBasicRemote() is to send messages synchronously. getAsyncRemote() is recommended to be used asynchronously
        session.getBasicRemote().sendText(message);
    }catch (IOException e){
        e.printStackTrace();
        LOGGER.info("Sending data error:,ip:{},message:{}",userId,message);
    }
}
/**
     *    Send message to specified user
 * @param message
 */
public void sendMessageToId(String message,String id){
	Session session = webSocketMap.get(id).session;
    sendMessage(message,session);
}
/**
 * Send messages to all users
 * @param message
 */
public static void sendMessageAll(final String message){
    //The reason why you use entrySet instead of keySet is that entrySet embodies the mapping relationship of map and gets data faster through traversal.
    Set<Map.Entry<String, WebSocketServer>> entries = webSocketMap.entrySet();
    for (Map.Entry<String, WebSocketServer> entry : entries) {
        final WebSocketServer webSocketServer = entry.getValue();
        //Threads are used to control the sending of messages, which is more efficient.
        new Thread(new Runnable() {
            public void run() {
                webSocketServer.sendMessage(message,webSocketServer.session);
            }
        }).start();
    }
}

/**
 * Get the current number of connections
 * @return
 */
public static synchronized int getOnLineCount(){
    return WebSocketServer.onLineCount;
}

/**
 * When there is a new user connection, the number of connections is increased by 1
 */
public static synchronized void addOnLineCount(){
    WebSocketServer.onLineCount++;
}

/**
 * When disconnected, the number of connections is reduced by 1
 */
public static synchronized void subOnLineCount(){
    WebSocketServer.onLineCount--;
}

public Session getSession(){
    return session;
}
public void setSession(Session session){
    this.session = session;
}

/**
 * Receive subscription information
 */
public void receiveMessage(RedisWebsocketMsg rdisWebsocketMsg) {
	// TODO Auto-generated method stub
	Map<String, String> recieveUserMap = rdisWebsocketMsg.getReceiver();
	
	 for (Map.Entry<String, String> m : recieveUserMap.entrySet()) {
		 ThreadSendMsage threadSendMsage = new ThreadSendMsage(m.getKey(),m.getValue(),rdisWebsocketMsg);
		 threadPool.execute(threadSendMsage);
	}
	LOGGER.info("Sending message succeeded!");
}
//Using threads to send messages
public class ThreadSendMsage implements Runnable  {
    private String recieveUserId;
    private String msgId;
    private RedisWebsocketMsg rdisWebsocketMsg;
    
	public ThreadSendMsage(String recieveUserId, String msgId, RedisWebsocketMsg rdisWebsocketMsg) {
		super();
		this.recieveUserId = recieveUserId;
		this.msgId = msgId;
		this.rdisWebsocketMsg = rdisWebsocketMsg;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		rdisWebsocketMsg.setMsgId(msgId);
		sendMessageToUser(recieveUserId,  rdisWebsocketMsg);
	}
	
}
public boolean sendMessageToUser(String recieveUserId, RedisWebsocketMsg rdisWebsocketMsg) {
	WebSocketServer webSocketServer = webSocketMap.get(recieveUserId);
	if(webSocketServer==null) {
		return false;
	}
	LOGGER.info("Enter send message");
	if (!webSocketServer.session.isOpen()) {//When the channel is closed, delete the record information in redis
		LOGGER.info("websocket Channel closure---------------");
		return false;
    }
	try {
		LOGGER.info("Sending message");
		String msg = JsonUtils.objectToJson(rdisWebsocketMsg);
		webSocketServer.sendMessage(msg,webSocketServer.session);
	} catch (Exception e) {
		e.printStackTrace();
	}
    return true;
}

}

5. Entity class po of receiving message

public class RedisWebsocketMsg {
/**
*userId of the message recipient
/
Private map < string, string > receiver; / / key is userID, value is message id
/*
*For the code of the subscription channel corresponding to the message, refer to the code field of {@ link cn.zifangsky.mqwebsocket.enums.WebSocketChannelEnum}
*/
private String channelCode;

private String title;//Message header
private String sender;//Sender name
private String msgId;//Message id
private String receiverId;//Recipient's id
/**
 * Message text
 */
private String content;

public RedisWebsocketMsg() {

}

public RedisWebsocketMsg(Map<String, String> receiver, String channelCode, String content) {
    this.receiver = receiver;
    this.channelCode = channelCode;
    this.content = content;
}

public RedisWebsocketMsg(Map<String, String> receiver, String channelCode, String title, String sender, String content) {
	this.receiver = receiver;
	this.channelCode = channelCode;
	this.title = title;
	this.sender = sender;
	this.content = content;
}

public String getReceiverId() {
	return receiverId;
}

public void setReceiverId(String receiverId) {
	this.receiverId = receiverId;
}

public String getMsgId() {
	return msgId;
}

public void setMsgId(String msgId) {
	this.msgId = msgId;
}

public String getTitle() {
	return title;
}

public void setTitle(String title) {
	this.title = title;
}

public String getSender() {
	return sender;
}

public void setSender(String sender) {
	this.sender = sender;
}

public Map<String, String> getReceiver() {
    return receiver;
}

public void setReceiver(Map<String, String> receiver) {
    this.receiver = receiver;
}

public String getContent() {
    return content;
}

public void setContent(String content) {
    this.content = content;
}

public String getChannelCode() {
    return channelCode;
}

public void setChannelCode(String channelCode) {
    this.channelCode = channelCode;
}

@Override
public String toString() {
    return "RedisWebsocketMsg{" +
            "receiver='" + receiver + '\'' +
            ", channelCode='" + channelCode + '\'' +
            ", content=" + content +
            '}';
}

}

6. redis service (for storing and querying, and deleting user handshake information)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SetOperations;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author zifangsky
 * @date 2018/7/30
 * @since 1.0.0
 */
@Service("redisService")
public class RedisService  {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void set(String key, Object value) {
        ValueOperations<String, Object> valueOperation = redisTemplate.opsForValue();
        valueOperation.set(key, value);
    }

    public void setWithExpire(String key, Object value, long time, TimeUnit timeUnit) {
        BoundValueOperations<String, Object> boundValueOperations = redisTemplate.boundValueOps(key);
        boundValueOperations.set(value);
        boundValueOperations.expire(time,timeUnit);
    }

    public <K> K get(String key) {
        ValueOperations<String, Object> valueOperation = redisTemplate.opsForValue();

        return (K) valueOperation.get(key);
    }

    public void delete(String key) {
         redisTemplate.delete(key);
    }

//    public void addToListLeft(String listKey, ExpireEnum expireEnum, Object... values) {
//        //Bind operation
//        BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
//        //Insert data
//        boundValueOperations.leftPushAll(values);
//        //Set expiration time
//        boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
//    }

//    public void addToListRight(String listKey, ExpireEnum expireEnum, Object... values) {
//        //Bind operation
//        BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
//        //Insert data
//        boundValueOperations.rightPushAll(values);
//        //Set expiration time
//        boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
//    }

    public List<Object> rangeList(String listKey, long start, long end) {
        //Binding operation
        BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
        //Query data
        return boundValueOperations.range(start, end);
    }

    public void addToSet(String setKey, Object... values) {
        SetOperations<String, Object> opsForSet = redisTemplate.opsForSet();
        opsForSet.add(setKey, values);
    }

    public Boolean isSetMember(String setKey, Object value) {
        SetOperations<String, Object> opsForSet = redisTemplate.opsForSet();

        return opsForSet.isMember(setKey, value);
    }

    public void removeFromSet(String setKey, Object... values) {
        SetOperations<String, Object> opsForSet = redisTemplate.opsForSet();
        opsForSet.remove(setKey, values);
    }

    public void convertAndSend(String channel, Object message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

7. redis configuration information in xml

<?xml version="1.0" encoding="UTF-8"?>

<description>Jedis Configuration</description>

<!-- Load configuration properties file -->
<context:property-placeholder ignore-unresolvable="true" location="classpath:jeesite.properties" />

<bean id="redisHttpSessionConfiguration"
class="org.springframework.session.data.redis.config.annotation.web.http.RedisHttpSessionConfiguration">
<property name="maxInactiveIntervalInSeconds" value="1800" />
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
	<property name="maxIdle" value="300" /> <!-- Maximum ability to maintain idel Number of objects in state  -->
	<property name="maxTotal" value="60000" /> <!-- Maximum number of objects allocated -->
	<property name="testOnBorrow" value="true" /> <!-- When called borrow Object Check the validity of the method -->
</bean>

<bean id="jedisPool" class="redis.clients.jedis.JedisPool">
	<constructor-arg index="0" ref="jedisPoolConfig" />
	<constructor-arg index="1" value="${redis.host}" type="java.lang.String"/>
	<constructor-arg index="2" value="${redis.port}" type="int" />
	<!-- <constructor-arg index="3" value="${redis.timeout}" type="int" />
	<constructor-arg index="4" value="${redis.password}"/>
	<constructor-arg index="5" value="${redis.database}" type="int" />
	<constructor-arg index="6" value="${redis.clientName}"/> -->
</bean>
</beans>

Eight front handshakes

var host = window.location.host;
var hostArray = host.split(":");
host = hostArray[0];
 var port = '<%=port%>';
 var url = window.location.pathname;
 var webApp = url.split('/')[1];
 var url = host+":"+port  +"/"+ webApp +"/socket";
var userId = '${userId}';
function initWebsocket(){
	var webSocket = null;
	var url = document.location.host; 
	if ('WebSocket' in window) {
	   webSocket = new WebSocket("ws://" + url + "/"+userId);
	}
	else if ('MozWebSocket' in window) {
	     webSocket = new MozWebSocket("ws://" + url + "/"+userId);
	}
	else {
	    alert('Not support webSocket');
	}
	// When opening a connection
	webSocket.onopen = function(evnt) {
	  console.log("  websocket.onopen  ");
	  webSocket.send("timing");
	};
	// When a message is received
	webSocket.onmessage = function(evnt) {
		if(open != null){
			dialog.close(open);
			if(evnt.data != ""){
				open = dialog.open({
					width:350,
					height:250,
					shade:false,
					title:"Medical Insurance News Alert",
					position:[400,10,"auto","auto"],
					move:true,
					url:"${path}/jsp/drugAudit/messageReminder.jsp?result="+encodeURIComponent(evnt.data),
					content:""
				});
			}
		}
	};
	webSocket.onerror = function(evnt) {
	  console.log("  websocket.onerror  ");
	};
	webSocket.onclose = function(evnt) {
	  console.log("  websocket.onclose  ");
	};
	//Monitor window closed
	window.onbeforeunload = function (event) {
	    webSocket.close();
	}
} 
Published 14 original articles, won praise 0, visited 281
Private letter follow

Keywords: Session Redis Java socket

Added by Spudgun on Mon, 16 Mar 2020 12:58:54 +0200