I/O may not seem closely related to multithreading, but NIO changed how threads are used at the application level and solved some practical problems. AIO is asynchronous I/O, which also ties in with the earlier articles in this series. This article introduces NIO and AIO as a learning record.
1. What is NIO
NIO stands for New I/O, in contrast to the old stream-based I/O. As the name suggests, it is a new set of Java I/O standards. It was added to the JDK in Java 1.4 and has the following features:
All reads and writes on a channel must go through a buffer. The channel is the abstraction of the I/O endpoint, and the other end of the channel is the file being operated on.
2. Buffer
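Java provides a Buffer implementation for the primitive data types: each primitive type except boolean has a corresponding Buffer class (ByteBuffer, CharBuffer, IntBuffer, and so on).
A simple example of using a Buffer: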
package test;

import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class Test {
    public static void main(String[] args) throws Exception {
        FileInputStream fin = new FileInputStream(new File("d://temp_buffer.tmp"));
        FileChannel fc = fin.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        fc.read(byteBuffer);
        fc.close();
        byteBuffer.flip(); // Switch the buffer from writing to reading
    }
}
The steps are:
1. Get a Channel
2. Allocate a Buffer
3. Read from the Channel into the Buffer, or write from the Buffer to the Channel
4. Close the Channel
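The four steps above can be sketched as a small self-contained program. This is only an illustration under stated assumptions: the class name `ChannelReadSketch`, the method `readHead`, and the temporary file are hypothetical, and try-with-resources is used here so that step 4 happens automatically. A single `read` call is assumed to be enough for a small file.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class ChannelReadSketch {
    // Reads up to 1024 bytes from the start of the file and returns them as text.
    static String readHead(Path file) throws IOException {
        // Step 1: get a Channel. try-with-resources performs step 4 (close).
        try (FileInputStream fin = new FileInputStream(file.toFile());
             FileChannel fc = fin.getChannel()) {
            // Step 2: allocate a Buffer.
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // Step 3: the channel fills the buffer.
            fc.read(buffer);
            // Switch the buffer from being written to being read.
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical temporary file, used only for this demonstration.
        Path tmp = Files.createTempFile("nio-sketch", ".tmp");
        try {
            Files.write(tmp, "hello nio".getBytes(StandardCharsets.UTF_8));
            System.out.println(readHead(tmp));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

Without the call to `flip()`, the decode step would start at the position left by the read and would see none of the bytes that were just read.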
The following example uses NIO to copy a file:
public static void nioCopyFile(String resource, String destination) throws IOException {
    FileInputStream fis = new FileInputStream(resource);
    FileOutputStream fos = new FileOutputStream(destination);
    FileChannel readChannel = fis.getChannel();    // Channel for reading the source file
    FileChannel writeChannel = fos.getChannel();   // Channel for writing the destination file
    ByteBuffer buffer = ByteBuffer.allocate(1024); // Buffer that holds the data read in
    while (true) {
        buffer.clear();
        int len = readChannel.read(buffer); // Read data into the buffer
        if (len == -1) {
            break; // End of file reached
        }
        buffer.flip();
        writeChannel.write(buffer); // Write the buffer to the file
    }
    readChannel.close();
    writeChannel.close();
}
There are 3 important parameters in a Buffer: position, capacity and limit.
Here we need to distinguish between capacity and limit. For example, if a Buffer holds 10KB, then 10KB is its capacity. If I read a 5KB file into the Buffer and then call flip(), the limit is 5KB.
Here is an example that illustrates these 3 parameters:
public static void main(String[] args) throws Exception {
    ByteBuffer b = ByteBuffer.allocate(15); // 15-byte buffer
    System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position());
    for (int i = 0; i < 10; i++) { // Store 10 bytes of data
        b.put((byte) i);
    }
    System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position());
    b.flip(); // Reset position
    System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position());
    for (int i = 0; i < 5; i++) {
        System.out.print(b.get());
    }
    System.out.println();
    System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position());
    b.flip();
    System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position());
}
The process, step by step:
After the 10 bytes have been put, position has moved from 0 to 10, while capacity and limit are unchanged.
The first flip() resets the position. This method is normally called when switching the buffer from write mode to read mode. flip() not only resets position to 0, it also sets limit to the previous position.
The limit determines which data is meaningful: the data between position and limit is meaningful because it was produced by the previous operation. This is why a flip usually marks a switch between writing and reading.
The second flip() works the same way: after 5 bytes have been read, position is 5, so flip() sets limit to 5 and position to 0.
Most Buffer methods work by changing these 3 parameters:
public final Buffer rewind()
Sets position to zero and clears the mark.
public final Buffer clear()
Sets position to zero, sets limit to capacity, and clears the mark.
public final Buffer flip()
First sets limit to the current position, then sets position to zero and clears the mark. Usually used when switching between writing and reading.
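The effect of the three methods above can be checked side by side. The following is a minimal sketch with a hypothetical class `BufferStateSketch` and helper `state`; the comments give the values produced by an 8-byte buffer after three bytes have been written.

```java
import java.nio.ByteBuffer;

class BufferStateSketch {
    // Formats the three parameters of a buffer for printing.
    static String state(ByteBuffer b) {
        return "position=" + b.position() + " limit=" + b.limit() + " capacity=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer b = ByteBuffer.allocate(8);
        b.put((byte) 1).put((byte) 2).put((byte) 3);
        System.out.println("after put:    " + state(b)); // position=3 limit=8 capacity=8

        b.flip();   // limit = old position, position = 0
        System.out.println("after flip:   " + state(b)); // position=0 limit=3 capacity=8

        b.get();
        b.get();
        System.out.println("after 2 gets: " + state(b)); // position=2 limit=3 capacity=8

        b.rewind(); // position = 0, limit untouched: the same data can be read again
        System.out.println("after rewind: " + state(b)); // position=0 limit=3 capacity=8

        b.clear();  // position = 0, limit = capacity: ready for new writes
        System.out.println("after clear:  " + state(b)); // position=0 limit=8 capacity=8
    }
}
```

Note that clear() does not erase the bytes in the buffer; it only resets the parameters so that the next write overwrites them.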
Mapping a file into memory:
public static void main(String[] args) throws Exception {
    RandomAccessFile raf = new RandomAccessFile("C://mapfile.txt", "rw");
    FileChannel fc = raf.getChannel();
    // Map the file into memory
    MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
    while (mbb.hasRemaining()) {
        System.out.print((char) mbb.get());
    }
    mbb.put(0, (byte) 98); // Modify the file
    raf.close();
}
Modifying the MappedByteBuffer is equivalent to modifying the file itself, so the operation is very fast.
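To see that a change made through the mapping really ends up in the file, the file can be read back through the ordinary file API afterwards. The sketch below assumes a non-empty temporary file; the class `MappedWriteSketch` and the method `overwriteFirstByte` are hypothetical names, and the call to `force()` is an addition that asks the operating system to write the change to storage.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class MappedWriteSketch {
    // Overwrites the first byte of a non-empty file through a memory mapping.
    static void overwriteFirstByte(Path file, byte value) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel fc = raf.getChannel()) {
            MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, fc.size());
            mbb.put(0, value); // Absolute put: writes at index 0 without moving position
            mbb.force();       // Ask the OS to flush the mapped region to the file
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical temporary file, removed when the JVM exits.
        Path tmp = Files.createTempFile("mapfile", ".txt");
        tmp.toFile().deleteOnExit();
        Files.write(tmp, "abc".getBytes(StandardCharsets.UTF_8));

        overwriteFirstByte(tmp, (byte) 98); // 98 is the character 'b'

        // Read the file back without the mapping.
        System.out.println(new String(Files.readAllBytes(tmp), StandardCharsets.UTF_8));
    }
}
```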
3. Channel
General structure of multi-threaded network server:
Simple multi-threaded server:
public static void main(String[] args) throws Exception {
    ServerSocket echoServer = null;
    Socket clientSocket = null;
    try {
        echoServer = new ServerSocket(8000);
    } catch (IOException e) {
        System.out.println(e);
    }
    while (true) {
        try {
            clientSocket = echoServer.accept();
            System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
            tp.execute(new HandleMsg(clientSocket));
        } catch (IOException e) {
            System.out.println(e);
        }
    }
}
The server echoes back to the client whatever it reads.
Here tp is a thread pool, and HandleMsg is the class that handles messages.
static class HandleMsg implements Runnable {
    // Part of the class is omitted here
    public void run() {
        try {
            is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            os = new PrintWriter(clientSocket.getOutputStream(), true);
            // Read the data sent by the client from the InputStream
            String inputLine = null;
            long b = System.currentTimeMillis();
            while ((inputLine = is.readLine()) != null) {
                os.println(inputLine);
            }
            long e = System.currentTimeMillis();
            System.out.println("spend:" + (e - b) + "ms");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close resources (omitted)
        }
    }
}
Client:
public static void main(String[] args) throws Exception {
    Socket client = null;
    PrintWriter writer = null;
    BufferedReader reader = null;
    try {
        client = new Socket();
        client.connect(new InetSocketAddress("localhost", 8000));
        writer = new PrintWriter(client.getOutputStream(), true);
        writer.println("Hello!");
        writer.flush();
        reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
        System.out.println("from server: " + reader.readLine());
    } catch (Exception e) {
    } finally {
        // Resource closing omitted
    }
}
The network programming above is very basic, and this approach has some problems:
Each client uses one thread. If a client misbehaves, for example because of network delay, its thread may be held for a long time, because both waiting for the data and reading it happen in that thread. With many such clients, this can consume a lot of system resources.
Solution:
Use non-blocking NIO (a thread does not wait for data; it only starts working once the data is ready).
To show the efficiency of NIO, we first simulate an inefficient client whose writes are delayed, as if by the network:
private static ExecutorService tp = Executors.newCachedThreadPool();
private static final int sleep_time = 1000 * 1000 * 1000; // 1 second, in nanoseconds

public static class EchoClient implements Runnable {
    public void run() {
        try {
            Socket client = new Socket();
            client.connect(new InetSocketAddress("localhost", 8000));
            PrintWriter writer = new PrintWriter(client.getOutputStream(), true);
            writer.print("H");
            LockSupport.parkNanos(sleep_time);
            writer.print("e");
            LockSupport.parkNanos(sleep_time);
            writer.print("l");
            LockSupport.parkNanos(sleep_time);
            writer.print("l");
            LockSupport.parkNanos(sleep_time);
            writer.print("o");
            LockSupport.parkNanos(sleep_time);
            writer.print("!");
            LockSupport.parkNanos(sleep_time);
            writer.println();
            writer.flush();
        } catch (Exception e) {
        }
    }
}
Server-side output:
spend:6000ms
spend:6000ms
spend:6000ms
spend:6001ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6003ms
spend:6003ms
The reason is that the call in
while ((inputLine = is.readLine()) != null)
blocks until data arrives, so the thread spends its time waiting.
What would happen if NIO were used to deal with this problem?
One major feature of NIO is: notify me when the data is ready.
A Channel is somewhat like a stream, and a Channel can correspond to a file or a network socket.
A Selector picks out the channels that are ready so that something can be done with them.
One thread can own one selector, one selector can poll multiple channels, and each channel corresponds to a socket.
Compared with the one-thread-per-socket model above, with NIO a single thread can poll many sockets.
When select() is called on the selector, it checks whether any client has data ready. If none does, select() blocks. NIO is usually described as non-blocking, but it still blocks here when no data is ready.
When data is ready, select() returns, and the selector's selected-key set contains a SelectionKey for each ready channel. A SelectionKey indicates that a Channel registered with the selector has data ready.
A Channel is selected only when its data is ready.
In this way, NIO lets one thread monitor multiple clients.
The slow client simulated above does not hold up the NIO thread: while a socket is delayed its data is not ready, so the selector skips it and selects other clients that are ready.
The difference between selectNow() and select() is that selectNow() does not block. When no client has data ready, selectNow() returns 0 immediately. Otherwise it returns the number of ready clients.
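The non-blocking behaviour of selectNow() can be observed with one server channel and one client on the loopback interface. This is a sketch under assumptions: the class `SelectNowSketch` and the method `readyCounts` are hypothetical, port 0 asks the operating system for any free port, and select is given a 5 second timeout so that the program cannot hang.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class SelectNowSketch {
    // Returns { selectNow() before any client connects, select() after one has connected }.
    static int[] readyCounts() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // No client yet: selectNow() returns immediately with no ready channels.
            int before = selector.selectNow();

            // A blocking client connects, so the server channel becomes acceptable.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                int after = selector.select(5000);
                return new int[] { before, after };
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] counts = readyCounts();
        System.out.println("selectNow() with no client: " + counts[0]);
        System.out.println("select() after a connect:   " + counts[1]);
    }
}
```

Calling select() without a timeout in place of the first selectNow() would block the thread until a client connected.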
Main code:
package test;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThreadNIOEchoServer {
    public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();

    class EchoClient {
        private LinkedList<ByteBuffer> outq;

        EchoClient() {
            outq = new LinkedList<ByteBuffer>();
        }

        public LinkedList<ByteBuffer> getOutputQueue() {
            return outq;
        }

        public void enqueue(ByteBuffer bb) {
            outq.addFirst(bb);
        }
    }

    class HandleMsg implements Runnable {
        SelectionKey sk;
        ByteBuffer bb;

        public HandleMsg(SelectionKey sk, ByteBuffer bb) {
            super();
            this.sk = sk;
            this.bb = bb;
        }

        @Override
        public void run() {
            EchoClient echoClient = (EchoClient) sk.attachment();
            echoClient.enqueue(bb);
            sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            selector.wakeup();
        }
    }

    private Selector selector;
    private ExecutorService tp = Executors.newCachedThreadPool();

    private void startServer() throws Exception {
        selector = SelectorProvider.provider().openSelector();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        InetSocketAddress isa = new InetSocketAddress(8000);
        ssc.socket().bind(isa);
        // Register the event of interest; here we are interested in the accept event
        SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
        for (;;) {
            selector.select();
            Set readyKeys = selector.selectedKeys();
            Iterator i = readyKeys.iterator();
            long e = 0;
            while (i.hasNext()) {
                SelectionKey sk = (SelectionKey) i.next();
                i.remove();
                if (sk.isAcceptable()) {
                    doAccept(sk);
                } else if (sk.isValid() && sk.isReadable()) {
                    if (!geym_time_stat.containsKey(((SocketChannel) sk.channel()).socket())) {
                        geym_time_stat.put(((SocketChannel) sk.channel()).socket(),
                                System.currentTimeMillis());
                    }
                    doRead(sk);
                } else if (sk.isValid() && sk.isWritable()) {
                    doWrite(sk);
                    e = System.currentTimeMillis();
                    long b = geym_time_stat.remove(((SocketChannel) sk.channel()).socket());
                    System.out.println("spend:" + (e - b) + "ms");
                }
            }
        }
    }

    private void doWrite(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        EchoClient echoClient = (EchoClient) sk.attachment();
        LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
        ByteBuffer bb = outq.getLast();
        try {
            int len = channel.write(bb);
            if (len == -1) {
                disconnect(sk);
                return;
            }
            if (bb.remaining() == 0) {
                outq.removeLast();
            }
        } catch (Exception e) {
            disconnect(sk);
        }
        if (outq.size() == 0) {
            sk.interestOps(SelectionKey.OP_READ);
        }
    }

    private void doRead(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        ByteBuffer bb = ByteBuffer.allocate(8192);
        int len;
        try {
            len = channel.read(bb);
            if (len < 0) {
                disconnect(sk);
                return;
            }
        } catch (Exception e) {
            disconnect(sk);
            return;
        }
        bb.flip();
        tp.execute(new HandleMsg(sk, bb));
    }

    private void disconnect(SelectionKey sk) {
        // Closing operations omitted
    }

    private void doAccept(SelectionKey sk) {
        ServerSocketChannel server = (ServerSocketChannel) sk.channel();
        SocketChannel clientChannel;
        try {
            clientChannel = server.accept();
            clientChannel.configureBlocking(false);
            SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
            EchoClient echoClinet = new EchoClient();
            clientKey.attach(echoClinet);
            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("Accepted connection from " + clientAddress.getHostAddress());
        } catch (Exception e) {
            // Exception handling omitted
        }
    }

    public static void main(String[] args) {
        MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();
        try {
            echoServer.startServer();
        } catch (Exception e) {
            // Exception handling omitted
        }
    }
}
The code is for reference only. Its main point is that different events of interest trigger different handling.
With the delayed client simulated earlier, the time spent is now between 2ms and 11ms. The performance improvement is obvious.
Summary:
1. Once the data is ready, NIO hands it to the application for processing. The reading and writing itself is still done in application threads; NIO only moves the waiting into a separate thread.
2. Data preparation time is saved (because the Selector can be reused).
5. AIO
Features of AIO:
1. Notify me after the read has finished
2. The I/O itself is not made faster; you are only notified once the read is done
3. Business logic runs in callback functions
AIO related code:
AsynchronousServerSocketChannel
server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));
Call the accept method on server:
public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel, ? super A> handler);
CompletionHandler is a callback interface. When a client connection is accepted, the code in the handler runs.
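The callback style of accept can be tried in isolation with a loopback connection. The sketch below is an illustration only: the class `AcceptCallbackSketch`, the method `acceptOnce`, and the use of a CountDownLatch to wait for the callback are assumptions made for the demonstration, and port 0 lets the operating system pick a free port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AcceptCallbackSketch {
    // Returns true if completed() ran for one client within 5 seconds.
    static boolean acceptOnce() throws Exception {
        CountDownLatch accepted = new CountDownLatch(1);
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0))) {

            // accept() returns immediately; the handler is invoked later on another thread.
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel client, Void attachment) {
                    System.out.println("accepted on " + Thread.currentThread().getName());
                    try {
                        client.close();
                    } catch (IOException ignored) {
                        // Nothing useful to do in this sketch
                    }
                    accepted.countDown();
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.out.println("failed: " + exc);
                }
            });

            // Connect one ordinary blocking client to trigger the callback.
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            try (Socket client = new Socket(addr.getAddress(), addr.getPort())) {
                return accepted.await(5, TimeUnit.SECONDS);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("callback ran: " + acceptOnce());
    }
}
```

The thread that calls accept is never blocked by it; only the latch, added here so that the program can observe the callback, makes the main thread wait.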
Sample code:
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
    final ByteBuffer buffer = ByteBuffer.allocate(1024);

    @Override
    public void completed(AsynchronousSocketChannel result, Object attachment) {
        System.out.println(Thread.currentThread().getName());
        Future<Integer> writeResult = null;
        try {
            buffer.clear();
            result.read(buffer).get(100, TimeUnit.SECONDS);
            buffer.flip();
            writeResult = result.write(buffer);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                server.accept(null, this); // Accept the next connection
                if (writeResult != null) { // Still null if the read failed before the write started
                    writeResult.get();
                }
                result.close();
            } catch (Exception e) {
                System.out.println(e.toString());
            }
        }
    }

    @Override
    public void failed(Throwable exc, Object attachment) {
        System.out.println("failed: " + exc);
    }
});
Here Future is used so that the read and write calls return immediately. For Future, please refer to the previous article.
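The Future style can also be shown on its own with a file instead of a socket. This is a sketch, not the article's code: it swaps in AsynchronousFileChannel so that it can run without a network peer, and the class `FutureReadSketch`, the method `readAsync`, and the 5 second timeout are hypothetical choices.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureReadSketch {
    // Starts an asynchronous read of up to 1024 bytes and returns the text once it completes.
    static String readAsync(Path file) throws Exception {
        try (AsynchronousFileChannel ch =
                     AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            // read() returns at once; the I/O proceeds in the background.
            Future<Integer> pending = ch.read(buffer, 0);

            // The calling thread could do other work here before asking for the result.
            pending.get(5, TimeUnit.SECONDS); // Wait until the read has completed

            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical temporary file, used only for this demonstration.
        Path tmp = Files.createTempFile("aio-sketch", ".tmp");
        try {
            Files.write(tmp, "hello aio".getBytes(StandardCharsets.UTF_8));
            System.out.println(readAsync(tmp));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

Calling get() right after read(), as the sample above also does, turns the asynchronous call back into a waiting one; the benefit appears only when other work is done between the two calls.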
With NIO understood, the difference in AIO is that the callback function is invoked only after the read or write has completed.
NIO is synchronous and non-blocking
AIO is asynchronous and non-blocking
Since NIO's reads and writes are still done in application threads, NIO is not suitable for workloads whose reads and writes take a long time.
With AIO, notification comes only after the read or write has completed, so AIO is well suited to heavyweight, long-running read and write tasks.