Preface
1.1 principle
The core of websocket is to store the session information of successful handshake into static variables. However, there are many servers in the distributed system, Tomcat, which use nginx to forward to different tomcat, so it can not guarantee that this Tomcat stores the session information of successful handshake. Therefore, you can use the redis subscription function. When sending messages, you can use redis subscription to send them to the channel that has already subscribed to To find out whether there is session information of the receiver from each server, send the message if there is one.
Spring boot has been encapsulated well. This article uses the common development framework to deal with it
Be careful
The implementation scenario is nginx distributed environment, and the session sharing of the system has been realized.
session sharing is not described in this article. The code is not elegant enough, but the principle is
2. Subscription channel, config for receiving messages
import javax.annotation.Resource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.messaging.simp.user.MultiServerUserRegistry; import org.springframework.messaging.simp.user.SimpUserRegistry; import org.springframework.web.socket.server.HandshakeInterceptor; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.msunsoft.common.utils.Constants; import com.msunsoft.websocket.oaNotify.hand.MessageReceiver; import com.msunsoft.websocket.oaNotify.hand.MessageReceiver2; import com.msunsoft.websocket.oaNotify.interceptor.WebSocketInterceptor; @Configuration public class RedisObserverConfig { //Channels subscribed to notification announcement // public static final String TOPIC_OA_NOTIFY = "websocket:oa_notify"; // @Autowired // private JedisConnectionFactory jedisConnectionFactory; @Autowired private LettuceConnectionFactory jedisConnectionFactory; //Thread call @Resource(name = "taskExecutor") private TaskExecutor taskExecutor; /** * Add listening, execute when receiving channel message (write by default) * @return */ // @Bean // MessageListenerAdapter messageListener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) { // return new MessageListenerAdapter(oaNotifyListener()); // } // public SimpUserRegistry userRegistry() { // SimpUserRegistry userRegistry = new MultiServerUserRegistry(userRegistry); // } /** * Using Jackson to serialize objects */ @Bean public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){ Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(objectMapper); return serializer; } /** * RedisTemplate */ @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory jedisConnectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){ RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(jedisConnectionFactory); //Serialize KEY by string StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringRedisSerializer); redisTemplate.setHashKeySerializer(stringRedisSerializer); //VALUE serialization in JSON mode redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * Message listener, message received */ @Bean MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){ //Message receiver and corresponding default processing method MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage"); //How to deserialize a message messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer); return messageListenerAdapter; } //Subscribed Channels @Bean RedisMessageListenerContainer redisContainer( MessageListenerAdapter messageListenerAdapter) { final RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(jedisConnectionFactory); container.addMessageListener(messageListenerAdapter, new PatternTopic(Constants.TOPIC_OA_NOTIFY)); // container.setTaskExecutor(Executors.newFixedThreadPool(4)); container.setTaskExecutor(taskExecutor); return container; } //For sending messages // @Bean // StringRedisTemplate template(RedisConnectionFactory connectionFactory) { // return new StringRedisTemplate(connectionFactory); // } // @Bean // SimpMessagingTemplate messageTemp(MessageChannel messageChannel) { // return new SimpMessagingTemplate(messageChannel); // } /** * Subscribed Channels * @return */ // @Bean // ChannelTopic orderFoodTopic() { // return new ChannelTopic(Constants.TOPIC_OA_NOTIFY); // } }
3. Method of execution after receiving message
import org.springframework.stereotype.Component; import com.msunsoft.websocket.oaNotify.entity.RedisWebsocketMsg; @Component public interface MessageReceiver { public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg); }
Four websocket handshake and the interface of receiving message
import java.io.IOException;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggerFactoryBinder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import com.msunsoft.common.utils.Constants;
import com.msunsoft.common.utils.JsonUtils;
import com.msunsoft.common.utils.SpringUtils;
import com.msunsoft.modules.sys.entity.User;
import com.msunsoft.modules.sys.utils.UserUtils;
import com.msunsoft.websocket.oaNotify.entity.RedisWebsocketMsg;
import com.msunsoft.websocket.oaNotify.hand.MessageReceiver;
import com.msunsoft.websocket.oaNotify.service.RedisService;
import net.sf.json.JSONArray;
//The function of ServerEndpoint is to define the current class as a websocket server. The value of the annotation will be used to listen for the terminal access URL address of the user's connection.
@ServerEndpoint(value = "/socket/{userId}")
@Component
//public class WebSocketServer {
public class WebSocketServer implements MessageReceiver{ //implements WebSocketConfigurer
//Log using slf4j
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
//Used to record the current number of online connections private static int onLineCount = 0; /** * Because @ ServerEndpoint does not support injection, use SpringUtils to get IOC instance */
// private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
private RedisService redisService = SpringUtils.getBean(RedisService.class);
private ThreadPoolTaskExecutor threadPool = SpringUtils.getBean("taskExecutor") ; //Used to store WebSocketServer objects corresponding to each client private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<String, WebSocketServer>(); //The connection session of a client, through which data needs to be sent to the client private Session session; //id address of the client private String userId; /** * The connection is established successfully. The method called corresponds to onOpen on the foreground page * @param ip ip address * @param session Conversation */ @OnOpen public void onOpen(@PathParam("userId")String userId,Session session){ //Custom logic implementation according to business //If this user information is added in other tomcat, it will not be added again if(null != webSocketMap && null !=webSocketMap.get(userId)) {//If the user information added by this tomcat is no longer added to redis }else if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, userId) &&(null == webSocketMap || null == webSocketMap.get(userId))) { onMessage("close", session); //Put session in this server this.session = session; this.userId = userId; webSocketMap.put(userId,this); //Put the current object in the map }else { this.session = session; this.userId = userId; webSocketMap.put(userId,this); //Put the current object in the map addOnLineCount(); //Number of people online plus one LOGGER.info("New connection joined,ip:{}!Current online population:{}",userId,getOnLineCount()); //Save information to redis redisService.addToSet(Constants.REDIS_WEBSOCKET_USER_SET, userId); } } /** * The method of the connection close call corresponds to the onClose of the foreground page * @param ip */ @OnClose public void onClose(@PathParam("userId")String userId){ //Remove users from Redis webSocketMap.remove(userId); //Remove WebSocketServer object according to ip(key) redisService.removeFromSet(Constants.REDIS_WEBSOCKET_USER_SET, userId); subOnLineCount(); LOGGER.info("WebSocket Close,ip:{},Current online population:{}",userId,getOnLineCount()); } /** * The method called when the server receives the message sent by the client corresponds to the onMessage of the foreground page * @param message * @param session */ @OnMessage public void onMessage(String message,Session session){ //Custom logic implementation according to business LOGGER.info("Message received from client:{}",message); sendMessage(message,session);//Retain } /** * Called when an error occurs, corresponding to the onError of the foreground page * @param session * @param error */ @OnError public void onError(Session session,Throwable error){ LOGGER.error("WebSocket Error occurred"); error.printStackTrace(); } /** * Send message to current user * @param message */ public void sendMessage(String message,Session session){ try{ //getBasicRemote() is to send messages synchronously. getAsyncRemote() is recommended to be used asynchronously session.getBasicRemote().sendText(message); }catch (IOException e){ e.printStackTrace(); LOGGER.info("Sending data error:,ip:{},message:{}",userId,message); } } /** * Send message to specified user * @param message */ public void sendMessageToId(String message,String id){ Session session = webSocketMap.get(id).session; sendMessage(message,session); } /** * Send messages to all users * @param message */ public static void sendMessageAll(final String message){ //The reason why you use entrySet instead of keySet is that entrySet embodies the mapping relationship of map and gets data faster through traversal. Set<Map.Entry<String, WebSocketServer>> entries = webSocketMap.entrySet(); for (Map.Entry<String, WebSocketServer> entry : entries) { final WebSocketServer webSocketServer = entry.getValue(); //Threads are used to control the sending of messages, which is more efficient. new Thread(new Runnable() { public void run() { webSocketServer.sendMessage(message,webSocketServer.session); } }).start(); } } /** * Get the current number of connections * @return */ public static synchronized int getOnLineCount(){ return WebSocketServer.onLineCount; } /** * When there is a new user connection, the number of connections is increased by 1 */ public static synchronized void addOnLineCount(){ WebSocketServer.onLineCount++; } /** * When disconnected, the number of connections is reduced by 1 */ public static synchronized void subOnLineCount(){ WebSocketServer.onLineCount--; } public Session getSession(){ return session; } public void setSession(Session session){ this.session = session; } /** * Receive subscription information */ public void receiveMessage(RedisWebsocketMsg rdisWebsocketMsg) { // TODO Auto-generated method stub Map<String, String> recieveUserMap = rdisWebsocketMsg.getReceiver(); for (Map.Entry<String, String> m : recieveUserMap.entrySet()) { ThreadSendMsage threadSendMsage = new ThreadSendMsage(m.getKey(),m.getValue(),rdisWebsocketMsg); threadPool.execute(threadSendMsage); } LOGGER.info("Sending message succeeded!"); } //Using threads to send messages public class ThreadSendMsage implements Runnable { private String recieveUserId; private String msgId; private RedisWebsocketMsg rdisWebsocketMsg; public ThreadSendMsage(String recieveUserId, String msgId, RedisWebsocketMsg rdisWebsocketMsg) { super(); this.recieveUserId = recieveUserId; this.msgId = msgId; this.rdisWebsocketMsg = rdisWebsocketMsg; } @Override public void run() { // TODO Auto-generated method stub rdisWebsocketMsg.setMsgId(msgId); sendMessageToUser(recieveUserId, rdisWebsocketMsg); } } public boolean sendMessageToUser(String recieveUserId, RedisWebsocketMsg rdisWebsocketMsg) { WebSocketServer webSocketServer = webSocketMap.get(recieveUserId); if(webSocketServer==null) { return false; } LOGGER.info("Enter send message"); if (!webSocketServer.session.isOpen()) {//When the channel is closed, delete the record information in redis LOGGER.info("websocket Channel closure---------------"); return false; } try { LOGGER.info("Sending message"); String msg = JsonUtils.objectToJson(rdisWebsocketMsg); webSocketServer.sendMessage(msg,webSocketServer.session); } catch (Exception e) { e.printStackTrace(); } return true; }
}
5. Entity class po of receiving message
public class RedisWebsocketMsg {
/**
*userId of the message recipient
/
Private map < string, string > receiver; / / key is userID, value is message id
/*
*For the code of the subscription channel corresponding to the message, refer to the code field of {@ link cn.zifangsky.mqwebsocket.enums.WebSocketChannelEnum}
*/
private String channelCode;
private String title;//Message header private String sender;//Sender name private String msgId;//Message id private String receiverId;//Recipient's id /** * Message text */ private String content; public RedisWebsocketMsg() { } public RedisWebsocketMsg(Map<String, String> receiver, String channelCode, String content) { this.receiver = receiver; this.channelCode = channelCode; this.content = content; } public RedisWebsocketMsg(Map<String, String> receiver, String channelCode, String title, String sender, String content) { this.receiver = receiver; this.channelCode = channelCode; this.title = title; this.sender = sender; this.content = content; } public String getReceiverId() { return receiverId; } public void setReceiverId(String receiverId) { this.receiverId = receiverId; } public String getMsgId() { return msgId; } public void setMsgId(String msgId) { this.msgId = msgId; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getSender() { return sender; } public void setSender(String sender) { this.sender = sender; } public Map<String, String> getReceiver() { return receiver; } public void setReceiver(Map<String, String> receiver) { this.receiver = receiver; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public String getChannelCode() { return channelCode; } public void setChannelCode(String channelCode) { this.channelCode = channelCode; } @Override public String toString() { return "RedisWebsocketMsg{" + "receiver='" + receiver + '\'' + ", channelCode='" + channelCode + '\'' + ", content=" + content + '}'; }
}
6. redis service (for storing and querying, and deleting user handshake information)
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundListOperations; import org.springframework.data.redis.core.BoundValueOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.SetOperations; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Service; import java.util.List; import java.util.concurrent.TimeUnit; /** * * @author zifangsky * @date 2018/7/30 * @since 1.0.0 */ @Service("redisService") public class RedisService { @Autowired private RedisTemplate<String, Object> redisTemplate; public void set(String key, Object value) { ValueOperations<String, Object> valueOperation = redisTemplate.opsForValue(); valueOperation.set(key, value); } public void setWithExpire(String key, Object value, long time, TimeUnit timeUnit) { BoundValueOperations<String, Object> boundValueOperations = redisTemplate.boundValueOps(key); boundValueOperations.set(value); boundValueOperations.expire(time,timeUnit); } public <K> K get(String key) { ValueOperations<String, Object> valueOperation = redisTemplate.opsForValue(); return (K) valueOperation.get(key); } public void delete(String key) { redisTemplate.delete(key); } // public void addToListLeft(String listKey, ExpireEnum expireEnum, Object... values) { // //Bind operation // BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey); // //Insert data // boundValueOperations.leftPushAll(values); // //Set expiration time // boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit()); // } // public void addToListRight(String listKey, ExpireEnum expireEnum, Object... values) { // //Bind operation // BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey); // //Insert data // boundValueOperations.rightPushAll(values); // //Set expiration time // boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit()); // } public List<Object> rangeList(String listKey, long start, long end) { //Binding operation BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey); //Query data return boundValueOperations.range(start, end); } public void addToSet(String setKey, Object... values) { SetOperations<String, Object> opsForSet = redisTemplate.opsForSet(); opsForSet.add(setKey, values); } public Boolean isSetMember(String setKey, Object value) { SetOperations<String, Object> opsForSet = redisTemplate.opsForSet(); return opsForSet.isMember(setKey, value); } public void removeFromSet(String setKey, Object... values) { SetOperations<String, Object> opsForSet = redisTemplate.opsForSet(); opsForSet.remove(setKey, values); } public void convertAndSend(String channel, Object message) { redisTemplate.convertAndSend(channel, message); } }
7. redis configuration information in xml
<?xml version="1.0" encoding="UTF-8"?><description>Jedis Configuration</description> <!-- Load configuration properties file --> <context:property-placeholder ignore-unresolvable="true" location="classpath:jeesite.properties" /> <bean id="redisHttpSessionConfiguration" class="org.springframework.session.data.redis.config.annotation.web.http.RedisHttpSessionConfiguration"> <property name="maxInactiveIntervalInSeconds" value="1800" />
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxIdle" value="300" /> <!-- Maximum ability to maintain idel Number of objects in state --> <property name="maxTotal" value="60000" /> <!-- Maximum number of objects allocated --> <property name="testOnBorrow" value="true" /> <!-- When called borrow Object Check the validity of the method --> </bean> <bean id="jedisPool" class="redis.clients.jedis.JedisPool"> <constructor-arg index="0" ref="jedisPoolConfig" /> <constructor-arg index="1" value="${redis.host}" type="java.lang.String"/> <constructor-arg index="2" value="${redis.port}" type="int" /> <!-- <constructor-arg index="3" value="${redis.timeout}" type="int" /> <constructor-arg index="4" value="${redis.password}"/> <constructor-arg index="5" value="${redis.database}" type="int" /> <constructor-arg index="6" value="${redis.clientName}"/> --> </bean> </beans>
Eight front handshakes
var host = window.location.host; var hostArray = host.split(":"); host = hostArray[0]; var port = '<%=port%>'; var url = window.location.pathname; var webApp = url.split('/')[1]; var url = host+":"+port +"/"+ webApp +"/socket"; var userId = '${userId}'; function initWebsocket(){ var webSocket = null; var url = document.location.host; if ('WebSocket' in window) { webSocket = new WebSocket("ws://" + url + "/"+userId); } else if ('MozWebSocket' in window) { webSocket = new MozWebSocket("ws://" + url + "/"+userId); } else { alert('Not support webSocket'); } // When opening a connection webSocket.onopen = function(evnt) { console.log(" websocket.onopen "); webSocket.send("timing"); }; // When a message is received webSocket.onmessage = function(evnt) { if(open != null){ dialog.close(open); if(evnt.data != ""){ open = dialog.open({ width:350, height:250, shade:false, title:"Medical Insurance News Alert", position:[400,10,"auto","auto"], move:true, url:"${path}/jsp/drugAudit/messageReminder.jsp?result="+encodeURIComponent(evnt.data), content:"" }); } } }; webSocket.onerror = function(evnt) { console.log(" websocket.onerror "); }; webSocket.onclose = function(evnt) { console.log(" websocket.onclose "); }; //Monitor window closed window.onbeforeunload = function (event) { webSocket.close(); } }