Este artículo se presentará de un biografía tradicional a lo profundo de Nio a AIO, y se acompañará de una explicación completa del código.
Se utilizará un ejemplo en el siguiente código: el cliente envía una cadena de la ecuación al servidor y el servidor devuelve el resultado al cliente después del cálculo.
Todas las instrucciones para el código se usan directamente como comentarios y están integradas en el código, lo que puede ser más fácil de entender al leer el código. Se utilizará una clase de herramientas para calcular el resultado en el código, consulte la sección Código del artículo.
Artículos recomendados para conocimientos básicos relacionados:
Introducción al modelo de E/S de la red Linux (imágenes y texto)
Concurrencia Java (múltiples subprocesos)
1. Programación biológica
1.1. Programación biológica tradicional
El modelo básico de programación de red es el modelo C/S, es decir, la comunicación entre dos procesos.
El servidor proporciona puertos IP y escucha. El cliente inicia una solicitud de conexión a través de la dirección de operación de conexión que el servidor desea escuchar. A través de tres apretones de manos, si la conexión se establece con éxito, ambas partes pueden comunicarse a través de los enchufes.
En el desarrollo del modelo de bloqueo de sincronización tradicional, ServerSocket es responsable de unir direcciones IP y comenzar los puertos de escucha; Socket es responsable de iniciar operaciones de conexión. Después de que la conexión es exitosa, ambas partes realizan comunicación de bloqueo sincrónico a través de las secuencias de entrada y salida.
Una breve descripción del modelo de comunicación del servidor Bio: el servidor que utiliza el modelo de comunicación bio suele ser un hilo de aceptación independiente responsable de escuchar la conexión del cliente. Después de recibir la solicitud de conexión del cliente, crea un nuevo hilo para cada cliente para el procesamiento de enlaces y no lo procesa, y luego devuelve la respuesta al cliente a través del flujo de salida, y el hilo se destruye. Es decir, un modelo típico de una solicitud de una sola noche.
Diagrama de modelo de comunicación biológica tradicional:
El mayor problema con este modelo es que carece de capacidades de escala elástica. Cuando aumenta el número de accesos concurrentes en el cliente, el número de subprocesos en el servidor es proporcional al número de accesos concurrentes en el cliente. Los hilos en Java también son recursos del sistema relativamente valiosos. Después de que el número de hilos se expande rápidamente, el rendimiento del sistema disminuirá bruscamente. A medida que el número de accesos continúa aumentando, el sistema eventualmente morirá.
Código fuente del servidor creado por E/S de bloqueo sincrónico:
paquete com.anxpp.io.calculator.bio; import java.io.ioException; import java.net.serversocket; import java.net.socket; /** * Código fuente del servidor bio * @author yangtao__anxpp.com * @Version 1.0 */public Final Class ServerNormal {// Número de puerto predeterminado privado static int Default_port = 12345; // servidor Singleton Serversocket Serversocket estático privado; // Establezca el puerto de audición de acuerdo con los parámetros entrantes. Si no hay parámetros, llame al siguiente método y use el valor predeterminado Public static void start () lanza ioexception {// use el valor predeterminado Start (default_port); } // No se accederá a este método de una gran cantidad de formas concurrentes, y no es necesario considerar la eficiencia, solo sincronizar el método directamente público sincronizado static void start (int port) lanza ioexception {if (server! = Null) return; Pruebe {// Cree Serversocket a través del constructor // Si el puerto es legal e inactivo, el servidor escuchará con éxito. Servidor = nuevo Serversocket (puerto); System.out.println ("El servidor se ha iniciado, número de puerto:" + puerto); // Escuche las conexiones del cliente a través del bucle inalámbrico // Si no hay acceso al cliente, se bloqueará en la operación de aceptación. while (true) {socket socket = server.accept (); // Cuando hay un nuevo acceso al cliente, el siguiente código se ejecutará // luego cree un nuevo hilo para manejar este enlace de socket nuevo subproceso (nuevo serverHandler (Socket)). Start (); }} Finalmente {// Algunos trabajos de limpieza necesarios if (server! = null) {system.out.println ("El servidor está cerrado"); servidor.close (); servidor = nulo; }}}} Código fuente de los hilos de procesamiento de mensajes del cliente:
paquete com.anxpp.io.calculator.bio; import java.io.bufferedReader; import java.io.ioException; import java.io.inputstreamreader; import java.io.printwriter; import java.net.socket; import com.anxpp.io.utils.calculator; / *** Hilo del cliente* @author yangtao__anxpp.com* Socket Link para un cliente*/ public class ServerHandler implementa Runnable {private Socket Socket; Public ServerHandler (Socket Socket) {this.socket = Socket; } @Override public void run () {bufferedReader in = null; PrintWriter out = null; intente {in = new BufferedReader (new InputStreamReader (Socket.getInputStream ())); out = new PrintWriter (Socket.getOutputStream (), true); Expresión de cadena; Resultado de la cadena; mientras (true) {// lee una línea a través de BufferedReader // Si ha leído la cola de la secuencia de entrada, return null y salga el bucle // si obtiene un valor no nulo, intente calcular el resultado y return if ((expresion = in.readline () == nulo); System.out.println ("El servidor recibió un mensaje:" + expresión); prueba {resultado = calculator.cal (expresión) .ToString (); } capt (excepción e) {result = "calculator.cal (expresión) .ToString ();} catch (excepción e) {e.printstackTrace ();} finalmente {// alguna limpieza necesaria funciona if (in! = null) {try {in.close ();} Catch (ioException e) {printstacktrace ();} in = n. NULL) {OUT.CLOSE (); Código fuente del cliente creado por E/S de bloqueo sincrónico:
paquete com.anxpp.io.calculator.bio; import java.io.bufferedReader; import java.io.ioException; import java.io.inputstreamreader; import java.io.printwriter; import java.net.socket; /** * Cliente creado mediante el bloqueo de I/O * @author yangtao__anxpp.com * @version 1.0 */public class Client {// Número de puerto predeterminado Private static int Default_server_port = 12345; cadena estática privada default_server_ip = "127.0.0.1"; public static void send (string expression) {send (default_server_port, expresión); } public static void send (int puerto, expresión de cadena) {system.out.println ("La expresión aritmética es:" + expresión); Socket de socket = nulo; BufferedReader en = NULL; PrintWriter out = null; intente {socket = new Socket (default_server_ip, puerto); in = new BufferedReader (new InputStreamReader (Socket.GetInputStream ())); out = new PrintWriter (Socket.getOutputStream (), true); out.println (expresión); System.out.println ("___ El resultado es:" + in.readline ()); } catch (Exception e) {E.PrintStackTrace (); } Finalmente {// son el trabajo de limpieza necesario si (en! = null) {try {in.close (); } catch (ioException e) {E.PrintStackTrace (); } in = null; } if (out! = null) {out.close (); out = nulo; } if (socket! = null) {try {socket.close (); } catch (ioException e) {E.PrintStackTrace (); } socket = null; }}}} Pruebe el código, para facilitar la visualización de los resultados de salida en la consola, coloque en el mismo programa (JVM) para que se ejecute:
paquete com.anxpp.io.calculator.bio; import java.io.ioException; import java.util.random; /** * Método de prueba * @author yangtao__anxpp.com * @version 1.0 */public class test {// prueba el método principal public static void main (string [] args) lanza interruptedException {// ejecutar el nuevo hilo del servidor (new runnable () {@override public run () E.PrintStackTrace (); // Evite el cliente ejecutando el código antes de que comience el servidor; // Ejecute los operadores de Char de cliente [] = {'+', '-', '*', '/'}; Aleatorio aleatorio = new Random (System.CurrentTimemillis ()); new Thread (new runnable () {@SupplesSWarnings ("static-access") @Override public void run () {while (true) {// aleator genera expresión aritmética string expresión = random.nextint (10)+"+operadores [random.nextint (4)]+(random.nextint (10) +1); Thread.CurrentThread (). Sleep (Random.NextInt (1000)); }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} Ial }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} Ial } } } } } } } } { Hilo. CurrentThread (). Sleep (Random.NextInt (1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Hilo. CurrentThread (). Sleep (Random.NextInt (1000)); }}}}}}}} Los resultados de una de las carreras:
El servidor se ha iniciado, número de puerto: 12345 La expresión aritmética es: 4-2 Servidor recibió el mensaje: 4-2 ___ El resultado es: 2 La expresión aritmética es: 5-10 servidor recibió el mensaje: 5-10__ El resultado: -5 expresión aritmética es: 0-9 servidor recibió el mensaje: 0-9__ El resultado es: -9 La expresión aritmética es: 0+6 Server recibido el mensaje: 0+6__ Resultado es el resultado 6__ es: 6 ARITHMET Recibió el mensaje: 1/6__ El resultado es: 0.1666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666
En el código anterior, es fácil ver que el principal problema de BIO es que cada vez que un nuevo cliente solicita acceso, el servidor debe crear un nuevo hilo para manejar este enlace, que no se puede aplicar en escenarios donde se necesitan un alto rendimiento y una alta concurrencia (una gran cantidad de nuevos hilos afectarán seriamente el rendimiento del servidor e incluso el ataque).
1.2. Programación de E/S pseudoasincrónica
Para mejorar este modelo de una conexión y un hilo, podemos usar un grupo de subprocesos para administrar estos hilos (para obtener más información, consulte el artículo proporcionado anteriormente), implementando un modelo para uno o más hilos para procesar n clientes (pero la capa subyacente todavía usa el bloqueo sincrónico I/O), que a menudo se llama "modelo de I/O pseudo-asíncrono I/O".
Diagrama de modelo de E/S pseudoasíncrono:
La implementación es muy simple. Solo necesitamos entregar el nuevo subproceso a la administración del grupo de subprocesos, y simplemente cambiar el código del servidor en este momento:
paquete com.anxpp.io.calculator.bio; import java.io.ioException; import java.net.serversocket; import java.net.socket; import java.util.concurrent.executorservice; import java.util.concurrent.executors; /** * Código fuente del servidor bio__pseudo-asíncrono I/O * @author yangtao__anxpp.com * @version 1.0 */public final ServidorBetter {// número de puerto predeterminado privado int default_port = 12345; // servidor Singleton Serversocket Serversocket estático privado; // singleton static staticsecorservice ejecutorservice = ejecutors.newfixedthreadpool (60); // Establezca el puerto de audición de acuerdo con los parámetros entrantes. Si no hay parámetro, llame al siguiente método y use el valor predeterminado public static void start () lanza ioexception {// use el valor predeterminado Start (default_port); } // No se accederá a este método en una gran cantidad de simultáneamente, y no es necesario considerar la eficiencia, solo sincronizar el método directamente público sincronizado static void start (int port) lanza ioexception {if (server! = Null) return; Pruebe {// Cree Serversocket a través del constructor // Si el puerto es legal e inactivo, el servidor escuchará con éxito. Servidor = nuevo Serversocket (puerto); System.out.println ("El servidor se ha iniciado, número de puerto:" + puerto); // Superce la conexión del cliente a través del bucle inalámbrico // Si no hay acceso al cliente, se bloqueará en la operación de aceptación. while (true) {socket socket = server.accept (); // Cuando hay un nuevo acceso al cliente, el siguiente código se ejecutará // luego cree un nuevo subproceso para procesar el enlace Socket EjecutorService.execute (nuevo ServerHandler (Socket)); }} Finalmente {// Algunos trabajos de limpieza necesarios if (server! = null) {system.out.println ("El servidor está cerrado"); servidor.close (); servidor = nulo; }}}} Los resultados de la ejecución de la prueba son los mismos.
Sabemos que si usamos el grupo de hilos CachedThreadPool (sin limitar el número de hilos, si no está claro, consulte el artículo proporcionado al comienzo del artículo), de hecho, además de ayudarnos automáticamente a administrar hilos (reutilizar), también se ve como un modelo de conteo de hilos 1: 1. Utilizando FaredThreadPool, controlamos efectivamente el número máximo de subprocesos, garantizamos el control de los recursos limitados del sistema e implementamos el modelo de E/S pseudoasincrónico N: M.
Sin embargo, debido a que el número de subprocesos es limitado, si se producen una gran cantidad de solicitudes concurrentes, los subprocesos que exceden el número máximo solo pueden esperar hasta que haya subprocesos gratuitos en el grupo de subprocesos que se pueden reutilizar. Cuando se lee la secuencia de entrada del socket, se bloqueará hasta que ocurra:
Por lo tanto, cuando la lectura de los datos es lento (como una gran cantidad de datos, transmisión de red lenta, etc.) y grandes cantidades de concurrencia, otros mensajes de acceso solo se pueden esperar todo el tiempo, que es la mayor desventaja.
El NIO que se introducirá más adelante puede resolver este problema.
2. Programación de NIO
La nueva biblioteca Java I/O se introduce en el paquete Java.nio.* En JDK 1.4, con el propósito de aumentar la velocidad. De hecho, el paquete de E/S "antiguo" se ha reimplemento usando NIO, y podemos beneficiarnos de él incluso si no usamos explícitamente la programación NIO. Las mejoras de velocidad pueden ocurrir tanto en la E/S del archivo como en la E/S de red, pero este artículo solo discute el último.
2.1. Introducción
Generalmente pensamos en NIO como una nueva E/S (también el nombre oficial), porque es nuevo para la antigua biblioteca de E/S (en realidad se ha introducido en JDK 1.4, pero este sustantivo continuará utilizándose durante mucho tiempo, incluso si son "antiguos" ahora, por lo que también nos recuerda que debemos considerarlo cuidadosamente cuando lo hagamos), y lo han hecho grandes cambios. Sin embargo, muchas personas se llama E/S sin bloqueo, es decir, E/S sin bloqueo, porque esto se llama, puede reflejar mejor sus características. El NIO en el siguiente texto no se refiere a toda la nueva biblioteca de E/S, pero no está bloqueando la E/S.
NIO proporciona dos implementaciones de canales de socket diferentes: Socketchannel y ServerSocketchannel correspondiente a Socket y ServerSocket en el modelo BIO tradicional.
Ambos canales recién agregados admiten modos de bloqueo y no bloqueo.
El uso del modo de bloqueo es tan simple como el soporte tradicional, pero el rendimiento y la confiabilidad no son buenos; El modo sin bloqueo es exactamente lo contrario.
Para aplicaciones de baja carga y bajas concurrencias, se pueden utilizar E/S de bloqueo sincrónico para mejorar la tasa de desarrollo y un mejor mantenimiento; Para aplicaciones de alta carga y alta frecuencia (red), el modo sin bloqueo de NIO debe usarse para desarrollarse.
El conocimiento básico se introducirá primero a continuación.
2.2. Amortiguador
Un búfer es un objeto que contiene algunos datos a escribir o leer.
En la Biblioteca NIO, todos los datos se procesan en un búfer. Al leer datos, se lee directamente en el búfer; Al escribir datos, también se escribe en el búfer. En cualquier momento que acceda a datos en NIO, se opera a través de un búfer.
Un búfer es en realidad una matriz y proporciona información como acceso estructurado a datos y mantenimiento de ubicaciones de lectura y escritura.
Las áreas específicas de caché son: Bytebuffe, Charbuffer, Shortbuffer, Intbuffer, Longbuffer, Floatbuffer, DoubleBuffer. Implementan la misma interfaz: búfer.
2.3. Canal
Nuestra lectura y escritura de datos debe pasar a través del canal, que es como una tubería de agua, un canal. La diferencia entre un canal y una transmisión es que el canal es bidireccional y puede usarse para operaciones de lectura y escritura de lectura, escritura y simultánea.
Los canales del sistema operativo subyacente generalmente son full-dúplex, por lo que un canal de dúplex puede asignar mejor la API del sistema operativo subyacente que una transmisión.
Los canales se dividen principalmente en dos categorías:
El ServersocketchEnchannel y Socketchannel que estarán involucrados en el siguiente código son subclases de selectableChannel.
2.4. Selector de multiplexor
El selector es la base de la programación Java Nio.
El selector proporciona la capacidad de seleccionar tareas listas: Selector encuestará constantemente el canal registrado en él. Si se produce un evento de lectura o escritura en un canal, el canal estará en el estado listo y será encuestado por el selector. Luego, el conjunto de canales listos se puede obtener a través de la tecla de selección para realizar operaciones de E/S posteriores.
Un selector puede encuestar múltiples canales al mismo tiempo, debido a que el JDK usa epoll () en lugar de la implementación seleccionada tradicional, no hay límite en el mango de conexión máximo 1024/2048. Por lo tanto, solo se requiere un hilo para ser responsable de la encuesta de selección, y puede acceder a miles de clientes.
2.5. Servidor NIO
El código parece mucho más complicado que la programación de socket tradicional.
Simplemente pegue el código y dé la descripción del código en forma de comentarios.
Código fuente del servidor creado por NIO:
paquete com.anxpp.io.calculator.nio; Public Class Server {private static int default_port = 12345; Servidor de servidor static privado ServidorHandle; public static void start () {start (default_port); } public static sincronized void start (int puerto) {if (serverHandle! = null) serverHandle.stop (); ServerHandle = nuevo ServerHandle (puerto); nuevo hilo (servidorhandle, "servidor"). start (); } public static void main (string [] args) {start (); }} ServerHandle:
paquete com.anxpp.io.calculator.nio; import java.io.ioException; import java.net.InetSocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionKey; import java.nio.channels.selector; import java.nio.channels.serversocketchannel; import java.nio.channels.socketchannel; import java.util.iterator; import java.util.set; import com.anxpp.io.utils.calculator; / ** * NIO Server * @author yangtao__anxpp.com * @version 1.0 */ public class ServerHandle implements runnable {private selector selector; SERVERSOCHECHECHANTE PRIVADO SERVERCHANNEL; Comenzó el booleano volátil privado; /*** Constructor* @param Port especifica el número de puerto que se escuchará*/public ServerHandle (int Port) {try {// create selector = selector.open (); // Abrir el canal de escucha ServerChannel = Serversocketchannel.open (); // Si es cierto, este canal se colocará en modo de bloqueo; Si False, este canal se colocará en modo sin bloqueo ServerChannel.ConfigureBlocking (falso); // Habilitar el modo sin bloqueo // Backleg de puerto de enlace se establece en 1024 serverChannel.socket (). Bind (nuevo InetSocketAddress (puerto), 1024); // Solicitud de conexión de cliente Superce ServerChannel.register (selector, selectionKey.op_accept); // Marque El servidor está habilitado iniciado = True; System.out.println ("Se ha iniciado el servidor, número de puerto:" + puerto); } catch (ioException e) {E.PrintStackTrace (); System.exit (1); }} public void stop () {inicio = false; } @Override public void run () {// bucle a través de selector mientras (iniciado) {try {// si hay un evento de lectura y escritura, el selector se despierta cada 1s selector.select (1000); // Bloqueo, continuará solo cuando ocurra al menos un evento registrado. // selector.select (); Establecer <SelectionKey> keys = selector.selectedKeys (); Iterador <selectionKey> it = keys.iterator (); SelectionKey Key = NULL; while (it.hasnext ()) {key = it.next (); it.remove (); intente {HandleInput (clave); } catch (excepción e) {if (key! = null) {key.cancel (); if (key.channel ()! = NULL) {key.channel (). Close (); }}}}}}}}} catch (showable t) {t.printstackTrace (); }} // Después de que se cierre el selector, los recursos administrados se lanzarán automáticamente si (Selector! = NULL) intente {selector.close (); } catch (Exception e) {E.PrintStackTrace (); }} private void handleInput (SelectionKey Key) lanza IOException {if (key.ISValid ()) {// Processing el mensaje de solicitud para un nuevo acceso if (key.isaceptable ()) {Serversocketchannel ssc = (ServersockEnchannel) key.channel (); // Crear una instancia de Socketchannel a través de Serversocketchetchannel Aceptar // Completa esta operación significa completar el TCP Tres Handshake, y el enlace físico TCP se establece oficialmente. Socketchannel sc = ssc.accept (); // Establecer en SC.ConfigureBlocking (falso) sin bloqueo (falso); // Registrarse como Read Sc.Register (selector, selectionKey.op_read); } // Leer mensaje if (key.iseadable ()) {Socketchannel sc = (Socketchannel) key.channel (); // Crear bytebuffer y abrir un búfer de 1M bytebuffer buffer = bytebuffer.allocate (1024); // leer la transmisión de solicitud y devolver el número de bytes read int readbytes = sc.read (buffer); // lea bytes y codifique los bytes if (readbytes> 0) {// Establezca el límite actual del búfer en posición = 0, para las operaciones de lectura posteriores de buffer.flip (); // Crear una matriz de bytes basada en el número de bytes legibles bytes bytes = new byte [buffer.remaining ()]; // Copiar la matriz de bytes legible del búfer en la matriz recién creada Buffer.get (bytes); String Expression = new String (bytes, "UTF-8"); System.out.println ("El servidor recibió un mensaje:" + expresión); // Processing Data String Result = null; prueba {resultado = calculator.cal (expresión) .ToString (); } capt (excepción e) {resultado = "error de cálculo:" + e.getMessage (); } // Enviar el mensaje de respuesta DowRite (SC, resultado); } // No hay bytes leer e ignorar // else if (readBytes == 0); // El enlace se ha cerrado, liberando el recurso más if (readBytes <0) {key.cancel (); sc.close (); }}}}} // Enviar el mensaje de respuesta DowRite vacío privado asíncrono (canal Socketchannel, respuesta de cadena) lanza ioexception {// codificando el mensaje como un byte matriz byte [] bytes = respuesta.getBytes (); // Crear bytebuffer de acuerdo con la capacidad de matriz bytebuffer writeBuffer = bytebuffer.allocate (bytes.length); // Copiar la matriz de bytes al búfer writeBuffer.put (bytes); // Flip Operation WriteBuffer.flip (); // Enviar la matriz de bytes de buffer canal.write (writeBuffer); // ***** El código para procesar "escribir medio paquete" no está incluido aquí}}Como puede ver, los principales pasos para crear un servidor NIO son los siguientes:
Debido a que se envía el mensaje de respuesta, Socketchannel también es asincrónico y no bloqueado, por lo que no se puede garantizar que los datos que deben enviarse se pueden enviar a la vez, y habrá un problema de escribir medio paquete en este momento. Necesitamos registrar una operación de escritura, sondear constantemente el selector para enviar los mensajes no entendidos y luego usar el método hasremain () del búfer para determinar si se envía el mensaje.
2.6. Cliente de NIO
Es mejor cargar el código. El proceso no requiere demasiada explicación, es un poco similar al código del servidor.
Cliente:
paquete com.anxpp.io.calculator.nio; Cliente de clase pública {cadena estática privada default_host = "127.0.0.1"; privado static int default_port = 12345; Private Static ClientHandle ClientHandle; public static void start () {start (default_host, default_port); } public static static sincronized void start (string ip, int puerto) {if (clientHandle! = null) clientHandle.stop (); ClientHandle = new ClientHandle (IP, Port); nuevo hilo (ClientHandle, "servidor"). start (); } // Enviar un mensaje al servidor público static boolean sendmsg (string msg) lanza la excepción {if (msg.equals ("q")) return false; ClientHandle.SendMsg (MSG); devolver verdadero; } public static void main (string [] args) {start (); }} ClientHandle:
paquete com.anxpp.io.calculator.nio; import java.io.ioException; import java.net.InetSocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionKey; import java.nio.channels.selector; import java.nio.channels.socketchannel; import java.util.iterator; import java.util.set; / ** * NIO Client * @author yangtao__anxpp.com * @version 1.0 */ public class ClientHandle implements runnable {private string host; Puerto privado int; selector de selección privada; Socketchannel privado Socketchannel; Comenzó el booleano volátil privado; Public ClientHandle (String IP, Int Port) {this.host = ip; this.port = puerto; Pruebe {// cree selector selector = selector.open (); // Abra el canal de escucha Socketchannel = Socketchannel.open (); // Si es cierto, este canal se colocará en modo de bloqueo; Si False, este canal se colocará en modo sin bloqueo Socketchannel.ConfigureBlowing (falso); // Abrir modo sin bloqueo iniciado = True; } catch (ioException e) {E.PrintStackTrace (); System.exit (1); }} public void stop () {inicio = false; } @Override public void run () {try {Doconnect (); } catch (ioException e) {E.PrintStackTrace (); System.exit (1); } // bucle a través de selector mientras (iniciado) {try {// Independientemente de si hay un evento de lectura y escritura, el selector se despierta cada 1s selector.select (1000); // Bloqueo, y continuará solo cuando ocurra al menos un evento registrado. // selector.select (); Establecer <SelectionKey> keys = selector.selectedKeys (); Iterador <selectionKey> it = keys.iterator (); SelectionKey Key = NULL; while (it.hasnext ()) {key = it.next (); it.remove (); intente {HandleInput (clave); } catch (excepción e) {if (key! = null) {key.cancel (); if (key.channel ()! = NULL) {key.channel (). Close (); }}}}}}}} Catch (Exception e) {E.PrintStackTRace (); System.exit (1); }} // Después de que se cierre el selector, los recursos administrados se lanzarán automáticamente si (Selector! = NULL) intente {selector.close (); } catch (Exception e) {E.PrintStackTrace (); }} Private void HandleInput (selectionKey Key) lanza ioexception {if (key.isValid ()) {Socketchannel sc = (Socketchannel) key.annel (); if (key.isconnectable ()) {if (sc.finishconnect ()); else System.exit (1); } // Lea el mensaje if (key.iseadable ()) {// crea bytebuffer y abra un buffer de 1M bytebuffer buffer = bytebuffer.allocate (1024); // Lea la transmisión del código de solicitud y devuelva el número de bytes read int readbytes = sc.read (buffer); // lea bytes y codifique los bytes if (readbytes> 0) {// Establezca el límite actual del búfer en posición = 0, para las operaciones de lectura posteriores de buffer.flip (); // Crear matriz de bytes basada en el número de bytes legibles en el byte de búfer [] bytes = new byte [buffer.remaining ()]; // Copiar la matriz de bytes legible del búfer en la matriz recién creada Buffer.get (bytes); String result = new String (bytes, "UTF-8"); System.out.println ("Cliente recibió un mensaje:" + resultado); } // No se ignora los bytes//else if (readBytes == 0); // El enlace se ha cerrado, liberando el recurso más if (readBytes <0) {key.cancel (); sc.close (); }}}}} // Enviar mensajes de DowRite vacío privado asincrónicamente (canal Socketchannel, solicitud de cadena) lanza ioexception {// codificando el mensaje como una matriz de byte byte [] bytes = request.getBytes (); // Creación de ByteBuffer basado en la capacidad de matriz bytebuffer writeBuffer = bytebuffer.allocate (bytes.length); // Copiar la matriz de bytes al búfer writeBuffer.put (bytes); // Flip Operation WriteBuffer.flip (); // Enviar la matriz de byte Channel.write (WriteBuffer); // ***** El código para procesar "escribir medio paquete" no está incluido aquí} private void doconnect () lanza ioexception {if (socketchannel.connect (nueva inetSocketAddress (host, puerto))); else socketchannel.register (selector, selectionKey.op_connect); } public void sendmsg (string msg) lanza la excepción {Socketchannel.register (selector, selectionKey.op_read); DowRite (Socketchannel, MSG); }} 2.7. Resultados de demostración
Primero ejecute el servidor y ejecute un cliente por cierto:
paquete com.anxpp.io.calculator.nio; import java.util.scanner; /** * Método de prueba * @author yangtao__anxpp.com * @version 1.0 */public class test {// prueba el método principal @SupessWarnings ("recursos") public static void main (string [] args) lanza la excepción {// ejecutar server.start (); // Evite el cliente ejecutando el código Thread.sleep (100); // ejecutar Client Client.start (); while (client.sendmsg (nuevo escáner (system.in) .nextline ())); }} También podemos ejecutar el cliente por separado, y los efectos son los mismos.
Resultados de una prueba:
El servidor se ha iniciado, número de puerto: 123451+2+3+4+5+6 El servidor recibió el mensaje: 1+2+3+4+5+6 El cliente recibió el mensaje: 211*2/3-4+5*6/7-8 El servidor recibió el mensaje: 1*2/3-4+5*6/7-8 El cliente recibió el mensaje: -7.0476190476190474
No hay ningún problema en ejecutar múltiples clientes.
3. Programación AIO
NIO 2.0 presenta el concepto de nuevos canales asincrónicos y proporciona implementaciones de canales de archivos asíncronos y canales de socket asincrónicos.
El canal de socket asíncrono es realmente asincrónico de E/S sin bloqueo, correspondiente a la E/S basada en eventos (AIO) en la programación de redes UNIX. No requiere demasiados selectores para sondear los canales registrados para lograr lectura y escritura asincrónicas, simplificando así el modelo de programación NIO.
Simplemente suba el código.
3.1. Código del lado del servidor
Servidor:
paquete com.anxpp.io.calculator.aio.server; / ** * servidor AIO * @author yangtao__anxpp.com * @version 1.0 */ public class Server {private static int default_port = 12345; static static static asyncserverhandler servidorhandle; Público Volátil Estático Long ClientCount = 0; public static void start () {start (default_port); } public static static sincronized void start (int puerto) {if (serverHandle! = null) return; serverHandle = new AsyncServerHandler (puerto); nuevo hilo (servidorhandle, "servidor"). start (); } public static void main (string [] args) {server.start (); }} AsyncServerHandler:
paquete com.anxpp.io.calculator.aio.server; import java.io.ioException; import java.net.InetSocketAddress; import java.nio.channels.asyncServersocketchannel; import java.util.concurrent.countdownLatch; public class AsyncServerHandler implementa Runnable {public CountdownLatch Latch; canal de asyncServersocketchannel público; public asyncServerHandler (int puerto) {try {// crea server channel = asynchronousServerSocketchannel.open (); // Bind Port Channel.Bind (nuevo inetSocketAddress (puerto)); System.out.println ("Se ha iniciado el servidor, número de puerto:" + puerto); } catch (ioException e) {E.PrintStackTrace (); }} @Override public void run () {// CountdownLatch Initialization // Su función: Permitir que el campo actual bloquee todo el tiempo antes de completar un conjunto de operaciones que se ejecutan // aquí, deje que el campo bloquee aquí para evitar que el servidor salga después de la ejecución // también puede usar (verdadero)+dormir // el entorno de generación no necesita preocuparse por este problema, pensando que el servidor no saldrá a latch = new cuenta- // Connection Channel.accept (this, nuevo accepthandler ()); intente {latch.await (); } catch (InterruptedException e) {E.PrintStackTrace (); }}} Acceptthandler:
paquete com.anxpp.io.calculator.aio.server; import java.nio.bytebuffer; import java.nio.channels.asynchronoussocketchannel; import java.nio.channels.completionHandler; // Conéctate como una clase pública de controlador Accepthandler implementa FinalationHandler <AsynchronousSocketchannel, AsyncServerHandler> {@Override public void completado (AsynchronousSocketchannel Channel, AsyncServerHandler ServerHandler) {// continúa aceptando solicitudes de otros clientes Server.ClientCount ++; System.out.println ("Número de clientes conectados:" + server.clientCount); serverHandler.channel.accept (ServerHandler, esto); //Create a new Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); }} ReadHandler:
package com.anxpp.io.calculator.aio.server; import java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); }}} OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2. Client side code
Cliente:
package com.anxpp.io.calculator.aio.client; import java.util.scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); devolver verdadero; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }} AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; import java.io.ioException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; Puerto privado int; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); }} WriteHandler:
package com.anxpp.io.calculator.aio.client; import java.io.ioException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //Complete writing of all data if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //Read data ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("Data send failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; import java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("Client received result:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("Data read failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3. Prueba
Prueba:
package com.anxpp.io.calculator.aio; import java.util.scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }}我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.