05. Netty learning notes - (case: chat business)

Netty notes summary: Netty Study Guide (summary of materials and articles)

These notes were written while following the Black Horse Programmer (Heima) Netty video tutorial. Some of the content is taken from the course's own notes.

Demo code repository for these notes: Github-[netty-learn]

Pitfalls

1. A SimpleChannelInboundHandler subclass without the @Sharable annotation causes the second client's connection to fail

If you write a class that extends SimpleChannelInboundHandler, leave out the @ChannelHandler.Sharable annotation, and add the same instance to every channel's pipeline, Netty refuses to add the handler to a second pipeline. When the second client connects, the INACTIVE and UNREGISTERED events fire immediately and the connection fails.

Symptom: the second client cannot connect at all.

Solution: add the @Sharable annotation to the custom handler (only if it holds no per-connection state), or create a new handler instance for each channel.


Business design outline

The client and server agree on a transmission protocol and exchange data according to it.

First comes the login service: when the client starts, it creates a thread that constructs the login message object and sends it. (Along the way the message is encoded according to the protocol and then written out.) The server decodes the received bytes according to the same protocol rules and converts them back into the corresponding message object.
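As a rough illustration of "encode according to the protocol" and "convert back into a message", here is a minimal length-prefixed framing sketch that uses only the JDK. The layout (4-byte magic number, 1-byte message type, 4-byte body length, UTF-8 body) and every name in it (`FrameSketch`, `MAGIC`, `encode`, `decodeType`, `decodeBody`) are assumptions made for illustration; these notes do not specify the actual wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameSketch {
    // Hypothetical magic number marking the start of a frame
    static final int MAGIC = 0xCAFEBABE;

    // Header = magic (4 bytes) + message type (1 byte) + body length (4 bytes)
    static final int HEADER_LENGTH = 9;

    // Writes the header fields followed by the UTF-8 body
    static byte[] encode(byte messageType, String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + bodyBytes.length);
        buf.putInt(MAGIC);
        buf.put(messageType);
        buf.putInt(bodyBytes.length);
        buf.put(bodyBytes);
        return buf.array();
    }

    // Reads the message type after validating the magic number
    static byte decodeType(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        checkMagic(buf);
        return buf.get();
    }

    // Reads the body back, using the length field to know how many bytes to take
    static String decodeBody(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        checkMagic(buf);
        buf.get(); // skip the message type
        int length = buf.getInt();
        byte[] bodyBytes = new byte[length];
        buf.get(bodyBytes);
        return new String(bodyBytes, StandardCharsets.UTF_8);
    }

    private static void checkMagic(ByteBuffer buf) {
        if (buf.getInt() != MAGIC) {
            throw new IllegalArgumentException("not a frame of this protocol");
        }
    }
}
```

The message type byte is what would let a decoder pick the right message class, and the length field is what would let a frame decoder cut the byte stream into whole messages.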

① Login thread communication: after the client receives the server's response, how do the two threads hand over the result? (One is the thread created in the client's channelActive event to send data to the server. The other is the NIO event loop thread that reads the server's data in channelRead. How do these two threads communicate?)

  • A CountDownLatch is used for thread communication to signal that the login response has arrived. Whether the login succeeded is held in a concurrent variable, an AtomicBoolean.

② Business message sending (client): after a successful login, a thread keeps reading commands from the console and sends the matching message object for each command.

③ Single chat message service: the server maintains a session collection. When a login succeeds, the pair (username, channel) is added to it. A single chat message has three parts: sender, destination and content. When the server receives one, it looks up the destination's channel in the session collection and writes the data to that channel.

④ Group chat creation: gcreate [group name] [person 1,person 2,person 3]. A map temporarily stores the relationship between each user name and its channel. During creation, a prompt is written to the other members' channels to tell them they have been pulled into the group.

⑤ Group chat message sending: gsend [group name] [content]. First get all user names for the group from the group session, then look up each user's channel through SessionFactory and send the group chat message to each channel in turn.

⑥ Get group member information: gmembers [group name]. The server looks up the group's members by group name and writes the member list back to the requesting channel.

⑦ Join group chat: gjoin [group name]. The user is added directly to the specified group's member set.

⑧ Exit group chat: gquit [group name]. The user is removed from the group's member set.

⑨ Exit login: the client disconnects and the server performs the corresponding unbinding. To cover both normal and abnormal exits, implement a handler adapter and override the channelInactive and exceptionCaught methods.

⑩ Idle detection (send heartbeat): configure IdleStateHandler with the read, write, and read + write idle times. If no matching I/O happens within the specified number of seconds, an IdleStateEvent is triggered. You can capture it by overriding userEventTriggered in a ChannelDuplexHandler. Typically the server watches for 5s of read idleness, and the client watches for 3s of write idleness (about half the server's time) and sends a heartbeat to show that it is still connected.

Extension:

① Force a specified user offline: quitmember [username]

  • Client: parse the command and send it, with the user name, to the server so that it takes that user offline.
    • Use a concurrent variable (EXIT) to indicate that the connection is gone. Override the channelInactive and exceptionCaught events to set EXIT, and check EXIT after each of the client's three blocking console reads to detect that the connection has been closed.
  • Server side: as with the other handlers, write a handler for the corresponding message object, perform the business operation, look up the channel of the specified username, and call close() on it.
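The EXIT flag check after a blocking console read can be sketched with the JDK alone. This is a minimal sketch under assumptions: `ConsoleLoopSketch` and `readCommands` are hypothetical names, and the flag would in practice be set from the overridden channelInactive / exceptionCaught callbacks rather than passed in by the caller.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class ConsoleLoopSketch {
    // Reads commands until the input ends or the EXIT flag has been set by another thread.
    // Returns the commands that were accepted before the loop stopped.
    static List<String> readCommands(BufferedReader console, AtomicBoolean exit) {
        List<String> accepted = new ArrayList<>();
        try {
            while (true) {
                String line = console.readLine(); // blocking read
                // Check the flag right after the blocking call returns:
                // the connection may have been closed while we were waiting
                if (line == null || exit.get()) {
                    return accepted;
                }
                accepted.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The check cannot interrupt a read that is already blocked, so the loop only notices the closed connection once the user presses Enter again. That is why the flag has to be tested after every blocking read.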

Business implementation

① Login service

client:

  • ① Send: a custom thread is started in the channelActive event, mainly to interact with the console. The login flow works the same way: first read the user name and password, then package them into a LoginRequestMessage object and send it through the channel.
  • ② Receive: the event loop thread receives the object decoded by the custom protocol, casts it to LoginResponseMessage, and checks whether the login succeeded.
    • Core: to let the event loop thread notify the console thread of the login result, we can use a CountDownLatch plus an AtomicBoolean. The former tells the waiting thread that the result is available, and the latter records whether the login succeeded.
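The latch-plus-flag handoff can be tried out without Netty. The following is a minimal sketch, assuming a plain thread stands in for the event loop; `LoginHandoffSketch`, `onLoginResponse` and `awaitLogin` are hypothetical names, and the timeout is an addition that the notes' own code does not use.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class LoginHandoffSketch {
    private final CountDownLatch waitForLogin = new CountDownLatch(1);
    private final AtomicBoolean loggedIn = new AtomicBoolean(false);

    // Called by the receiving thread (the event loop in the real client)
    void onLoginResponse(boolean success) {
        loggedIn.set(success);     // publish the result first
        waitForLogin.countDown();  // then release the waiting thread
    }

    // Called by the console thread; returns false on failure, timeout or interruption
    boolean awaitLogin(long timeout, TimeUnit unit) {
        try {
            if (!waitForLogin.await(timeout, unit)) {
                return false; // no response arrived in time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status
            return false;
        }
        return loggedIn.get();
    }
}
```

Setting the flag before calling countDown() matters: once await() returns, the waiting thread is guaranteed to see the value written before the count-down.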

Server: write a subclass of SimpleChannelInboundHandler with LoginRequestMessage as the type parameter, implement its channelRead0() method to perform the business operation, and finally return a LoginResponseMessage to the client according to the result.

client

client:

//Latch that signals the login response has arrived
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
//Login success flag
AtomicBoolean LOGIN = new AtomicBoolean(false);


//Receives the response data from the server
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.debug("msg: {}", msg);
    //The login response is handled separately; other messages are only logged
    if (msg instanceof LoginResponseMessage) {
        LoginResponseMessage response = (LoginResponseMessage) msg;
        if (response.isSuccess()){
            LOGIN.set(true);//Set login status to true
        }
        WAIT_FOR_LOGIN.countDown();//Decrement the count; at 0 the thread blocked in await() is released
    }
}


@Override
public void channelActive(ChannelHandlerContext ctx){
    //Reads the user's console input and sends data to the server
    new Thread(()->{
        Scanner scanner = new Scanner(System.in);
        System.out.println("Please enter user name:");
        String username = scanner.nextLine();
        System.out.println("Please input a password:");
        String password = scanner.nextLine();
        //Construct the login message object and send it to the server
        Message message = new LoginRequestMessage(username, password);
        ctx.channel().writeAndFlush(message);
        try {
            //Block until another thread counts the latch down to 0
            WAIT_FOR_LOGIN.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //...

    }, "system in").start();
}

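Server (the handler shown here is ChatRequestMessageHandler; LoginRequestMessageHandler follows the same pattern with LoginRequestMessage as its type parameter)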

import com.changlu.message.ChatRequestMessage;
import com.changlu.message.ChatResponseMessage;
import com.changlu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName ChatRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/13 18:54
 * @Description Chat request object processor: for ChatRequestMessage
 */
//This handler only receives decoded ChatRequestMessage objects. The type is determined when the custom protocol decodes the bytes, so it matches the ChatRequestMessage type that the client sent.
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
        String to = msg.getTo();
        final Channel channel = SessionFactory.getSession().getChannel(to);
        //If the other party is offline, inform the sender of the message
        if (channel == null){
            ctx.writeAndFlush(new ChatResponseMessage(msg.getFrom(), "The opposite user does not exist or has been offline!"));
            return;
        }
        channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
    }
}

server:

//Shared handler instance
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new ProcotolFrameDecoder());//Frame decoder (custom implementation with predefined decoding rules)
    ch.pipeline().addLast(LOGGING_HANDLER);//Logging handler
    ch.pipeline().addLast(MESSAGE_CODEC);//Protocol codec
    ch.pipeline().addLast(LOGIN_HANDLER);//Login handler
}
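The "business operation" behind the login handler is not shown in these notes. As a sketch only, assuming an in-memory credential table is enough for a demo, the check could look like the following. `UserServiceSketch`, its `login` method and the sample accounts are hypothetical; a real service would not store plain-text passwords.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class UserServiceSketch {
    // Hypothetical demo accounts: user name -> password
    private final Map<String, String> users = new ConcurrentHashMap<>();

    UserServiceSketch() {
        users.put("lisi", "123");
        users.put("wangwu", "123");
        users.put("zhaoliu", "123");
    }

    // Returns true only when the user exists and the password matches
    boolean login(String username, String password) {
        if (username == null || password == null) {
            return false; // ConcurrentHashMap does not accept null keys
        }
        String expected = users.get(username);
        return expected != null && expected.equals(password);
    }
}
```

The login handler would call a check like this, bind the channel to the user name on success, and write back a LoginResponseMessage carrying the result.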


② Client sends messages according to commands

Requirement: after a successful login, show the command menu and process the commands entered by the user; if the login fails, close the connection.

Client: this also runs in the thread that sent the login request.

@Override
public void channelActive(ChannelHandlerContext ctx){
    //Reads the user's console input and sends data to the server
    new Thread(()->{
        //...
        try {
            //Once the response arrives and the latch reaches 0, stop blocking and continue
            WAIT_FOR_LOGIN.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Read the login status flag
        if (!LOGIN.get()) {
            System.out.println("Login failed!");
            ctx.channel().close();
            return;
        }
        System.out.println("Login succeeded!");
        while (true) {
            System.out.println("==================================");
            System.out.println("send [username] [content]");
            System.out.println("gsend [group name] [content]");
            System.out.println("gcreate [group name] [m1,m2,m3...]");
            System.out.println("gmembers [group name]");
            System.out.println("gjoin [group name]");
            System.out.println("gquit [group name]");
            System.out.println("quit");
            System.out.println("==================================");
            String command = scanner.nextLine();
            String[] split = command.split(" ");
            switch (split[0]){
                case "send" :
                    ctx.writeAndFlush(new ChatRequestMessage(username, split[1], split[2]));
                    break;
                case "gsend" :
                    ctx.writeAndFlush(new GroupChatRequestMessage(username, split[1], split[2]));
                    break;
                case "gcreate" :
                    Set<String> users = new HashSet<>(Arrays.asList(split[2].split(",")));
                    users.add(username);//Add yourself to the group chat
                    ctx.writeAndFlush(new GroupCreateRequestMessage(split[1], users));//split[1] is the group name
                    break;
                case "gmembers" :
                    ctx.writeAndFlush(new GroupMembersRequestMessage(split[1]));
                    break;
                case "gjoin" :
                    ctx.writeAndFlush(new GroupJoinRequestMessage(username, split[1]));
                    break;
                case "gquit" :
                    ctx.writeAndFlush(new GroupQuitRequestMessage(username, split[1]));
                    break;
                case "quit" :
                    ctx.channel().close();
                    return;//Leave the loop so the thread ends once the channel is closed
            }
        }

    }, "system in").start();
}
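One limitation of `command.split(" ")` in the loop above is that a message such as `send wangwu hello world` is cut at every space, so `split[2]` is only `hello`. A possible refinement, shown here as a sketch with hypothetical names (`CommandParseSketch`, `parse`, `hasParts`), is to pass a limit to `String.split` so the last part keeps its spaces, and to check the number of parts before indexing.

```java
final class CommandParseSketch {
    // Splits into at most maxParts pieces; the last piece keeps any remaining spaces
    static String[] parse(String command, int maxParts) {
        return command.trim().split(" ", maxParts);
    }

    // True when parts[required - 1] is safe to read
    static boolean hasParts(String[] parts, int required) {
        return parts.length >= required;
    }
}
```

For example, `parse("send wangwu hello world", 3)` yields the three parts `send`, `wangwu` and `hello world`, while `parse("send wangwu", 3)` yields only two parts, which `hasParts(parts, 3)` reports so the client can print a usage hint instead of throwing ArrayIndexOutOfBoundsException.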

③ Single chat service (send [username] [content])

Let's first look at the in-memory Session implementation:

import io.netty.channel.Channel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SessionMemoryImpl implements Session {

    //Maps user name to channel
    private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
    //Maps channel to user name
    private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
    //Maps channel to its bound attributes
    private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();

    //Login success: record the mapping in all three maps
    @Override
    public void bind(Channel channel, String username) {
        usernameChannelMap.put(username, channel);
        channelUsernameMap.put(channel, username);
        channelAttributesMap.put(channel, new ConcurrentHashMap<>());
    }

    @Override
    public void unbind(Channel channel) {
        String username = channelUsernameMap.remove(channel);
        //A channel that never logged in has no user name, and ConcurrentHashMap rejects null keys
        if (username != null) {
            usernameChannelMap.remove(username);
        }
        channelAttributesMap.remove(channel);
    }

    @Override
    public Object getAttribute(Channel channel, String name) {
        return channelAttributesMap.get(channel).get(name);
    }

    @Override
    public void setAttribute(Channel channel, String name, Object value) {
        channelAttributesMap.get(channel).put(name, value);
    }

    //Get the specified channel according to username
    @Override
    public Channel getChannel(String username) {
        return usernameChannelMap.get(username);
    }

    @Override
    public String toString() {
        return usernameChannelMap.toString();
    }
}
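To experiment with the two-way mapping outside Netty, the same idea can be written against a generic connection type. This is a sketch under assumptions: `SessionSketch` and its type parameter `C` are hypothetical stand-ins for the Session class and Netty's Channel, and the two-argument `remove(key, value)` is a variation that is not in the class above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SessionSketch<C> {
    private final Map<String, C> byName = new ConcurrentHashMap<>();
    private final Map<C, String> byConn = new ConcurrentHashMap<>();

    // Called after a successful login
    void bind(C conn, String username) {
        byName.put(username, conn);
        byConn.put(conn, username);
    }

    // Safe to call for connections that never logged in
    void unbind(C conn) {
        String username = byConn.remove(conn);
        if (username != null) {
            // Remove the name only if it still points at this connection,
            // so a newer login under the same name is not dropped
            byName.remove(username, conn);
        }
    }

    // Returns null when the user is not online
    C getChannel(String username) {
        return byName.get(username);
    }
}
```

The null result of `getChannel` is what the single chat handler relies on to detect that the destination user is offline.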

client: read the command entered on the console, package it into a ChatRequestMessage and send it.

case "send" :
ctx.writeAndFlush(new ChatRequestMessage(username, split[1], split[2]));
break;

server: the custom protocol decodes the bytes into a ChatRequestMessage object, which the handler for that type then processes.

  • Specific business operations: ① read the destination name from the message and look up its channel in the session object. ② Check whether the channel is null. If it is null, the user is not online and a notice is sent back to the sender; otherwise the content is written directly to the destination's channel.
  • Also override the channelUnregistered event to unbind the channel of the logged-in session.

import com.changlu.message.ChatRequestMessage;
import com.changlu.message.ChatResponseMessage;
import com.changlu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName ChatRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/13 18:54
 * @Description Chat request object processor: for ChatRequestMessage
 */
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
        String to = msg.getTo();
        final Channel channel = SessionFactory.getSession().getChannel(to);
        //If the other party is offline, inform the sender of the message (null indicates that the other party has not logged in at all)
        if (channel == null){
            ctx.writeAndFlush(new ChatResponseMessage(msg.getFrom(), "The opposite user does not exist or has been offline!"));
            return;
        }
        channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
    }
}

Server:

//The handler is shareable and thread safe
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        //...
        ch.pipeline().addLast(CHAT_HANDLER);//Chat request handler
    }
});

Effect: start the server first, then start two clients and log in as lisi and wangwu in turn.

After logging in, the lisi client executes the command send wangwu 123. The server receives the message and uses wangwu's channel to deliver it.

Log in with only the lisi client, then execute the command send wangwu hello. The lisi client receives a notice that wangwu is not online.

④ Creating a group chat and pulling in members (gcreate [group name] [m1,m2,m3...])

Client: parse the command, read the group name and members, combine the invited members and the sender into a Set, and finally package it into a GroupCreateRequestMessage and send it.

case "gcreate" :
    Set<String> users = new HashSet<>(Arrays.asList(split[2].split(",")));
    users.add(username);//Add yourself to group chat
    ctx.writeAndFlush(new GroupCreateRequestMessage(split[1],users));
    break;

Server: write a handler for GroupCreateRequestMessage to do the business processing. In essence, it adds the group and its member set to a map, then responds according to whether the group was added successfully.

import com.changlu.message.GroupCreateRequestMessage;
import com.changlu.message.GroupCreateResponseMessage;
import com.changlu.server.session.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.List;
import java.util.Set;

/**
 * @ClassName GroupCreateRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 14:46
 * @Description New group chat processing: create a group chat and pull in the specified members
 */
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler  extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        Set<String> members = msg.getMembers();
        //Group manager
        final GroupSession groupSession = GroupSessionFactory.getGroupSession();
        Group group = groupSession.createGroup(groupName, members);
        //null means no group with this name existed before, so the creation succeeded
        if (group == null){
            //Response 1: creation succeeded. Send a success message to the requesting client
            ctx.channel().writeAndFlush(new GroupCreateResponseMessage(true, "Group created successfully!"));
            List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
            //Response 2: notify every client that was pulled into the group chat
            for (Channel channel : membersChannel) {
                channel.writeAndFlush(new GroupCreateResponseMessage(true, "You have been pulled into group chat: " + groupName));
            }
        } else {
            //Response 3: creation failed. Send a notice to the requesting client
            ctx.channel().writeAndFlush(new GroupCreateResponseMessage(false, "Group already exists!"));
        }

    }
}

//Shared handler instance for GroupCreateRequestMessage
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    //...
    ch.pipeline().addLast(GROUP_CREATE_HANDLER);//Group chat creation handler
}
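The GroupSession implementation itself is not listed in these notes. A minimal in-memory sketch of the behaviour the handlers rely on (null on first creation, null when joining or quitting a missing group) might look like this. `GroupRegistrySketch` is a hypothetical name, and it returns the member set directly instead of a Group object to stay self-contained.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class GroupRegistrySketch {
    // group name -> member user names
    private final Map<String, Set<String>> groups = new ConcurrentHashMap<>();

    // Returns null when the group was newly created, or the existing members if the name is taken
    Set<String> createGroup(String name, Set<String> members) {
        Set<String> copy = ConcurrentHashMap.newKeySet();
        copy.addAll(members);
        return groups.putIfAbsent(name, copy);
    }

    // Returns the member set after joining, or null when the group does not exist
    Set<String> joinMember(String name, String member) {
        return groups.computeIfPresent(name, (k, v) -> { v.add(member); return v; });
    }

    // Returns the member set after removal, or null when the group does not exist
    Set<String> removeMember(String name, String member) {
        return groups.computeIfPresent(name, (k, v) -> { v.remove(member); return v; });
    }

    // Returns an empty set for an unknown group
    Set<String> getMembers(String name) {
        return groups.getOrDefault(name, Collections.emptySet());
    }
}
```

`putIfAbsent` gives the "null means created" contract in one atomic step, so two clients creating the same group name at the same time cannot both succeed.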

Effect: start three clients: lisi, wangwu and zhaoliu. The lisi client then executes the command gcreate dreamgroup wangwu,zhaoliu

1. The group chat is created successfully. The lisi client first receives the creation success message, and then lisi and the newly added wangwu and zhaoliu each receive the message that they have been pulled into the group chat.

The messages received by the other clients are not shown here.

2. Group chat creation fails. The lisi client receives the creation failure message.

⑤ Group chat message sending (gsend [group name] [content])

Client: parse the command, encapsulate it into GroupChatRequestMessage object and send it.

case "gsend" :
    ctx.writeAndFlush(new GroupChatRequestMessage(username, split[1], split[2]));
    break;

Server: write a handler for GroupChatRequestMessage, get all member channels by group name, and write the message to each channel in turn.

import com.changlu.message.GroupChatRequestMessage;
import com.changlu.message.GroupChatResponseMessage;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.List;

/**
 * @ClassName GroupChatRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 15:26
 * @Description Group chat: send a message to a group. [gsend [group name] [content]]
 */
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        String content = msg.getContent();
        //Get all member channels by group name and send the data to each one
        List<Channel> membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
        GroupChatResponseMessage responseMessage = new GroupChatResponseMessage(msg.getFrom(), content);
        responseMessage.setSuccess(true);
        for (Channel channel : membersChannel) {
            channel.writeAndFlush(responseMessage);
        }
    }
}

//Shared handler instance for GroupChatRequestMessage
GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    //...
    ch.pipeline().addLast(GROUP_CHAT_HANDLER);//Group chat message handler
}
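How getMembersChannel turns member names into channels is not shown here. One plausible approach, sketched with the JDK only, is to map every member name through the session table and skip members who are offline. `FanOutSketch`, `onlineConnections` and the generic type `C` (a stand-in for Channel) are assumptions for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

final class FanOutSketch {
    // Looks up each member's connection and drops members with no mapping (offline users)
    static <C> List<C> onlineConnections(Set<String> members, Map<String, C> sessions) {
        return members.stream()
                .map(sessions::get)       // null for members who are not logged in
                .filter(Objects::nonNull) // skip offline members
                .collect(Collectors.toList());
    }
}
```

Filtering out nulls here is what keeps the `for (Channel channel : membersChannel)` loop in the handler from hitting a NullPointerException when some group members are offline.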

Effect: before running this command, first run gcreate to create a group chat and pull in members. Once the group exists, you can send group chat messages with gsend.


⑥ Get group member information (gmembers [group name])

Client: parse the command, encapsulate it into a GroupMembersRequestMessage object and send it.

System.out.println("gmembers [group name]");

case "gmembers" :
    ctx.writeAndFlush(new GroupMembersRequestMessage(split[1]));
    break;

Server: write a handler for GroupMembersRequestMessage, look up the group's members, and send the member list back to the requesting channel.

import com.changlu.message.GroupMembersRequestMessage;
import com.changlu.message.GroupMembersResponseMessage;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Set;

/**
 * @ClassName GroupMembersRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 15:59
 * @Description View group member information: obtain all group member information according to the group name. [gmembers [group name]]
 */
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
        final String groupName = msg.getGroupName();
        final Set<String> members = GroupSessionFactory.getGroupSession().getMembers(groupName);
        ctx.channel().writeAndFlush(new GroupMembersResponseMessage(members));
    }
}

GroupMembersRequestMessageHandler GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHandler();

ch.pipeline().addLast(GROUP_MEMBERS_HANDLER);//Handler that returns all members of the specified group chat


⑦ Join a group chat (gjoin [group name])

Client: parse the command, package it into GroupJoinRequestMessage and send it.

System.out.println("gjoin [group name]");

case "gjoin" :
    ctx.writeAndFlush(new GroupJoinRequestMessage(username, split[1]));
    break;

Server: obtain the group name and user name to perform business operations.

import com.changlu.message.GroupJoinRequestMessage;
import com.changlu.message.GroupJoinResponseMessage;
import com.changlu.server.session.Group;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName GroupJoinRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 16:04
 * @Description Join the group chat processor: [gjoin [group name]]
 */
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        String username = msg.getUsername();
        Group group = GroupSessionFactory.getGroupSession().joinMember(groupName, username);
        if (group != null){
            ctx.writeAndFlush(new GroupJoinResponseMessage(true, "Joined group chat [" + groupName + "] successfully!"));
        }else{
            ctx.writeAndFlush(new GroupJoinResponseMessage(false, "There is no such group chat at present!"));
        }
    }
}

GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();

ch.pipeline().addLast(GROUP_JOIN_HANDLER);//Join group chat handler


⑧ Quit group chat (gquit [group name])

Client: parse the command, package it into GroupQuitRequestMessage and send it.

System.out.println("gquit [group name]");

case "gquit" :
    ctx.writeAndFlush(new GroupQuitRequestMessage(username, split[1]));
    break;

Server: obtain the group name and user name to perform business operations.

import com.changlu.message.GroupQuitRequestMessage;
import com.changlu.message.GroupQuitResponseMessage;
import com.changlu.server.session.Group;
import com.changlu.server.session.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName GroupQuitRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 16:09
 * @Description Exit the group chat processor: [gquit [group name]]
 */
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
        final String username = msg.getUsername();
        final String groupName = msg.getGroupName();
        final Group group = GroupSessionFactory.getGroupSession().removeMember(groupName, username);
        if (group != null) {
            ctx.writeAndFlush(new GroupQuitResponseMessage(true, "Quit group chat successfully!"));
        }else{
            ctx.writeAndFlush(new GroupQuitResponseMessage(false, "This group chat does not exist!"));//Failure case, so success is false
        }
    }
}

GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();

ch.pipeline().addLast(GROUP_QUIT_HANDLER);//Quit group chat handler


⑨ Exit login

Client: parse the command and close the connection directly.

System.out.println("quit");

case "quit" :
    ctx.channel().close();
    break;

Server: when the client disconnects on its own, the server's channelInactive event fires as well. At that point, unbind the channel from the session.

import com.changlu.server.session.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * @ClassName QuitHandler
 * @Author ChangLu
 * @Date 2022/1/14 16:16
 * @Description Handler for connection exit (normal and abnormal)
 */
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {

    // An inactive event is triggered when the connection is disconnected
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} Connection disconnected!", ctx.channel());
    }

    // Triggered when an exception occurs
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} disconnected abnormally, cause: {}", ctx.channel(), cause.getMessage());
    }
}

QuitHandler QUIT_HANDLER = new QuitHandler();

ch.pipeline().addLast(QUIT_HANDLER);//Handler for exit and exceptions


⑩ Idle detection (send heartbeat)

Key concepts

A common problem in network programming is the fake-dead connection (the connection looks alive but no longer carries data), and it becomes a serious problem when the server is under high load. Netty provides a way to detect it: the idle detector, a handler named IdleStateHandler. From the server's point of view, if it has not received client data for a long time, or data it wrote has not gone out for a long time, the connection may be broken. So it is enough to monitor whether the read idle time, the write idle time, or the combined read/write idle time has grown too long.

  • IdleStateHandler: its constructor takes three parameters, all in seconds. Parameter 1 is the read idle time limit, parameter 2 is the write idle time limit, and parameter 3 is the limit for neither reading nor writing taking place. A value of 0 disables that check.
  • If nothing is read from (or written to) the channel within the specified number of seconds, the corresponding idle event is triggered. You can override userEventTriggered in a ChannelDuplexHandler to check which event fired.

Heartbeat: a user may simply not send anything for a while, which looks the same as a fake-dead connection, so the client sends data at regular intervals. To prove to the server that it is still alive, the client sends heartbeat packets to the server. Idle detection is set up on both sides: the server detects read idleness and the client detects write idleness. Usually the client's detection time is about 1/2 of the server's.
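The timing relationship between the heartbeat interval and the server's read timeout can be checked with a small simulation. This is a sketch with explicit timestamps, not how IdleStateHandler is implemented; `ReadIdleSketch`, `onRead` and `isReaderIdle` are hypothetical names.

```java
final class ReadIdleSketch {
    private final long timeoutMillis;
    private long lastReadMillis;

    ReadIdleSketch(long timeoutMillis, long startMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastReadMillis = startMillis;
    }

    // Record that data (for example a heartbeat) arrived at the given time
    void onRead(long nowMillis) {
        lastReadMillis = nowMillis;
    }

    // True when nothing has been read for at least the timeout
    boolean isReaderIdle(long nowMillis) {
        return nowMillis - lastReadMillis >= timeoutMillis;
    }
}
```

With a 5000 ms read timeout and a heartbeat every 3000 ms, the longest gap the server sees is 3000 ms, which leaves 2000 ms of slack for network delay. Without any heartbeat, `isReaderIdle` becomes true at 5000 ms and the server would close the connection.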

code

server: IdleStateHandler specifies after how many idle seconds the event is triggered. A ChannelDuplexHandler captures that event and closes the connection.

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new ProcotolFrameDecoder());
    ch.pipeline().addLast(LOGGING_HANDLER);
    ch.pipeline().addLast(MESSAGE_CODEC);
    //On the server side, mainly check whether the read idle time is too long
    // If no data is read from the channel within 5s, an IdleState#READER_IDLE event is triggered
    ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
    ch.pipeline().addLast(new ChannelDuplexHandler(){ // ChannelDuplexHandler can act as both an inbound and an outbound handler
        //Captures the events fired by IdleStateHandler
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            //Pass other user events along instead of failing on the cast
            if (!(evt instanceof IdleStateEvent)) {
                super.userEventTriggered(ctx, evt);
                return;
            }
            IdleStateEvent event = (IdleStateEvent) evt;
            //Read idle event
            if (event.state() == IdleState.READER_IDLE) {
                log.debug("No data read for 5s!");
                //Close the channel connection
                ctx.channel().close();
            }
        }
    });
    //... Other handlers
}

Client: to match the server-side detection, the client automatically sends a heartbeat message whenever it has written nothing for 3 seconds (about half of the server's 5-second timeout), to show that it is still connected.

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ProcotolFrameDecoder());
        // ch.pipeline().addLast(LOGGING_HANDLER);
        ch.pipeline().addLast(MESSAGE_CODEC);
        // On the client side, this checks whether the write idle time is too long; if so, a heartbeat is sent to show the user is still connected.
        // If no data is written to the channel within 3 s, an IdleState#WRITER_IDLE event is triggered.
        ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
        // ChannelDuplexHandler is a bidirectional handler: it can act as both an inbound and an outbound handler.
        ch.pipeline().addLast(new ChannelDuplexHandler(){
            // Catches the events fired by IdleStateHandler
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                // Other user events may arrive here too, so check the type before casting
                if (evt instanceof IdleStateEvent
                        && ((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
                    log.debug("No data written for 3 s, sending a heartbeat");
                    ctx.channel().writeAndFlush(new PingMessage());
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
        });
    }
});

Effect: as long as the server keeps receiving messages (including heartbeats), the read idle event is never triggered and the channel is not closed.

Extension

① Force a specified user offline (quitmember [username])

Client: parse the command quitmember [username] and send it as a QuitMemberRequestMessage object.

 System.out.println("quitmember [username]");

case "quitmember":
    ctx.writeAndFlush(new QuitMemberRequestMessage(split[1]));
    break;
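
The snippet above reads split[1] without checking that a user name was actually typed, so entering only "quitmember" would throw an ArrayIndexOutOfBoundsException. A defensive version could look like the following sketch; the class QuitCommand and the method parseTarget are hypothetical names, not part of the original project.

```java
// Hypothetical helper for parsing the "quitmember [username]" console command.
class QuitCommand {
    // Returns the target user name, or null if the line is not a
    // well-formed "quitmember [username]" command.
    static String parseTarget(String line) {
        // Trim first, then split on runs of whitespace
        String[] split = line.trim().split("\\s+");
        if (split.length != 2 || !split[0].equals("quitmember")) {
            return null;
        }
        return split[1];
    }
}
```

The caller would then send a QuitMemberRequestMessage only when parseTarget returns a non-null name, and print the usage hint otherwise.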
  • To handle disconnection, two additional things need to be defined:
    1. A boolean flag EXIT (an AtomicBoolean) for communication between threads, indicating that the connection has been lost. The console thread blocks in three places while waiting for input (user name, password, command); after each one, check EXIT to decide whether to quit.
    2. Overrides of the channelInactive and exceptionCaught events. The first is triggered when the connection is closed, the second when an exception is caught. In both cases, set EXIT to true.
AtomicBoolean EXIT = new AtomicBoolean(false); // Set to true once the connection is lost

ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter(){
    // Triggered when the connection is closed
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.debug("The connection has been disconnected. Please press any key to exit...");
        EXIT.set(true);
    }

    // Triggered when an exception occurs
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.debug("The connection has been disconnected. Please press any key to exit... Exception information: {}", cause.getMessage());
        EXIT.set(true);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx){
        // This thread reads the user's console input and sends data to the server
        new Thread(()->{
            //...
            System.out.println("Please enter user name:");
            String username = scanner.nextLine();
            if (EXIT.get()){
                return;
            }
            System.out.println("Please input a password:");
            String password = scanner.nextLine();
            if (EXIT.get()){
                return;
            }

            // Inside the while(true) command loop, add the same check after reading each command
            String command = scanner.nextLine();
            if (EXIT.get()){
                return;
            }
            //...
        }).start();
    }
});
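
The "check EXIT after every blocking read" pattern can be tried without Netty. In this sketch a Scanner over a fixed string stands in for the console, and the class ConsoleLoop with its method readCredentials is a hypothetical name used only for illustration. In the real client, another thread (the Netty event loop) sets the flag from channelInactive or exceptionCaught.

```java
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the console-reading thread of the chat client.
class ConsoleLoop {
    // Reads user name and password. Returns null if the exit flag was
    // found set after one of the reads, i.e. the connection was lost
    // while the thread was waiting for input.
    static String[] readCredentials(Scanner in, AtomicBoolean exit) {
        String username = in.nextLine();
        if (exit.get()) {
            return null;
        }
        String password = in.nextLine();
        if (exit.get()) {
            return null;
        }
        return new String[] { username, password };
    }
}
```

AtomicBoolean is used rather than a plain boolean field so that a write made by one thread is guaranteed to be visible to the other thread's next get().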

Server: write a handler that handles QuitMemberRequestMessage. It reads the username of the user to be forced offline, looks up that user's channel through SessionFactory, and closes it manually.

import com.changlu.message.QuitMemberRequestMessage;
import com.changlu.message.QuitMemberResponseMessage;
import com.changlu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ClassName QuitMemberRequestMessageHandler
 * @Author ChangLu
 * @Date 2022/1/14 21:48
 * @Description Handler that forces a specified user offline: [quitmember [username]]
 */
@ChannelHandler.Sharable
public class QuitMemberRequestMessageHandler extends SimpleChannelInboundHandler<QuitMemberRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, QuitMemberRequestMessage msg) throws Exception {
        String username = msg.getUsername();
        Channel channel = SessionFactory.getSession().getChannel(username);
        if (channel == null) {
            // The request failed, so report success = false
            ctx.writeAndFlush(new QuitMemberResponseMessage(false, "The user is not online!"));
        } else {
            //Unbind and force close
            SessionFactory.getSession().unbind(channel);
            channel.close();
            ctx.writeAndFlush(new QuitMemberResponseMessage(true, "The user has been forced offline!"));
        }
    }
}
QuitMemberRequestMessageHandler QUIT_MEMBER_HANDLER = new QuitMemberRequestMessageHandler();
ch.pipeline().addLast(QUIT_MEMBER_HANDLER);//Forced offline user handler
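
The server-side logic boils down to a lookup in a user-to-channel registry followed by unbind and close. The sketch below models that flow in plain Java. SessionRegistry, Conn, and forceOffline are hypothetical stand-ins for the project's session class and Netty's Channel, whose actual code is not shown in these notes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a Netty Channel.
class Conn {
    private volatile boolean open = true;
    void close() { open = false; }
    boolean isOpen() { return open; }
}

// Hypothetical user-to-connection registry, modelled on the idea of the session class.
class SessionRegistry {
    private final Map<String, Conn> byUser = new ConcurrentHashMap<>();

    void bind(String username, Conn conn) {
        byUser.put(username, conn);
    }

    boolean isOnline(String username) {
        return byUser.containsKey(username);
    }

    // Unbinds the user and closes the connection.
    // Returns false if the user is not online.
    boolean forceOffline(String username) {
        Conn conn = byUser.remove(username);
        if (conn == null) {
            return false;
        }
        conn.close();
        return true;
    }
}
```

Using remove as a single step for "look up and unbind" means that two concurrent quitmember requests for the same user cannot both succeed: only one of them gets the connection back from the map.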

Organizer: ChangLu. Time: January 12 to January 14, 2021

I am ChangLu. Thank you for reading patiently. If you spot any problems, please point them out and I will gladly take them on board!
Note: this article may be reposted, provided a link to the original is attached.

Keywords: Java Netty

Added by martian2k4 on Tue, 18 Jan 2022 13:31:02 +0200