Preface:
Recently, while analyzing Hadoop's RPC mechanism (RPC, the Remote Procedure Call protocol, is a protocol for requesting services from a program on a remote computer over the network without needing to understand the underlying network technology; see http://baike.baidu.com/view/32726.htm), I found that its implementation relies mainly on two technologies: dynamic proxies (see the blog post http://weixiaolu.iteye.com/blog/1477774) and Java NIO. To analyze Hadoop's RPC source code correctly, I think it is necessary to first study the principles and concrete implementation of Java NIO.
In this post, I analyze Java NIO from two directions.
Table of contents:
one. The difference between Java NIO and blocking I/O
1. Blocking I/O communication model
2. Java NIO principle and communication model
two. Java NIO server and client code implementation
Specific analysis:
one. The difference between Java NIO and blocking I/O
1. Blocking I/O communication model
If you already have some understanding of blocking I/O, you know that a call to InputStream.read() blocks until data arrives (or a timeout occurs) before returning; likewise, a call to ServerSocket.accept() blocks until a client connects. After each client connects, the server starts a thread to handle that client's requests. The communication model of blocking I/O is shown in the following diagram:
On closer analysis, blocking I/O has some clear disadvantages. Based on the blocking I/O communication model, I summarize two of them:
1. When there are many clients, a large number of processing threads are created, and each thread takes up stack space and some CPU time.
2. Blocking can lead to frequent context switches, most of which may do no useful work.
This is where non-blocking I/O becomes useful.
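To make the thread-per-connection model above concrete, here is a minimal sketch of a blocking echo server. The class name BlockingEchoServer, the line-based echo protocol, and the use of port 0 (any free port) are assumptions made for illustration and do not come from the Hadoop source.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical thread-per-connection echo server built on blocking I/O. */
public class BlockingEchoServer {
    private final ServerSocket serverSocket;

    public BlockingEchoServer(int port) throws IOException {
        // Port 0 lets the operating system pick any free port
        this.serverSocket = new ServerSocket(port);
    }

    public int getPort() {
        return serverSocket.getLocalPort();
    }

    /** Accept loop: blocks in accept(), then starts one thread per client. */
    public void serve() {
        while (!serverSocket.isClosed()) {
            try {
                Socket client = serverSocket.accept(); // blocks until a client connects
                new Thread(() -> handle(client)).start();
            } catch (IOException e) {
                break; // accept() throws once the server socket is closed
            }
        }
    }

    /** Echoes each line back; the thread is parked in readLine() while idle. */
    private static void handle(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            String line;
            while ((line = in.readLine()) != null) { // blocks until data or end of stream
                out.println(line);
            }
        } catch (IOException e) {
            // Connection failed; let this client's thread end
        }
    }

    public void close() throws IOException {
        serverSocket.close();
    }

    public static void main(String[] args) throws IOException {
        BlockingEchoServer server = new BlockingEchoServer(0);
        new Thread(server::serve).start();
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
             PrintWriter out = new PrintWriter(s.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()))) {
            out.println("hello");
            System.out.println("echoed: " + in.readLine());
        } finally {
            server.close();
        }
    }
}
```

Every connected client holds one thread for its whole lifetime, even while that thread is only waiting inside readLine(). That is the cost described in the two disadvantages above.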
2. Java NIO principle and communication model
Java NIO was introduced in JDK 1.4. The name can be read as either "new I/O" or non-blocking I/O. Here is how Java NIO works:
1. A dedicated thread handles all I/O events and is responsible for dispatching them.
2. Event-driven mechanism: handling is triggered when an event arrives, rather than by synchronously monitoring for events.
3. Thread communication: threads communicate through wait, notify, and similar means. This ensures that every context switch is meaningful and reduces unnecessary thread switching.
After reading some material, here is a schematic diagram of how Java NIO works, as I understand it:
(Note: The processing flow of each thread is roughly: read data, decode, compute, encode, and send the response.)
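The per-request flow in the note above (read, decode, compute, encode, send) can be sketched as a chain of small functions. This is only an illustration: the class name RequestPipeline, the UTF-8 encoding, and the upper-casing "computation" are assumptions, and the read and send steps are left to the channel code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Hypothetical sketch of the decode -> compute -> encode steps of one request. */
public class RequestPipeline {

    /** Decode: turn the bytes read from a channel into a String (UTF-8 assumed). */
    public static String decode(ByteBuffer in) {
        return StandardCharsets.UTF_8.decode(in).toString();
    }

    /** Compute: placeholder business logic; here it only upper-cases the request. */
    public static String compute(String request) {
        return request.toUpperCase(Locale.ROOT);
    }

    /** Encode: turn the result back into bytes that can be written to a channel. */
    public static ByteBuffer encode(String response) {
        return StandardCharsets.UTF_8.encode(response);
    }

    /** Runs decode, compute, and encode on a buffer that is ready to be read. */
    public static ByteBuffer process(ByteBuffer in) {
        return encode(compute(decode(in)));
    }

    public static void main(String[] args) {
        ByteBuffer request = ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8));
        ByteBuffer response = process(request);
        System.out.println(decode(response)); // prints "PING"
    }
}
```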
The Java NIO server only needs to start one dedicated thread to handle all I/O events. How is this communication model implemented? Let's explore it together. Java NIO transmits data over bidirectional channels rather than one-way streams, and events of interest can be registered on a channel. There are four events in total:
| Event name | Corresponding value |
| Server accepts a client connection | SelectionKey.OP_ACCEPT (16) |
| Client connects to the server | SelectionKey.OP_CONNECT (8) |
| Read event | SelectionKey.OP_READ (1) |
| Write event | SelectionKey.OP_WRITE (4) |
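Because each of the four values in the table is a distinct bit, several events can be combined into one interest set with a bitwise OR. The sketch below shows this and also asks each channel type which operations it supports through validOps(). The class name InterestOpsDemo and the describe() helper are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Hypothetical demo of how the four event constants combine as bit flags. */
public class InterestOpsDemo {

    /** Lists the operations that are present in the given bit mask. */
    public static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_ACCEPT) != 0) sb.append("ACCEPT ");
        if ((ops & SelectionKey.OP_CONNECT) != 0) sb.append("CONNECT ");
        if ((ops & SelectionKey.OP_READ) != 0) sb.append("READ ");
        if ((ops & SelectionKey.OP_WRITE) != 0) sb.append("WRITE ");
        return sb.toString().trim();
    }

    public static void main(String[] args) throws IOException {
        // 1 | 4 == 5: interest in both read and write events
        int readAndWrite = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(readAndWrite + " -> " + describe(readAndWrite));

        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // A server socket channel supports only accept
            System.out.println("server: " + describe(server.validOps()));
            // A socket channel supports connect, read, and write
            System.out.println("client: " + describe(client.validOps()));
        }
    }
}
```

Trying to register an operation that a channel does not support, for example OP_ACCEPT on a SocketChannel, is rejected with an IllegalArgumentException.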
The server and the client each maintain an object that manages channels, called a selector, which can detect events on one or more channels. Take the server as an example. Suppose a read event is registered on the server's selector and, at some point, the client sends some data to the server. With blocking I/O, the server would have called read() and blocked while waiting for that data; with NIO, a read event is instead added to the selector. The server's processing thread polls the selector: if events of interest have arrived, it processes them; if none have arrived, the thread blocks until one does. Below is a schematic diagram of the Java NIO communication model, as I understand it:
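The selector behaviour described above can be observed without any network code. The following sketch registers the read end of a java.nio.channels.Pipe with a selector, checks that no event is ready before anything is written, and then shows that select() returns once data is available. The class name SelectorReadinessDemo and the use of a Pipe in place of a socket are assumptions made to keep the example self-contained.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo: a selector reports a read event only once data is available. */
public class SelectorReadinessDemo {

    /** Returns {ready keys before the write, ready keys after the write}. */
    public static int[] run() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            // Only non-blocking channels can be registered with a selector
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);

            // Nothing has been written yet; selectNow() returns without blocking
            int before = selector.selectNow();

            sink.write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)));

            // select() blocks until at least one registered event is ready
            int after = selector.select();
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(16);
                    int n = source.read(buf);
                    System.out.println("read " + n + " bytes");
                }
            }
            // Selected keys are not removed automatically; clear them after handling
            selector.selectedKeys().clear();
            return new int[] {before, after};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] counts = run();
        System.out.println("ready before write: " + counts[0] + ", after write: " + counts[1]);
    }
}
```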
two. Java NIO server and client code implementation
To better understand Java NIO, here is a simple implementation of the server and the client.
Server:
package cn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * NIO server
 * @author small path
 */
public class NIOServer {
    // Channel manager
    private Selector selector;

    /**
     * Get a ServerSocket channel and do some initialization work on it
     * @param port the port to bind
     * @throws IOException
     */
    public void initServer(int port) throws IOException {
        // Get a ServerSocket channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // Set the channel to non-blocking
        serverChannel.configureBlocking(false);
        // Bind the ServerSocket corresponding to this channel to the given port
        serverChannel.socket().bind(new InetSocketAddress(port));
        // Get a channel manager
        this.selector = Selector.open();
        // Bind the channel manager to the channel and register the SelectionKey.OP_ACCEPT
        // event for the channel. After registration, selector.select() returns when the
        // event arrives and keeps blocking while it has not arrived.
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Poll the selector for events that need processing and process any that are found
     * @throws IOException
     */
    public void listen() throws IOException {
        System.out.println("Server started successfully!");
        // Poll the selector
        while (true) {
            // Returns when a registered event arrives; otherwise keeps blocking
            selector.select();
            // Get an iterator over the selected keys, i.e. the registered events that are ready
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                // Remove the selected key to prevent it from being processed again
                ite.remove();
                if (key.isAcceptable()) {
                    // A client is requesting a connection
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    // Obtain the channel connected to the client
                    SocketChannel channel = server.accept();
                    // Set it to non-blocking
                    channel.configureBlocking(false);
                    // You can send information to the client here
                    channel.write(ByteBuffer.wrap("Send a message to the client".getBytes()));
                    // Once the connection is established, register interest in read
                    // events so that the client's messages can be received
                    channel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // A readable event occurred
                    read(key);
                }
            }
        }
    }

    /**
     * Handle the event of reading a message sent by the client
     * @param key
     * @throws IOException
     */
    public void read(SelectionKey key) throws IOException {
        // The server can read a message: get the Socket channel on which the event occurred
        SocketChannel channel = (SocketChannel) key.channel();
        // Create a read buffer
        ByteBuffer buffer = ByteBuffer.allocate(10);
        int count = channel.read(buffer);
        if (count == -1) {
            // The client closed the connection. Closing the channel also cancels its key;
            // otherwise the key would be reported as readable on every select()
            channel.close();
            return;
        }
        // Convert only the bytes that were actually read
        String msg = new String(buffer.array(), 0, count).trim();
        System.out.println("The server received the message: " + msg);
        ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
        channel.write(outBuffer); // Send the message back to the client
    }

    /**
     * Start the server for testing
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8000);
        server.listen();
    }
}
Client:
package cn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * NIO client
 * @author small path
 */
public class NIOClient {
    // Channel manager
    private Selector selector;

    /**
     * Get a Socket channel and do some initialization work on it
     * @param ip the IP of the server to connect to
     * @param port the port number of the server to connect to
     * @throws IOException
     */
    public void initClient(String ip, int port) throws IOException {
        // Get a Socket channel
        SocketChannel channel = SocketChannel.open();
        // Set the channel to non-blocking
        channel.configureBlocking(false);
        // Get a channel manager
        this.selector = Selector.open();
        // Connect to the server. This call does not actually complete the connection;
        // channel.finishConnect() must be called in listen() to complete it.
        channel.connect(new InetSocketAddress(ip, port));
        // Bind the channel manager to the channel and register the
        // SelectionKey.OP_CONNECT event for the channel
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    /**
     * Poll the selector for events that need processing and process any that are found
     * @throws IOException
     */
    public void listen() throws IOException {
        // Poll the selector
        while (true) {
            selector.select();
            // Get an iterator over the selected keys in the selector
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                // Remove the selected key to prevent it from being processed again
                ite.remove();
                if (key.isConnectable()) {
                    // A connect event occurred
                    SocketChannel channel = (SocketChannel) key.channel();
                    // If the connection is still pending, complete it
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                    }
                    // Set to non-blocking
                    channel.configureBlocking(false);
                    // You can send information to the server here
                    channel.write(ByteBuffer.wrap("Send a message to the server".getBytes()));
                    // Once the connection is established, register interest in read
                    // events so that the server's messages can be received
                    channel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // A readable event occurred
                    read(key);
                }
            }
        }
    }

    /**
     * Handle the event of reading a message sent by the server
     * @param key
     * @throws IOException
     */
    public void read(SelectionKey key) throws IOException {
        // Same as the server's read method
    }

    /**
     * Start the client for testing
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        NIOClient client = new NIOClient();
        client.initClient("localhost", 8000);
        client.listen();
    }
}
Summary:
With that, both dynamic proxies and Java NIO have been analyzed. The next step is to analyze the source code of Hadoop's RPC mechanism; the blog address is: http://weixiaolu.iteye.com/blog/1504898. If you disagree with my understanding of Java NIO, you are welcome to discuss it.
If you need to reprint, please indicate the source: http://weixiaolu.iteye.com/blog/1479656