SpringBoot2.x integrate websocket


##I WebSocket

1. Introduction to websocket

WebSocket is a communication protocol that allows full duplex communication over a single TCP connection. WebSocket makes the data exchange between the client and the server easier, and allows the server to actively push data to the client. In WebSocket API, the browser and server only need to complete a handshake, and a persistent connection can be established between them for two-way data transmission.

WebSocket protocol was born in 2008 and became an international standard in 2011. All browsers already support it. Its biggest feature is that the server can actively push information to the client, and the client can also actively send information to the server. It is a real two-way equal dialogue and belongs to Server push technology A kind of.

2.WebSocket features

  • Based on the TCP protocol, the server-side implementation is relatively easy
  • It has good compatibility with HTTP protocol. The default ports are also 80 and 443, and the handshake phase adopts HTTP protocol, so it is not easy to shield the handshake and can pass through various HTTP proxy servers
  • The data format is light, the performance overhead is small, and the communication is efficient
  • You can send text or binary data
  • There is no homology restriction, and the client can communicate with any server
  • The protocol identifier is ws (wss if encrypted), and the server URL is the URL

3.WebSocket attribute

The following are the properties of the WebSocket object. Suppose we use the above code to create a Socket object:

attributedescribe
Socket.readyStateThe read-only property readyState indicates the connection state, which can be the following values: 0 - indicates that the connection has not been established. 1 - indicates that the connection has been established and communication is available. 2 - indicates that the connection is being closed. 3 - indicates that the connection is closed or cannot be opened.
Socket.bufferedAmountThe read-only attribute bufferedAmount has been put into the queue by send() to wait for transmission, but the number of UTF-8 text bytes that have not been issued.

4.WebSocket event

The following are the related events of WebSocket object. Suppose we use the above code to create a Socket object:

eventEvent handlerdescribe
openSocket.onopenTriggered when the connection is established
messageSocket.onmessageTriggered when the client receives data from the server
errorSocket.onerrorTriggered when a communication error occurs
closeSocket.oncloseTriggered when the connection is closed

5.WebSocket method

The following are the related methods of WebSocket object. Suppose we use the above code to create a Socket object:

methoddescribe
Socket.send()Send data using connection
Socket.close()Close connection

2, Springboot 2 X integrate websocket

1. Achieving goals

  • Private message
  • Mass messaging
  • Support receiving offline messages (memory storage)
  • Count the number of unread messages

2. Steps

2.1 create a new SpringBoot project and add Maven dependency

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.dw</groupId>
    <artifactId>springboot-websocket</artifactId>
    <version>1.0</version>
    <name>springboot-websocket</name>
    <description>springboot-websocket</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--websocket-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.2 the project structure is as follows:

2.3 server configuration

package com.dw.sprboosoc.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * WebSocket Configuration class
 * dingwen
 * 2021/2/20 10:37
 **/
@Configuration
public class WebSocketConfig {

    /*
     * ServerEndpointExporter websocket endpoint declared with @ ServerEndpoint annotation will be automatically registered
     * @param []
     * @return org.springframework.web.socket.server.standard.ServerEndpointExporter
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

WebSocketServer

package com.dw.sprboosoc.service;

import com.alibaba.fastjson.JSON;
import com.dw.sprboosoc.constant.MessageEnum;
import com.dw.sprboosoc.dto.WebSocketMessageDto;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket Core class
 * dingwen
 * 2021/2/20 10:41
 **/
// Add Bean
@Component
@Slf4j
//Access path
@ServerEndpoint(value = "/websocket/{sendUserId}")
public class WebSocketServer {
    //Current number of online connections to ensure thread safety
    private static final AtomicInteger currentOnlineNumber = new AtomicInteger();
    //The thread safety Set of concurrent package is used to store the WebSocketServer object corresponding to each client
    private static final ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
    //A message map that is set to be static and common. The map ConcurrentMap is thread safe. The HashMap is not safe
    private static final ConcurrentMap<String, Map<String, List<WebSocketMessageDto>>> messageMap = new ConcurrentHashMap<>();

    /*
     *send message
     * @param [session, message, userId]
     * @return void
     */
    public void sendMessage(WebSocketMessageDto webSocketMessageDto) throws IOException {
        try {
            switch (webSocketMessageDto.getMessageEnum()) {
                // Broadcast message
                case all:
                    sessionPool.values().forEach(se -> {
                        try {
                            se.getBasicRemote()
                                    .sendText(webSocketMessageDto.toString());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
                    log.info("websocket: Broadcast message" + webSocketMessageDto);
                    // off-line
                    storeOfflineMessage(webSocketMessageDto);
                    break;
                // Private hair
                case one:
                    if (judgeUserOnline(webSocketMessageDto.getRecvUserId())) {
                        // on-line
                        sessionPool.get(webSocketMessageDto.getRecvUserId())
                                .getBasicRemote()
                                .sendText(webSocketMessageDto.toString());
                    } else {
                        // off-line
                        storeOfflineMessage(webSocketMessageDto);
                    }

                    log.info("websocket: Private message" + webSocketMessageDto);
                    break;
            }


        } catch (Exception exception) {
            log.error("websocket: An error occurred while sending the message");
        }
    }

    /*
     *Client received message
     * @param [message]
     * @return void
     */
    @OnMessage
    public void onMessage(String webSocketMessageDtoStr) throws IOException {
        WebSocketMessageDto webSocketMessageDto = JSON.parseObject(webSocketMessageDtoStr, WebSocketMessageDto.class);
        log.info("websocket:" + webSocketMessageDto.getRecvUserId() + "Roger, from" + webSocketMessageDto.getSendUserId() + "Messages sent" + webSocketMessageDto.getMessage());
        sendMessage(webSocketMessageDto);
    }

    /*
     *Judge whether the user is online
     * @param [recvUserId]
     * @return boolean
     */
    public boolean judgeUserOnline(String recvUserId) {
        boolean flag = !ObjectUtils.isEmpty(sessionPool.get(recvUserId));
        String flagStr = flag ? "on-line" : "off-line";
        log.info("websocket: " + recvUserId + ":" + flagStr);
        return flag;
    }

    /*
     *When the user is offline, the message is stored in memory
     * @param [recvUserId]
     * @return void
     */
    public void storeOfflineMessage(WebSocketMessageDto webSocketMessageDto) {
        //Send a message to the user for the first time when he is not online
        if (ObjectUtils.isEmpty(messageMap.get(webSocketMessageDto.getRecvUserId()))) {
            Map<String, List<WebSocketMessageDto>> maps = new HashMap<>();
            List<WebSocketMessageDto> list = new ArrayList<>();
            list.add(webSocketMessageDto);
            maps.put(webSocketMessageDto.getRecvUserId(), list);
            messageMap.put(webSocketMessageDto.getRecvUserId(), maps);
        } else {
            //Send the message again when the user is not online
            Map<String, List<WebSocketMessageDto>> listObject = messageMap.get(webSocketMessageDto.getRecvUserId());
            List<WebSocketMessageDto> objects = new ArrayList<>();
            if (!ObjectUtils.isEmpty(listObject.get(webSocketMessageDto.getRecvUserId()))) {//The user sent a message to the user who received the message
                //This user has sent offline messages to this user (all messages sent by this user to this user)
                objects = listObject.get(webSocketMessageDto.getRecvUserId());
                //Add the message sent this time
                objects.add(webSocketMessageDto);
                //Replace the original map
                listObject.put(webSocketMessageDto.getRecvUserId(), objects);
            } else {//This user has not sent offline messages to this user
                objects.add(webSocketMessageDto);
                listObject.put(webSocketMessageDto.getRecvUserId(), objects);
            }
            messageMap.put(webSocketMessageDto.getRecvUserId(), listObject);


        }
    }

    /*
     *Call after successful connection establishment
     * @param [session, userId]
     * @return void
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "sendUserId") String sendUserId) throws IOException {
        //Join after successfully establishing connection
        sessionPool.put(sendUserId, session);
        //Current online quantity + 1
        currentOnlineNumber.incrementAndGet();
        log.info("websocket:" + sendUserId + "Join connection,Current online user" + currentOnlineNumber + "Unread messages:" + getMessageCount(sendUserId));
        // Send offline messages
        sendOffLineMessage(sendUserId);
    }

    /*
     * Send offline messages when users go online
     * @param []
     * @return void
     */
    @SneakyThrows
    public void sendOffLineMessage(String sendUserId) {

        if (ObjectUtils.isEmpty(messageMap.get(sendUserId))) {
            // The user has no offline messages
            return;
        }
        // The currently logged in user has an offline message
        //It means that someone sends a message to the user when the user is not logged in
        //All messages not received by this user
        Map<String, List<WebSocketMessageDto>> lists = messageMap.get(sendUserId);
        //Offline messages sent by object users
        List<WebSocketMessageDto> list = lists.get(sendUserId);
        if (list != null) {
            for (WebSocketMessageDto webSocketMessageDto : list) {
                onMessage(JSON.toJSONString(webSocketMessageDto));
            }
        }

        // Delete sent messages
        removeHasBeenSentMessage(sendUserId, lists);


    }

    /*
     *Delete sent messages
     * @param [sendUserId, map]
     * @return void
     */
    public void removeHasBeenSentMessage(String sendUserId, Map<String, List<WebSocketMessageDto>> map) {
        // Iterator object of key in map
        //Delete the message after the user receives it to avoid sending it again next time
        Iterator iterator = map.keySet().iterator();
        while (iterator.hasNext()) {// Cycle key value for judgment
            String keys = (String) iterator.next();//key
            if (sendUserId.equals(keys)) {
                iterator.remove();
            }
        }
    }

    /*
     *Called when the connection is closed
     * @param [userId]
     * @return void
     */
    @OnClose
    public void onClose(@PathParam(value = "sendUserId") String sendUserId) {
        sessionPool.remove(sendUserId);
        currentOnlineNumber.decrementAndGet();
        log.info("websocket:" + sendUserId + "Disconnect,Current online user" + currentOnlineNumber);
    }

    /*
     *Called when an error occurs
     * @param [session, throwable]
     * @return void
     */
    @OnError
    public void onError(Throwable throwable) {
        log.error("websocket: An error has occurred");
        throwable.printStackTrace();
    }

    /**
     * Gets the number of unread messages for the user
     */
    public int getMessageCount(String recvUserId) {
        //Get all messages not received by the user
        Map<String, List<WebSocketMessageDto>> listMap = messageMap.get(recvUserId);
        if (listMap != null) {
            List<WebSocketMessageDto> list = listMap.get(recvUserId);
            if (list != null) {
                return listMap.get(recvUserId).size();
            } else {
                return 0;
            }

        } else {
            return 0;
        }

    }
}


2.4 basic usage of client

open a connection

    let socket;
    function openSocket() {
        if(typeof(WebSocket) == "undefined") {
            console.log("Your browser does not support WebSocket");
        }else{
            console.log("Your browser supports WebSocket");
            //Implement the WebSocket object, specify the server address to be connected and establish a connection with the port
            let sendUserId = document.getElementById('sendUserId').value;
            let socketUrl="ws://192.168.1.108:8081/websocket/"+sendUserId;
            console.log(socketUrl);
            if(socket!=null){
                socket.close();
                socket=null;
            }
            socket = new WebSocket(socketUrl);
            //Open event
            socket.onopen = function() {
                console.log("websocket Opened");
                //socket.send("this is a message from the client" + location.href + new Date());
            };
            //Get message event
            socket.onmessage = function(msg) {
                let serverMsg = "Received server information:" + msg.data;
                console.log(serverMsg);
                //The discovery message enters the start processing front-end trigger logic
            };
            //Close event
            socket.onclose = function() {
                console.log("websocket Closed");
            };
            //An error event has occurred
            socket.onerror = function() {
                console.log("websocket An error has occurred");
            }
        }
    }

Send private messages

    function sendMessage() {
        if(typeof(WebSocket) == "undefined") {
            console.log("Your browser does not support WebSocket");
        }else {
            let recvUserId = document.getElementById('recvUserId').value;
            let sendUserId = document.getElementById('sendUserId').value;
            let msg = document.getElementById('msg').value;
            let webSocketMessageDto = '{"recvUserId":"'+recvUserId+'","sendUserId":"'+sendUserId+'","message":"'+msg+'","messageEnum":"one"}';
            console.log(webSocketMessageDto);
            socket.send(webSocketMessageDto);
        }
    }




Mass messaging

    function sendMessages() {
        if(typeof(WebSocket) == "undefined") {
            console.log("Your browser does not support WebSocket");
        }else {
            let msg = document.getElementById('msg').value;
            let webSocketMessageDto = '{"message":"'+msg+'","messageEnum":"all"}'; let sendUserId = document.getElementById('sendUserId').value;
            let webSocketMessageDto = '{"sendUserId":"'+sendUserId+'","message":"'+msg+'","messageEnum":"all"}';
            console.log(webSocketMessageDto);
            socket.send(webSocketMessageDto);
        }
    }

2, Testing

2.1 start up project

2.2 accessing the test page

2.3 add two users with user ID s of 100 and 200


2.4 server log

2.5 100 sends messages to 200

2.6 200 received message

2.7 100 send messages to 666 offline users


Online users 668.2


2.9 mass sending by admin users

You can see that all connected users have received mass messages.

Full code address: Code cloud: https://gitee.com/dingwen-gitee/springboot-websocket.git

Keywords: Java Spring Boot websocket

Added by shaymol on Wed, 09 Feb 2022 20:05:31 +0200