Prefacio:
Recientemente, al analizar el RPC de Hadoop (Protocolo de llamadas de procedimiento remoto), un protocolo que solicita servicios de programas de computadora remotos a través de la red sin comprender la tecnología de red subyacente. Puede consultar: http://baike.baidu.com/view/32726.htm), se descubrió que la implementación del mecanismo RPC de Hadoop utiliza principalmente dos tecnologías: proxy dinámico (puede referirse al blog: http://weixiaolu.iteye.com/blog/1477774) y java nio. Para analizar correctamente el código fuente RPC de Hadoop, creo que es necesario estudiar primero los principios y la implementación específica de Java Nio.
En este blog, analizo principalmente Java Nio desde dos direcciones
Tabla de contenido:
uno. La diferencia entre Java Nio y el bloqueo de E/S
1. Bloqueo del modelo de comunicación de E/S
2. Java NIO Principio y Modelo de comunicación 2. Implementación del Código del Servidor Java Nio y del cliente
Análisis específico:
uno. La diferencia entre Java Nio y el bloqueo de E/S
1. Bloqueo del modelo de comunicación de E/S
Si ahora tiene una cierta comprensión del bloqueo de E/S, sabemos que el bloqueo de E/S se bloquea al llamar al método inputStream.read (), esperará hasta que lleguen los datos (o tiempos de espera) antes de regresar; Del mismo modo, al llamar al método Serversocket.accept (), bloqueará hasta que haya una conexión de cliente antes de regresar. Después de que cada cliente se conecta, el servidor iniciará un hilo para procesar la solicitud del cliente. El diagrama del modelo de comunicación de la E/S de bloqueo es el siguiente:
Si lo analiza cuidadosamente, definitivamente encontrará que hay algunas desventajas de bloquear la E/S. Basado en el modelo de comunicación de E/S de bloqueo, resumí sus dos desventajas:
1. Cuando hay muchos clientes, se creará una gran cantidad de hilos de procesamiento. Y cada hilo ocupa espacio de pila y algo de tiempo de CPU
2. El bloqueo puede conducir a una conmutación de contexto frecuente, y la mayoría de los cambios de contexto pueden no tener sentido.
En este caso, la E/S sin bloqueo tiene sus perspectivas de aplicación.
2. Java Nio Principio y modelo de comunicación
Java Nio se inició en JDK1.4, y se puede decir que son "nuevas E/S" o E/S sin bloqueo. Así es como funciona Java Nio:
1. Un hilo dedicado maneja todos los eventos IO y es responsable de la distribución.
2. Mecanismo impulsado por el evento: se desencadena cuando llega un evento, en lugar de monitorear los eventos simultáneamente.
3. Comunicación de hilos: los hilos se comunican a través de Wait, Notify y otros medios. Asegúrese de que cada cambio de contexto tenga sentido. Reduzca el cambio de rosca innecesaria.
Después de leer información, publiqué el diagrama esquemático de trabajo de Java Nio que entiendo:
(Nota: El flujo de procesamiento de cada hilo probablemente está leyendo datos, decodificación, procesamiento de computación, codificación y envío de respuestas).
El servidor Java NIO solo necesita iniciar un hilo especial para manejar todos los eventos IO. ¿Cómo se implementa este modelo de comunicación? Jaja, exploremos su misterio juntos. Java NIO utiliza un canal de dos vías para la transmisión de datos, en lugar de una transmisión unidireccional, y los eventos de interés se pueden registrar en el canal. Hay cuatro eventos en total:
| Nombre del evento | Valor correspondiente |
| El servidor recibe eventos de conexión del cliente | SelectionKey.op_accept (16) |
| Evento del servidor de conexión del cliente | SelectionKey.op_connect (8) |
| Leer eventos | SelectionKey.op_read (1) |
| Escribir eventos | SelectionKey.op_write (4) |
El servidor y el cliente mantienen cada uno un objeto que administra un canal, que llamamos un selector, que puede detectar eventos en uno o más canales. Tomemos el servidor como ejemplo. Si se registra un evento de lectura en el selector del servidor, el cliente envía algunos datos al servidor en algún momento. Al bloquear la E/S, llamará al método Read () para bloquear los datos, y el servidor NIO agregará un evento de lectura al selector. El hilo de procesamiento del servidor encuestará para acceder al selector. Si se encuentra que llega un evento de interés al acceder al selector, procesará estos eventos. Si no llega ningún evento de interés, el hilo de procesamiento se bloqueará hasta que llegue el evento de interés. A continuación se muestra un diagrama esquemático del modelo de comunicación de Java Nio que entiendo:
dos. Implementación del servidor Java NIO y del código del cliente
Para comprender mejor a Java NIO, la siguiente es una implementación de código simple para el servidor y el cliente.
Servidor:
paquete cn.nio; import java.io.ioException; import java.net.InetSocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionKey; import java.nio.channels.selector; import java.nio.channels.serversocketchannel; import java.nio.channels.socketchannel; import java.util.iterator; /*** NIO Server* @Author Small Path*/public class Nioserver {// Channel Manager Selector privado selector; / ** * Obtenga un canal Serversocket y realice un trabajo de inicialización en el canal * @param puerto de puerto unido * @throws ioexception */ public void initserver (int puerto) lanza IOException {// Obtener un canal de ServerSocket Serversocketchannel ServerChannel = ServersChetchannel.open (); // Establezca el canal en servidor no bloqueado Servidor.ConfigureBlowing (FALSE); // Vincula el SerververSocket correspondiente a este canal al puerto del puerto ServerChannel.socket (). Bind (nuevo inetSocketAddress (puerto)); // Obtener un canal administrador este.selector = selector.open (); // Vincula el administrador del canal al canal y registra el evento SelectionKey.op_accept para el canal. Después de registrar el evento, // Cuando llegue el evento, Selector.Select () regresará. Si el evento no alcanza Selector.select () se bloqueará. serverChannel.register (selector, selectionKey.op_accept); } /*** Use la encuesta para escuchar si hay eventos en el selector que deben procesarse. Si es así, se procesará * @throws ioException */ @supesswarnings ("sin verificar") public void escuchar () lanza ioexception {system.out.println ("Servidor comienza con éxito!"); // Policilías para acceder al selector mientras (verdadero) {// Cuando llega el evento registrado, el método regresa; De lo contrario, el método seguirá bloqueando Selector.Select (); // Obtenga el iterador del elemento seleccionado en el selector, y el elemento seleccionado es el iterador de evento registrado = this.selector.selectedKeys (). Iterator (); while (ite.hasnext ()) {selectionKey key = (selectionKey) ite.next (); // Eliminar la clave seleccionada para evitar el procesamiento repetido de item.remove (); // El cliente solicita el evento de conexión if (key.isaceptable ()) {Serversocketchannel Server = (ServerSocketchannel) Key .channel (); // Obtener el canal para conectarse al cliente Socketchannel Channel = Server.accept (); // Establecer en canal sin bloqueo.configureBlocking (falso); // Puede enviar información al cliente aquí Channel.write (bytebuffer.wrap (nueva cadena ("Enviar un mensaje al cliente"). GetBytes ())); // Después de que la conexión con el cliente sea exitosa, para recibir la información del cliente, debe establecer permisos de lectura para el canal. Channel.register (this.selector, selectionKey.op_read); // se obtuvo un evento legible} else if (key.iseadable ()) {read (clave); }}}} / ** * Eventos de procesamiento que leen mensajes enviados por el cliente * @param clave * @throws ioexception * / public void read (Key selectionKey) lanza IoException {// El servidor puede leer mensajes: obtenga el canal de socket donde ocurre el evento. Socketchannel Channel = (Socketchannel) key.channel (); // crear un búfer de lectura bytebuffer buffer = bytebuffer.allocate (10); Channel.read (búfer); byte [] data = buffer.Array (); Cadena msg = new String (data) .trim (); System.out.println ("El servidor recibió el mensaje:"+msg); Bytebuffer outbuffer = bytebuffer.wrap (msg.getbytes ()); Channel.Write (outbuffer); // Envía el mensaje de nuevo al cliente}/ *** Prueba de Servidor de inicio* @throws ioexception*/ public static void main (string [] args) lanza ioexception {nioserver server = new nioserver (); servidor.initserver (8000); servidor.listen (); }} Cliente:
paquete cn.nio; import java.io.ioException; import java.net.InetSocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionKey; import java.nio.channels.selector; import java.nio.channels.socketchannel; import java.util.iterator; /*** NIO Client* @Author Small Path*/public class NioClient {// Channel Manager Selector Private Selector Selector; / ** * Obtenga un canal de socket y haga alguna inicialización del canal * @param IP La IP del servidor conectado a * @param Port El número de puerto del servidor conectado a * @throws ioexception */ public void initClient (string ip, int puerto) lanza IoException {// Obtener un canal Socketchannel de Socketchannel = Socketchannel.open (); // Establezca el canal en canal no bloqueado.configureBlocking (falso); // Obtener un canal administrador este.selector = selector.open (); // El cliente se conecta al servidor. De hecho, la ejecución del método no implementa la conexión. Debe llamar // usar channel.finishconnect (); en el método Listen () para completar el canal de conexión.connect (nuevo inetSocketAddress (IP, puerto)); // Vincula el administrador del canal al canal y registra el evento selectionKey.op_connect para el canal. Channel.register (selector, selectionKey.op_connect); } /*** Use la encuesta para escuchar si hay eventos en el selector que deben procesarse. Si es así, se procesará * @throws ioException */ @supesswarnings ("sin verificar") public void escuchar () lanza ioexception {// encuestando para acceder al selector mientras (true) {selector.select (); // Obtenga el iterador para el elemento seleccionado en el iterador del selector iterator = this.selector.selectedKeys (). Iterator (); while (ite.hasnext ()) {selectionKey key = (selectionKey) item.next (); // Eliminar la clave seleccionada para evitar el procesamiento repetido de item.remove (); // El evento de conexión ocurre si (Key.Isconnectable ()) {Socketchannel Channel = (Socketchannel) Key .channel (); // Si se está conectando la conexión, complete la conexión if (channel.isConnectionPending ()) {channel.finishconnect (); } // Establecer en canal sin bloqueo.configureBlowing (falso); // Puede enviar información al servidor channel.write (bytebuffer.wrap (nueva cadena ("Enviar un mensaje al servidor"). GetBytes ())); // Después de que la conexión con el servidor es exitosa, para recibir la información del servidor, el canal debe configurarse para leer permisos. Channel.register (this.selector, selectionKey.op_read); // se obtuvo un evento legible} else if (key.iseadable ()) {read (clave); }}}}} /*** Eventos de procesamiento que leen mensajes enviados por el servidor* @param clave* @throws ioexception* /public void read (selectionkey key) lanza ioexception {// igual que el método de lectura del servidor} /*** iniciar el cliente* @throws ioexception* /public void void main (string [] args) lanza IOException {niiOclient NiClient (); client.initClient ("localhost", 8000); Client.listen (); }}resumen:
Finalmente, se completa el análisis dinámico de proxy y java nio. Jaja, lo siguiente es analizar el código fuente del mecanismo RPC de Hadoop. La dirección del blog es: http://weixiaolu.iteye.com/blog/1504898. Sin embargo, si tiene alguna objeción a su comprensión de Java Nio, puede discutirlo juntos.
Si necesita reimprimir, indique la fuente: http://weixiaolu.iteye.com/blog/1479656