This article will introduce from shallow to deep from traditional BIO to NIO to AIO, and will be accompanied by a complete code explanation.
An example will be used in the following code: the client sends a string of the equation to the server, and the server returns the result to the client after calculation.
All instructions for the code are directly used as comments and are embedded in the code, which can be easier to understand when reading the code. A tool class for calculating the result will be used in the code, see the code section of the article.
Recommended articles for related basic knowledge:
Introduction to Linux network I/O model (pictures and text)
Java concurrency (multi-threading)
1. BIO programming
1.1. Traditional BIO programming
The basic model of network programming is the C/S model, that is, communication between two processes.
The server provides IP and listening ports. The client initiates a connection request through the connection operation address that the server wants to listen to. Through three handshakes, if the connection is successfully established, both parties can communicate through sockets.
In the development of traditional synchronization blocking model, ServerSocket is responsible for binding IP addresses and starting listening ports; Socket is responsible for initiating connection operations. After the connection is successful, both parties conduct synchronous blocking communication through the input and output streams.
A brief description of the BIO server communication model: the server using the BIO communication model is usually an independent Acceptor thread responsible for listening to the client's connection. After receiving the client connection request, it creates a new thread for each client for link processing and fails to process it, and then returns the reply to the client through the output stream, and the thread is destroyed. That is, a typical one-request-to-reply all-night model.
Traditional BIO communication model diagram:
The biggest problem with this model is that it lacks elastic scaling capabilities. When the number of concurrent accesses on the client increases, the number of threads on the server is proportional to the number of concurrent accesses on the client. Threads in Java are also relatively valuable system resources. After the number of threads expands rapidly, the performance of the system will drop sharply. As the number of accesses continues to increase, the system will eventually die.
Server source code created by synchronous blocking I/O:
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * BIO server source code* @author yangtao__anxpp.com * @version 1.0 */ public final class ServerNormal { //Default port number private static int DEFAULT_PORT = 12345; //Singleton ServerSocket private static ServerSocket server; //Set the listening port according to the incoming parameters. If there are no parameters, call the following method and use the default value public static void start() throws IOException{ //Use the default value start(DEFAULT_PORT); } //This method will not be accessed in a large number of concurrent ways, and there is no need to consider the efficiency, just synchronize the method directly public synchronized static void start(int port) throws IOException{ if(server != null) return; try{ //Create ServerSocket through the constructor //If the port is legal and idle, the server will listen successfully. Server = new ServerSocket(port); System.out.println("The server has been started, port number: " + port); // Listen to client connections through wireless loop//If there is no client access, it will be blocked on the accept operation. while(true){ Socket socket = server.accept(); //When there is a new client access, the following code will be executed//Then create a new thread to handle this Socket link new Thread(new ServerHandler(socket)).start(); } } finally{ //Some necessary cleaning work if(server != null){ System.out.println("The server is closed."); server.close(); server = null; } } } } Client message processing thread ServerHandler source code:
package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import com.anxpp.io.utils.Calculator; /** * Client thread* @author yangtao__anxpp.com * Socket link for a client*/ public class ServerHandler implements Runnable{ private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try{ in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); String expression; String result; while(true){ //Read a line through BufferedReader//If you have read the tail of the input stream, return null and exit the loop//If you get a non-null value, try to calculate the result and return if((expression = in.readLine())==null) break; System.out.println("The server received a message: " + expression); try{ result = Calculator.cal(expression).toString(); } catch(Exception e){ result = "Calculator.cal(expression).toString(); } catch(Exception e){ e.printStackTrace(); } finally{ //Some necessary cleanup work if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(out != null){ out.close(); out = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } } } Client source code created by synchronous blocking I/O:
package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /** * Client created by blocking I/O* @author yangtao__anxpp.com * @version 1.0 */ public class Client { //Default port number private static int DEFAULT_SERVER_PORT = 12345; private static String DEFAULT_SERVER_IP = "127.0.0.1"; public static void send(String expression){ send(DEFAULT_SERVER_PORT,expression); } public static void send(int port,String expression){ System.out.println("Arithmetic expression is: " + expression); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try{ socket = new Socket(DEFAULT_SERVER_IP,port); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(expression); System.out.println("___The result is: " + in.readLine()); }catch(Exception e){ e.printStackTrace(); } finally{ //Are the necessary cleaning work if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(out != null){ out.close(); out = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } } Test the code, in order to facilitate viewing the output results in the console, put it in the same program (jvm) to run:
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.util.Random; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test the main method public static void main(String[] args) throws InterruptedException { //Run the server new Thread(new Runnable() { @Override public void run() { try { ServerBetter.start(); } catch (IOException e) { e.printStackTrace(); } } }).start(); //Avoid the client executing the code before the server starts; //Run the client char operators[] = {'+','-','*','/'}; Random random = new Random(System.currentTimeMillis()); new Thread(new Runnable() { @SuppressWarnings("static-access") @Override public void run() { while(true){ //Random generates arithmetic expression String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1); Client.send(expression); try { Thread.currentThread().sleep(random.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Thread. currentThread().sleep(random.nextInt(1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Thread. currentThread().sleep(random.nextInt(1000)); } } } } } } } } The results of one of the runs:
The server has been started, port number: 12345 Arithmetic expression is: 4-2 Server received the message: 4-2___ Result is: 2 Arithmetic expression is: 5-10 Server received the message: 5-10__ Result is: -5 Arithmetic expression is: 0-9 Server received the message: 0-9__ Result is: -9 Arithmetic expression is: 0+6 Server received the message: 0+6__ Result is: 6 Arithmetic expression is: 1/6 Server received the message: 1/6__ Result is: 0.166666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666
From the above code, it is easy to see that the main problem of BIO is that whenever a new client requests access, the server must create a new thread to handle this link, which cannot be applied in scenarios where high performance and high concurrency are needed (a large number of new threads will seriously affect server performance and even strike).
1.2. Pseudo-asynchronous I/O programming
To improve this one-connection-one-thread model, we can use a thread pool to manage these threads (for more information, please refer to the article provided earlier), implementing a model for one or more threads to process N clients (but the underlying layer still uses synchronous blocking I/O), which is often called the "pseudo-asynchronous I/O model".
Pseudo-asynchronous I/O model diagram:
The implementation is very simple. We just need to hand over the new thread to thread pool management, and just change the Server code just now:
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * BIO server source code__pseudo-asynchronous I/O * @author yangtao__anxpp.com * @version 1.0 */ public final class ServerBetter { //Default port number private static int DEFAULT_PORT = 12345; //Singleton ServerSocket private static ServerSocket server; //Singleton private static ExecutorService executorService = Executors.newFixedThreadPool(60); //Set the listening port according to the incoming parameters. If there is no parameter, call the following method and use the default value public static void start() throws IOException{ //Use the default value start(DEFAULT_PORT); } //This method will not be accessed in a large number of concurrently, and there is no need to consider the efficiency, just synchronize the method directly public synchronized static void start(int port) throws IOException{ if(server != null) return; try{ //Create ServerSocket through the constructor //If the port is legal and idle, the server will listen successfully. Server = new ServerSocket(port); System.out.println("The server has been started, port number: " + port); //Superce the client connection through wireless loop//If there is no client access, it will be blocked on the accept operation. while(true){ Socket socket = server.accept(); //When there is a new client access, the following code will be executed // Then create a new thread to process the Socket link executorService.execute(new ServerHandler(socket)); } } finally{ //Some necessary cleaning work if(server != null){ System.out.println("The server is closed."); server.close(); server = null; } } } } The test run results are the same.
We know that if we use CachedThreadPool thread pool (no limiting the number of threads, if not clear, please refer to the article provided at the beginning of the article), in fact, in addition to automatically helping us manage threads (reuse), it also looks like a 1:1 client: thread count model. Using FixedThreadPool, we effectively control the maximum number of threads, ensure the control of limited resources of the system, and implement the N:M pseudo-asynchronous I/O model.
However, because the number of threads is limited, if a large number of concurrent requests occur, the threads exceeding the maximum number can only wait until there are free threads in the thread pool that can be reused. When the Socket input stream is read, it will be blocked until it occurs:
Therefore, when reading data is slow (such as large amount of data, slow network transmission, etc.) and large amounts of concurrency, other access messages can only be waited all the time, which is the biggest disadvantage.
The NIO that will be introduced later can solve this problem.
2. NIO programming
New Java I/O library is introduced into the java.nio.* package in JDK 1.4, with the purpose of increasing speed. In fact, the "old" I/O package has been reimplemented using NIO, and we can benefit from it even if we don't explicitly use NIO programming. Speed improvements can occur in both file I/O and network I/O, but this article only discusses the latter.
2.1. Introduction
We generally think of NIO as New I/O (also the official name), because it is new to the old I/O library (actually it has been introduced in JDK 1.4, but this noun will continue to be used for a long time, even if they are "old" now, so it also reminds us that we need to consider it carefully when naming it), and have made great changes. However, it is called Non-block I/O by many people, that is, non-blocking I/O, because this is called, it can better reflect its characteristics. The NIO in the following text does not refer to the entire new I/O library, but is not blocking I/O.
NIO provides two different socket channel implementations: SocketChannel and ServerSocketChannel corresponding to Socket and ServerSocket in the traditional BIO model.
Both newly added channels support blocking and non-blocking modes.
The use of blocking mode is as simple as traditional support, but the performance and reliability are not good; the non-blocking mode is exactly the opposite.
For low-load, low-concurrency applications, synchronous blocking I/O can be used to improve development rate and better maintenance; for high-load, high-concurrency (network) applications, the non-blocking mode of NIO should be used to develop.
The basic knowledge will be introduced first below.
2.2. Buffer Buffer
A Buffer is an object that contains some data to be written or read out.
In the NIO library, all data is processed in a buffer. When reading data, it is read directly into the buffer; when writing data, it is also written into the buffer. At any time you access data in NIO, it is operated through a buffer.
A buffer is actually an array and provides information such as structured access to data and maintaining read and write locations.
The specific cache areas are: ByteBuffe, CharBuffer, ShortBuffer, IntBuffer, LongBuffer, FloatBuffer, DoubleBuffer. They implement the same interface: Buffer.
2.3. Channel
Our reading and writing of data must be passed through the Channel, which is like a water pipe, a channel. The difference between a channel and a stream is that the channel is bidirectional and can be used for read, write and simultaneous read and write operations.
The channels of the underlying operating system are generally full-duplex, so a full-duplex channel can better map the API of the underlying operating system than a stream.
Channels are mainly divided into two categories:
The ServerSocketChannel and SocketChannel that will be involved in the following code are both subclasses of SelectableChannel.
2.4. Multiplexer Selector
Selector is the basis of Java NIO programming.
Selector provides the ability to select ready tasks: Selector will constantly poll the Channel registered on it. If a read or write event occurs on a Channel, the Channel will be in the ready state and will be polled by the Selector. Then, the set of ready channels can be obtained through the SelectionKey to perform subsequent I/O operations.
A Selector can poll multiple channels at the same time, because the JDK uses epoll() instead of the traditional select implementation, there is no limit on the maximum connection handle 1024/2048. So, only one thread is required to be responsible for the polling of Selector, and it can access thousands of clients.
2.5. NIO server
The code seems much more complicated than traditional Socket programming.
Just paste the code and give the code description in the form of comments.
Server source code created by NIO:
package com.anxpp.io.calculator.nio; public class Server { private static int DEFAULT_PORT = 12345; private static ServerHandle serverHandle; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) serverHandle.stop(); serverHandle = new ServerHandle(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ start(); } } ServerHandle:
package com.anxpp.io.calculator.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import com.anxpp.io.utils.Calculator; /** * NIO server* @author yangtao__anxpp.com * @version 1.0 */ public class ServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * Constructor* @param port Specify the port number to be listened to*/ public ServerHandle(int port) { try{ //Create selector = Selector.open(); //Open the listening channel serverChannel = ServerSocketChannel.open(); //If true, this channel will be placed in blocking mode; if false, this channel will be placed in non-blocking mode serverChannel.configureBlocking(false); //Enable non-blocking mode//Bind port backlog is set to 1024 serverChannel.socket().bind(new InetSocketAddress(port),1024); //Superce client connection request serverChannel.register(selector, SelectionKey.OP_ACCEPT); //Mark the server is enabled started = true; System.out.println("Server has been started, port number: " + port); }catch(IOException e){ e.printStackTrace(); System.exit(1); } } public void stop(){ started = false; } @Override public void run() { //Loop through selector while(started){ try{ // Whether there is a read and write event, the selector is waked up every 1s selector.select(1000); // Blocking, it will continue only when at least one registered event occurs. // selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); } catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } } } } } catch(Throwable t){ t.printStackTrace(); } } //After the selector is closed, the managed resources will be automatically released if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //Processing the request message for a new access if(key.isAcceptable()){ ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //Creating a SocketChannel instance through the accept of ServerSocketChannel //Completing this operation means completing the TCP three-way handshake, and the TCP physical link is officially established. SocketChannel sc = ssc.accept(); //Set to non-blocking sc.configureBlocking(false); //Register as read sc.register(selector, SelectionKey.OP_READ); } //Read message if(key.isReadable()){ SocketChannel sc = (SocketChannel) key.channel(); //Create ByteBuffer and open a 1M buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //Read request stream and return the number of bytes read int readBytes = sc.read(buffer); //Read bytes and code the bytes if(readBytes>0){ //Set the current limit of the buffer to position=0, for subsequent read operations of buffer.flip(); //Create a byte array based on the number of buffer readable bytes bytes = new byte[buffer.remaining()]; //Copy the buffer readable byte array into the newly created array buffer.get(bytes); String expression = new String(bytes,"UTF-8"); System.out.println("Server received a message: " + expression); //Processing data String result = null; try{ result = Calculator.cal(expression).toString(); }catch(Exception e){ result = "Computation error:" + e.getMessage(); } //Send the reply message doWrite(sc,result); } //No bytes read and ignore// else if(readBytes==0); //The link has been closed, freeing the resource else if(readBytes<0){ key.cancel(); sc.close(); } } } } } //Send the reply message asynchronously private void doWrite(SocketChannel channel,String response) throws IOException{ //Encoding the message as a byte array byte[] bytes = response.getBytes(); //Create ByteBuffer according to the array capacity ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //Copy the byte array to the buffer writeBuffer.put(bytes); //flip operation writeBuffer.flip(); //Send the byte array of buffer channel.write(writeBuffer); //*****The code for processing "write half-packet" is not included here} }As you can see, the main steps for creating a NIO server are as follows:
Because the response message is sent, SocketChannel is also asynchronous and non-blocking, so it cannot be guaranteed that the data that needs to be sent can be sent at one time, and there will be a problem of writing half a packet at this time. We need to register a write operation, constantly poll the Selector to send the unsent messages, and then use the hasRemain() method of the Buffer to determine whether the message is sent.
2.6. NIO client
It’s better to just upload the code. The process doesn’t require too much explanation, it’s a bit similar to the server code.
Client:
package com.anxpp.io.calculator.nio; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static ClientHandle clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) clientHandle.stop(); clientHandle = new ClientHandle(ip,port); new Thread(clientHandle,"Server").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } public static void main(String[] args){ start(); } } ClientHandle:
package com.anxpp.io.calculator.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * NIO Client* @author yangtao__anxpp.com * @version 1.0 */ public class ClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public ClientHandle(String ip,int port) { this.host = ip; this.port = port; try{ //Create selector selector = Selector.open(); //Open the listening channel socketChannel = SocketChannel.open(); //If true, this channel will be placed in blocking mode; if false, this channel will be placed in non-blocking mode socketChannel.configureBlocking(false);//Open non-blocking mode started = true; }catch(IOException e){ e.printStackTrace(); System.exit(1); } } public void stop(){ started = false; } @Override public void run() { try{ doConnect(); }catch(IOException e){ e.printStackTrace(); System.exit(1); } //Loop through selector while(started){ try{ // Regardless of whether there is a read and write event, the selector is awakened every 1s selector.select(1000); // Blocking, and it will continue only when at least one registered event occurs. // selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } } } } catch(Exception e){ e.printStackTrace(); System.exit(1); } } //After the selector is closed, the managed resources will be automatically released if(selector != null) try{ selector.close(); } catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()){ if(sc.finishConnect()); else System.exit(1); } //Read the message if(key.isReadable()){ //Create ByteBuffer and open a 1M buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //Read the request code stream and return the number of bytes read int readBytes = sc.read(buffer); //Read bytes and code the bytes if(readBytes>0){ //Set the current limit of the buffer to position=0, for subsequent read operations of buffer.flip(); //Create byte array based on the number of readable bytes in the buffer byte[] bytes = new byte[buffer.remaining()]; //Copy the buffer readable byte array into the newly created array buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("Client received a message: " + result); } //No bytes read is ignored// else if(readBytes==0); //The link has been closed, freeing the resource else if(readBytes<0){ key.cancel(); sc.close(); } } } } } //Send messages asynchronously private void doWrite(SocketChannel channel,String request) throws IOException{ //Encoding the message as a byte array byte[] bytes = request.getBytes(); //Creating ByteBuffer based on the array capacity ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //Copying the byte array to the buffer writeBuffer.put(bytes); //Flip operation writeBuffer.flip(); //Send the byte array channel.write(writeBuffer); //*****The code for processing "write half-packet" is not included here} private void doConnect() throws IOException{ if(socketChannel.connect(new InetSocketAddress(host,port))); else socketChannel.register(selector, SelectionKey.OP_CONNECT); } public void sendMsg(String msg) throws Exception{ socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel, msg); } } 2.7. Demonstration results
First run the server and run a client by the way:
package com.anxpp.io.calculator.nio; import java.util.Scanner; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test the main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); while(Client.sendMsg(new Scanner(System.in).nextLine())); } } We can also run the client separately, and the effects are the same.
Results of a test:
The server has been started, port number: 123451+2+3+4+5+6 The server received the message: 1+2+3+4+5+6 The client received the message: 211*2/3-4+5*6/7-8 The server received the message: 1*2/3-4+5*6/7-8 The client received the message: -7.0476190476190474
There is no problem running multiple clients.
3. AIO programming
NIO 2.0 introduces the concept of new asynchronous channels and provides implementations of asynchronous file channels and asynchronous socket channels.
Asynchronous socket channel is truly asynchronous non-blocking I/O, corresponding to event-driven I/O (AIO) in UNIX network programming. It does not require too many Selectors to poll the registered channels to achieve asynchronous read and write, thus simplifying the NIO programming model.
Just upload the code.
3.1. Server side code
Server:
package com.anxpp.io.calculator.aio.server; /** * AIO server* @author yangtao__anxpp.com * @version 1.0 */ public class Server { private static int DEFAULT_PORT = 12345; private static AsyncServerHandler serverHandle; public volatile static long clientCount = 0; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) return; serverHandle = new AsyncServerHandler(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ Server.start(); } } AsyncServerHandler:
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsyncServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncServerHandler implements Runnable { public CountDownLatch latch; public AsyncServerSocketChannel channel; public AsyncServerHandler(int port) { try { //Create server channel = AsynchronousServerSocketChannel.open(); //Bind port channel.bind(new InetSocketAddress(port)); System.out.println("Server has been started, port number: " + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //CountDownLatch initialization//Its function: allow the current field to block all the time before completing a set of operations being executed //Here, let the field block here to prevent the server from exiting after execution //You can also use while(true)+sleep //The generation environment does not need to worry about this problem, thinking that the server will not exit latch = new CountDownLatch(1); //Connection channel.accept(this,new AcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } } AcceptHandler:
package com.anxpp.io.calculator.aio.server; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; //Connect as a handler public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> { @Override public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) { //Continue to accept requests from other clients Server.clientCount++; System.out.println("Number of connected clients:" + Server.clientCount); serverHandler.channel.accept(serverHandler, this); //Create a new Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } } ReadHandler:
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } } OK, this is done, but it is actually simple to say. Although there is a lot of code, the API is really much simpler to use than NIO. It mainly includes monitoring, reading, writing and other CompletionHandlers. There should have been a WriteHandler here, and indeed, we implemented it in ReadHandler as an anonymous inner class.
Let’s see the client code below.
3.2. Client side code
Client:
package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //向服务器发送消息public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("请输入请求消息:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); } } AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //创建异步的客户端通道clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //创建CountDownLatch等待latch = new CountDownLatch(1); //发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //连接服务器成功//意味着TCP三次握手完成@Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("客户端成功连接到服务器..."); } //连接服务器失败@Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("连接服务器失败..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } //向服务器发送消息public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //异步写clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); } } WriteHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //完成全部数据的写入if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //读取数据ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("数据发送失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客户端收到结果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("数据读取失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3、测试
Test:
package com.anxpp.io.calculator.aio; import java.util.Scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * 测试方法* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //测试主方法@SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //运行服务器Server.start(); //避免客户端先于服务器启动前执行代码Thread.sleep(100); //运行客户端Client.start(); System.out.println("请输入请求消息:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); } }我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.