Mina Implements the Complete Process of Socket Communication

title: Mina server-side client communication
date: 2018-09-30 09:00:30
tags:

- [mina]
- [tcp]

categories:

- [Programming]

permalink: zxh

[TOC]

The first two chapters have introduced the theory completely. Today we use these theories to implement the c/s communication of tcp protocol.First, let's briefly review the previous introduction.
In mina, our clients and services are exactly the same, except we use different adapters.But his data processing is the same.Today we'll focus on how to set up a server, a client
And handles message communication processing between them

Server

The difference between the server and the client is that we create different listening objects. Clients send messages to the server, and the server needs to go through filter processing to reach the message center. But in the filter, we need to decode the messages before we can process our business where they are received..Normally we need to respond to the client after processing the message.The response also goes through encoding logic in the filter, encoding the data and sending it.Sending information to the client can be seen as the direction of the server.It also needs to be coded and decoded.Here's the creation code for the server


//Create Listening Object
IoAcceptor acceptor = new NioSocketAcceptor();
TextLineCodecFactory textLineCodecFactory =
        new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
                LineDelimiter.WINDOWS.getValue());
//Add Filter
acceptor.getFilterChain().addLast("logger",new LoggingFilter());
acceptor.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
        textLineCodecFactory
));
//Set handler for time processing
acceptor.setHandler(new ServerMessageHandler());
//Set the size of the read data cache
acceptor.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//Set how long to go idle without a message
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
//Bind Port
try {
    acceptor.bind(new InetSocketAddress(Constaint.REMOTE_PORT));
} catch (IOException e) {
    logger.error(String.format("bind %s error",Constaint.REMOTE_PORT));
    e.printStackTrace();
}
logger.info(String.format("bind %s success",Constaint.REMOTE_PORT));

Client

    
//Create Listening Object
IoConnector connector = new NioSocketConnector();
TextLineCodecFactory textLineCodecFactory =
        new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
                LineDelimiter.WINDOWS.getValue());
//Add Filter
//Log filter.sltf log settings
connector.getFilterChain().addLast("logger",new LoggingFilter());
//Codec is provided in this filter, where the encoding is an information that ends in \rn 
connector.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
        new SocketFactory()
));
//Set handler for time processing, provide listener functions for session life cycle, message acceptance, send functions
connector.setHandler(new ClientMessageHandler());
//Set the size of the read data cache
connector.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//Set how long to go idle without a message
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
ConnectFuture future = connector.connect(new InetSocketAddress(Constaint.REMOTE_IP,Constaint.REMOTE_PORT));
//Asynchronous processing, no blocking here
future.addListener(new IoFutureListener<IoFuture>() {
    @Override
    public void operationComplete(IoFuture ioFuture) {
        logger.info("Connection preparation complete");
        IoSession session = ioFuture.getSession();

    }
});

Signal communication

  • In fact, the server above, both sides of the client should be created by communication, we found when creating above.The message processor (IoHandlerAdapter) needs to be specified when it is created, which is executed after the IoFilter in the IoService.We call our message processor after the filter executes.

    private static Logger logger = LogManager.getLogger(ServerMessageHandler.class);
    public void sessionCreated(IoSession session) throws Exception {
        super.sessionCreated(session);
        logger.info("sessionCreated");
    }

    public void sessionOpened(IoSession session) throws Exception {
        super.sessionOpened(session);
        try {
            IoBuffer buffer = IoBuffer.allocate(30);
            buffer.clear();
            buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
            buffer.flip();
            session.write(buffer);
        } catch (Exception e) {
            logger.error(e.toString());
        }
        logger.info("sessionOpened");
    }

    public void sessionClosed(IoSession session) throws Exception {
        super.sessionClosed(session);
        logger.info("sessionClosed");
    }

    public void sessionIdle(IoSession session, IdleStatus idleStatus) throws Exception {
        super.sessionIdle(session,idleStatus);
        try {
            IoBuffer buffer = IoBuffer.allocate(30);
            buffer.clear();
            buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
            buffer.flip();
            session.write(buffer);
        } catch (Exception e) {
            logger.error(e.toString());
        }
        //        logger.info("sessionIdle");
    }

    public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception {
        logger.info("exceptionCaught");
        throwable.printStackTrace();
    }

    public void messageReceived(IoSession session, Object message) throws Exception {
        super.messageReceived(session, message);
        String info = message.toString();
        Date date = new Date(System.currentTimeMillis());
        SimpleDateFormat sdf = new  SimpleDateFormat("yy-MM-dd HH:mm:ss");
        String time = sdf.format(date);
        session.write(time);
        System.out.println("Received message:"+info);
    }

    public void messageSent(IoSession session, Object message) throws Exception {
        super.messageSent(session, message);
        logger.info("messageSent");
    }
  • Here the message processor provides a few moments of control, such as where session s are executed when they are created or destroyed.Where the message is received, where the message is successfully sent.These controls can be moderately replicated to suit our needs.

Custom Factory Codec

  • Factories are the way to provide codec.This factory is loaded in the ProtocolCodecFilter.We can also customize filters, in which we can load our custom factories for codec.Where we encode and decode, we can add our business code.For example, decoding can get messages in our IoHandler Adapter's messageReceived method by writing them out through ProtocolDecoderOutput after reading the content through a convention protocol.Then business writing.This decouples the code.

public class SocketFactory  implements ProtocolCodecFactory {
    private MessageDecoder decoder;
    private MessageEncoder encoder;

    public SocketFactory() {
        decoder = new MessageDecoder();
        encoder = new MessageEncoder();
    }

    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return this.decoder;
    }

    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return this.encoder;
    }
}

Decoder

  • The above factory provides codec.Factories provide functions as we do in our lives, but they are not actually made by factories. They may only act as agents, just as processing factories.The same is true for mina communications.True codec is not performed by the factory, this section will expose the decoder CumulativeProtocolDecoder
  • Once the decoder is written, you just need to create it in the custom factory above.For custom encoders, simply inherit the CumulativeProtocolDecoder class.And just copy the doDecode method.The return value of this method is of type boolean.Different return values mean different things.It needs to be cleared up here

    • True: Returning true means that you have consumed information from the messages in the CumulativeProtocolDecoder. You should also call the wirte r of the ProtocolDecoderOutput before returning true in the encoder to publish the message to the IoHandAdaptor for business processing.But there's something else going on here, because our service-side clients are long connections, so messages are coming from us all the time. The messages in our cache may be a whole message, not a whole message, or a little more.
      1. If it is not a complete (half-package) then we go back to falsed and wait for the client to continue sending
      2. If it happens to be a whole strip, then when we accept it and return to true, we will have no data in the cache. In CumulativeProtocolDecoder, the call to doDecode in decoding will stop, which is not unexpected.
      3. There is more data than one complete message (sticky package), so we need to return true after processing one message, but CumulativeProtocolDecoder will continue to assemble the remaining cache and the remaining message will be equivalent to a second internal decoding.If not, that's equivalent to the first case above

      Remember three situations: half pack, normal, sticky

    • False: returning false means the data in the cache is not enough for our entire message and we need to continue waiting for messages from the client.The cache mechanism in CumulativeProtocolDecoder constantly splices data from clients into the cache

public class MessageDecoder extends CumulativeProtocolDecoder {
    /**
     * This method return true: Indicates that CumulativeProtocolDecoder in the parent continuously calls this method for message consumption
     *       return false: Indicates that the message has been consumed completely and the data in the cache will no longer be consumed.Waiting for Client to Re-enter
     *       The messaging interface is triggered when a message is sent, and the old and new messages are stitched together for processing.
     * @param ioSession
     * @param ioBuffer
     * @param protocolDecoderOutput
     * @return
     * @throws Exception
     */
    @Override
    protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
        IoBuffer buffer = IoBuffer.allocate(10);
        while (ioBuffer.hasRemaining()) {
            if (ioBuffer.remaining()<3) {
                //Continue to accept
                return false;
            }
            //Get three bytes
            int oldLimit = ioBuffer.limit();
            ioBuffer.limit(ioBuffer.position()+3);
            String text = ioBuffer.getString(Charset.forName("UTF-8").newDecoder());
            protocolDecoderOutput.write(text);
            ioBuffer.limit(oldLimit);
            if (ioBuffer.hasRemaining()) {
                return true;
            }
        }
        return false;
    }
}

Encoder

  • Encoders are much simpler than decoders. Encoders join our protocol. Normally, messages in our business code are Java entities. What we need to do is convert Java entities into IoBuffers to send them according to the protocol.But messages sent in mina are sent through the write method in IoSession.We looked at the source code and found that in IoSession.write(Object o), if it was an IoBuffer, it would not go through our encoder, otherwise it would go through our encoder to encode and eventually send the converted IoBuffer out.


public class MessageEncoder extends ProtocolEncoderAdapter {
    @Override
    public void encode(IoSession ioSession, Object o, ProtocolEncoderOutput protocolEncoderOutput) throws Exception {
        //TODO coded according to protocol
        //After assembly ioSession.write(IoBuffer) writes out
        System.out.println(o);
    }
}

summary

Join the team

<span id="addMe">Join the team</span>

WeChat Public Number

Keywords: Java Session codec Windows encoding

Added by kaos057 on Fri, 23 Aug 2019 05:30:04 +0300