JAVA-IO streaming advanced NIO

NIO

Java NIO (New IO) is a new IO API introduced from Java version 1.4. NIO has the same function and purpose as the original IO, but it is used in a totally different way. NIO supports buffer oriented, channel based IO operations. NIO will read and write files in a more efficient way.

Main differences between NIO and IO:

IO NIO
Stream Oriented Buffer Oriented
Blocking IO Non Blocking IO
nothing Selectors

buffer

The core of Java NIO system is channel and buffer. A channel represents a connection opened to an IO device, such as a file, socket. If you need to use NIO system, you need to get channels to connect IO devices and buffers to hold data. Then operate the buffer to process the data.

That is, Channel is responsible for transmission and Buffer is responsible for storage

Buffer: a container for a specific basic data type. By java.nio All buffers defined by the package are subclasses of the buffer abstract class. It is mainly used to interact with NIO channel. Data is read into buffer from channel and written into channel from buffer.

According to different data types (except for boolean), corresponding types of buffers are provided, and the management mode is almost the same. The cache area is obtained by allocate() method: ByteBuffer, CharBuffer, ShortBuffer, IntBuffer, LongBuffer, FloatBuffer, doublebeuffer

Four core attributes:

  • Capacity: indicates the maximum data capacity of Buffer. The Buffer capacity cannot be negative and cannot be changed after creation
  • Limit: indicates the size of data that the buffer can operate on, that is, data cannot be read or written after the limit
  • Position: indicates the position where the buffer is manipulating data (0 < = mark < = position < = limit < = capacity)
  • Mark and reset: a mark is an index. You can specify a specific position in the Buffer through the mark() method in the Buffer, and then you can restore it by calling reset() method
method describe
Buffer clear() Clear buffer and return reference to buffer
Buffer flip() Set the buffer limit to the current location and top up the current location to 0, that is, read data mode
int capacity() Return the capacity size of Buffer
boolean hasRemaining() Determine whether there are elements in the buffer
int limit() Returns the location of the Buffer's limit
Buffer limit(int n) The buffer limit will be set to n and a buffer object with a new limit will be returned
Buffer mark() Mark buffer
int position() Returns the current position of the buffer
Buffer position(int n) The current position of the Buffer is set to n, and the modified Buffer object is returned
int remaining() Returns the number of elements between position and limit
Buffer reset() Move the position to the position of the previously set mark
Buffer rewind() Set the position to 0, cancel the mark set

Data operation:

  • Get data in Buffer
    • get(): read a single byte
    • get(byte[] dst): batch read multiple bytes into dst
    • get(int index): read the bytes of the specified index position (position will not be moved)
  • Put data into Buffer
    • put(byte b): the current location where the order byte will be written to the buffer
    • put(byte[] src): writes bytes in src to the current location of the buffer
    • put(int index, byte b): writes the specified byte to the index position of the buffer (position will not be moved)
String s = "hello";
//1. Allocate a buffer of specified size
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
System.out.println(byteBuffer.position());//0
System.out.println(byteBuffer.capacity());//1024
System.out.println(byteBuffer.limit());//1024
System.out.println("----------------------------");
//2. Store a byte array
byteBuffer.put(s.getBytes());
System.out.println(byteBuffer.position());//5
System.out.println(byteBuffer.capacity());//1024
System.out.println(byteBuffer.limit());//1024
System.out.println("----------------------------");
//3. Switch to read data mode
byteBuffer.flip();
System.out.println(byteBuffer.position());//0
System.out.println(byteBuffer.capacity());//1024
System.out.println(byteBuffer.limit());//5
System.out.println("----------------------------");
//4. Read data
byteBuffer.mark();//Mark position at this time
byte[] b = new byte[byteBuffer.limit()];
byteBuffer.get(b);
System.out.println(new String(b,0,byteBuffer.limit()));//hello
System.out.println(byteBuffer.position());//5
System.out.println(byteBuffer.capacity());//1024
System.out.println(byteBuffer.limit());//5
byteBuffer.reset();//Position returns to the last marked position
System.out.println(byteBuffer.position());//0
System.out.println("----------------------------");
//5.rewind(): repeatable data
byteBuffer.rewind();
System.out.println(byteBuffer.position());//0
System.out.println(byteBuffer.capacity());//1024
System.out.println(byteBuffer.limit());//5
System.out.println("----------------------------");
//6.clear(): clear the buffer, but the buffer data still exists, but the data is in the forgotten state
byteBuffer.clear();
System.out.println(byteBuffer.position());//0
System.out.println(byteBuffer.capacity());//1024
System.out.println(byteBuffer.limit());//1024

Direct and indirect buffers:

Indirect buffer: allocate the buffer through allocate() method, and build the buffer in the memory of the JVM.

Direct buffer: allocate buffer through allocateDirect() method or FileChannel's map(). Set the buffer in the operating system's physical memory, only supporting ByteBuffer. (it improves efficiency, but consumes a lot of resources and is difficult to control. It is impossible to control when to write and when to recycle.)

It is recommended that direct buffers be allocated primarily to large, persistent buffers that are vulnerable to native I/O operations of the underlying system. In general, it is best to allocate direct buffers only when they provide a significant benefit in terms of program performance.

ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
byteBuffer.isDirect();//Determine whether it is a direct buffer

Channel

By java.nio.channels Package definition, used for the connection between IO source node and target node. In Java NIO, it is responsible for the data transmission in the buffer. Channel itself is not responsible for storing data, so it needs to cooperate with buffer for data transmission.

Main implementation classes:

  • FileChannel: channel for reading, writing, mapping, and manipulating files
  • Datagram channel: read and write data channels in the network through UDP
  • SocketChannel: read and write data in the network through TCP
  • ServerSocketChannel: can listen to new incoming TCP connections. A SocketChannel will be created for each new incoming connection

Get Channel:

  • Get method 1: one way to get the channel is to call getChannel() method on the object supporting the channel
    • Local IO: FileInputStream, FileOutputStream, RandomAccessFile
    • Network IO: datagram Socket, Socket, ServerSocket
  • Obtain method 2: open and return the specified channel through the static method open() of the channel
  • Get method 3: use the static method newByteChannel() of the Files class to get the byte channel
@Test
public void test1() throws Exception {
    //Channel + non direct buffer to complete file copy
    FileInputStream fileInputStream = new FileInputStream(new File("1.jpg"));
    FileOutputStream fileOutputStream = new FileOutputStream(new File("2.jpg"));
    FileChannel inchannel = fileInputStream.getChannel();
    FileChannel outchannel = fileOutputStream.getChannel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);//Indirect buffer
    while (inchannel.read(buffer) != -1){
        buffer.flip();
        outchannel.write(buffer);
        buffer.clear();
    }
    outchannel.close();
    inchannel.close();
    fileOutputStream.close();
    fileInputStream.close();
}
@Test
public void test2() throws IOException {
    //Direct buffer completes file copy (memory mapped file)
    FileChannel inchannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
    FileChannel outchannel = FileChannel.open(Paths.get("3.jpg"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
    MappedByteBuffer inmap = inchannel.map(FileChannel.MapMode.READ_ONLY, 0, inchannel.size());//Memory mapped file, the same as non direct buffer
    MappedByteBuffer outmap = outchannel.map(FileChannel.MapMode.READ_WRITE, 0, inchannel.size());
    byte[] b = new byte[inmap.limit()];
    inmap.get(b);
    outmap.put(b);
    inchannel.close();
    outchannel.close();
}

Data transmission between channels:

  • transferFrom()
  • transferTo()
@Test
public void test3() throws Exception {
    //Data transmission between channels is also the way of direct buffer
    FileChannel inchannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
    FileChannel outchannel = FileChannel.open(Paths.get("3.jpg"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
    inchannel.transferTo(0,inchannel.size(),outchannel);
    outchannel.transferFrom(inchannel,0,inchannel.size());
    inchannel.close();
    outchannel.close();
}

Scatter and gather

Scattering Reads is to spread the data in the channel into multiple buffers.

Gathering Writes is to gather data from multiple buffers into a Channel. (write the data between position and limit to Channel in the order of buffer.)

ByteBuffer[] bufs = new ByteBuffer[]{buf1,buf2,buf3};
channel.read(bufs);

channe2.writer(bufs)

Character set: Charset

Encoding: string to byte array

Decoding: byte array converted to string

@Test
public void test3() throws Exception {
    Charset c = Charset.forName("GBK");
    CharsetEncoder encoder = c.newEncoder();
    CharsetDecoder decoder = c.newDecoder();
    CharBuffer buffer = CharBuffer.allocate(1024);
    buffer.put("Peng Yuyan is really handsome");
    buffer.flip();
    ByteBuffer byteBuffer = encoder.encode(buffer);//code
    byteBuffer.flip();
    CharBuffer charBuffer = decoder.decode(byteBuffer);//decode
}

Non blocking network communication of NIO

Blocking and non blocking:

  • Traditional IO flows are blocking. That is, when a thread calls read() or write(), the thread is blocked until some data is read or written, and the thread cannot perform other tasks during this period. Therefore, when the network communication is completed for IO operation, the threads will block, so the server must provide each client with an independent thread for processing. When the server needs to process a large number of clients, the performance drops sharply.
  • Java NIO is non blocking. When a thread reads and writes data from a channel, if no data is available, the thread can perform other tasks. Threads usually use the free time of non blocking IO to perform IO operations on other channels, so a single thread can manage multiple input and output channels. Therefore, NIO allows the server side to use one or a limited number of threads to handle all clients connected to the server side at the same time.

Using NIO to complete the three core of network communication:

  • Channel: responsible for connection
    • java.nio.channels.Channel interface
    • SelectableChannel
      •           SocketChannel
        
      •           ServerSocketChannel
        
      •           DatagramChannel
        
      •           Pipe.SinkChannel
        
      •           Pipe.SourceChannel
        
  • Buffer: responsible for data access
  • Selector: it is the multiplexer of the SelectableChannel, which is used to monitor the IO status of the SelectableChannel.

SocketChannel in Java NIO is a channel (client) that connects to a TCP network socket. ServerSocketChannel is a channel that can listen for new incoming TCP connections, just like ServerSocket in standard IO (server side).

Operation steps of SocketChannel:

  • Open SocketChannel
  • Read and write data
  • Close SocketChannel

Blocking IO network communication: (without selector)

public class TestBlockingNIO {
    //No Selector, blocking IO
@Test
public void client() throws IOException{
    //Access channel, FileChannel is used to access local file communication, SocketChannel is used to send network communication
    SocketChannel sChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1",9898));
    FileChannel inChannel=FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
    //Send to server
    ByteBuffer buf=ByteBuffer.allocate(1024);
    while(inChannel.read(buf)!=-1){
        buf.flip();
        sChannel.write(buf);
        buf.clear();
    }
    sChannel.shutdownOutput();//Close the transmission channel, indicating that the transmission is completed

    //Receive feedback from the server
    int len=0;
    while((len=sChannel.read(buf))!=-1){
        buf.flip();
        System.out.println(new String(buf.array(),0,len));
        buf.clear();
    }
    inChannel.close();
    sChannel.close();
}

//Server
@Test
public void server() throws IOException{
    ServerSocketChannel ssChannel=ServerSocketChannel.open();
    FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);
    //Bind connection
    ssChannel.bind(new InetSocketAddress(9898));
    //Get channel for client connection
    SocketChannel sChannel=ssChannel.accept();
    //Receive client data
    ByteBuffer buf=ByteBuffer.allocate(1024);
    while(sChannel.read(buf)!=-1){
        buf.flip();
        outChannel.write(buf);
        buf.clear();
    }

    //Send feedback to client
    buf.put("Server receives data successfully".getBytes());
    buf.flip();
    sChannel.write(buf);

    sChannel.close();
    outChannel.close();
    ssChannel.close();
}
}

Nonblocking IO network communication: (using selector)

The selector is a multiplexer of the SelectableChannel object. The selector can monitor the IO status of multiple selectablechannels at the same time. That is to say, a single thread can manage multiple channels by using the selector. Selector is the core of non blocking IO

Usage:

  • Create selector: by calling Selector.open() method creates a selector
  • Register channel with selector: SelectableChannel.register(Selector sel, int ops), where ops is the selectionKey, the value is as follows:
    • Read: SelectionKey.OP_READ
    • Write: SelectionKey.OP_WRITE
    • connect: SelectionKey.OP_CONNECT
    • receive: SelectionKey.OP_ACCEPT
    • If you listen to more than one event during registration, you can use the bitwise OR operator connection. ( SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE)
public class TestNonBlockingNIO {
    //client
    @Test
    public void client()throws IOException{
        //1. Access
        SocketChannel sChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        //2. Switch non blocking mode
        sChannel.configureBlocking(false);
        //3. Allocate buffer of specified size
        ByteBuffer buf=ByteBuffer.allocate(1024);
        //4. Send data to the server
        Scanner scan=new Scanner(System.in);
        while(scan.hasNext()){
            String str=scan.next();
            buf.put((new Date().toString()+"\n"+str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        //5. Close the channel
        sChannel.close();
    }
//Server
@Test
public void server() throws IOException{
    //1. Access
    ServerSocketChannel ssChannel=ServerSocketChannel.open();
    //2. Switch non blocking mode
    ssChannel.configureBlocking(false);
    //3. Binding connection
    ssChannel.bind(new InetSocketAddress(9898));
    //4. Get selector
    Selector selector=Selector.open();
    //5. Register the channel to the selector and specify "listen for receive events"
    ssChannel.register(selector,SelectionKey.OP_ACCEPT);
    //6. Events that have been "ready" on polling get selector
    while(selector.select()>0){
        //7. Get all registered "selection keys (ready listening events)" in the current selector
        Iterator<SelectionKey> it=selector.selectedKeys().iterator();
        while(it.hasNext()){
            //8. Get ready events
            SelectionKey sk=it.next();
            //9. Judge when it is ready
            if(sk.isAcceptable()){
                //10. If "ready to receive", get client connection
                SocketChannel sChannel=ssChannel.accept();
                //11. Switch non blocking mode
                sChannel.configureBlocking(false);
                //12. Register the channel to the selector
                sChannel.register(selector, SelectionKey.OP_READ);
            }else if(sk.isReadable()){
                //13. Get the channel of "read ready" status on the current selector
                SocketChannel sChannel=(SocketChannel)sk.channel();
                //14. Read data
                ByteBuffer buf=ByteBuffer.allocate(1024);
                int len=0;
                while((len=sChannel.read(buf))>0){
                    buf.flip();
                    System.out.println(new String(buf.array(),0,len));
                    buf.clear();
                }
            }
            //15. Deselect key
            it.remove();
        }
    }
}
}

DatagramChannel

Datagram channel in Java NIO is a channel that can send and receive UDP packets.

 DatagramChannel dc=DatagramChannel.open();
public class TestNonBlockNIO2 {
    @Test
    public void send() throws IOException{
        DatagramChannel dc=DatagramChannel.open();
        dc.configureBlocking(false);
        ByteBuffer buf=ByteBuffer.allocate(1024);
        Scanner scan=new Scanner(System.in);
        while(scan.hasNext()){
            String str=scan.next();
            buf.put((new Date().toString()+"\n"+str).getBytes());
            buf.flip();
            dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
            buf.clear();
        }
        dc.close();
    }

    @Test
    public void receive() throws IOException{
        DatagramChannel dc=DatagramChannel.open();
        dc.configureBlocking(false);
        dc.bind(new InetSocketAddress(9898));
        Selector selector=Selector.open();
        dc.register(selector, SelectionKey.OP_READ);
        while(selector.select()>0){
            Iterator<SelectionKey> it=selector.selectedKeys().iterator();
            while(it.hasNext()){
                SelectionKey sk=it.next();
                if(sk.isReadable()){
                    ByteBuffer buf=ByteBuffer.allocate(1024);
                    dc.receive(buf);
                    buf.flip();
                    System.out.println(new String(buf.array(),0,buf.limit()));
                    buf.clear();
                }
            }
            it.remove();
        }
    }
}

Pipe

The Java NIO pipeline is a one-way data connection between two threads. Pipe has a source channel and a sink channel.

The data is written to the sink channel and read from the source channel.

public class TestPipe {
    @Test
    public void test1()throws IOException{
        //1. Access pipeline
        Pipe pipe=Pipe.open();
        //2. Write the data in the buffer to the pipeline. The write is sink
        ByteBuffer buf=ByteBuffer.allocate(1024);
        Pipe.SinkChannel sinkChannel=pipe.sink();
        buf.put("Send data through a one-way pipe".getBytes());
        buf.flip();
        sinkChannel.write(buf);

        //3. Read the data in the buffer. The read is source
        Pipe.SourceChannel sourceChannel=pipe.source();
        buf.flip();
        int len=sourceChannel.read(buf);
        System.out.println(new String(buf.array(),0,len));

        sourceChannel.close();
        sinkChannel.close();
    }
}

Keywords: Java network socket jvm

Added by Bill H on Tue, 16 Jun 2020 10:48:24 +0300