Cet article introduira de peu profond à la biographie traditionnelle en NIO à AIO, et sera accompagné d'une explication complète du code.
Un exemple sera utilisé dans le code suivant: le client envoie une chaîne de l'équation au serveur et le serveur renvoie le résultat au client après le calcul.
Toutes les instructions pour le code sont directement utilisées comme commentaires et sont intégrées dans le code, ce qui peut être plus facile à comprendre lors de la lecture du code. Une classe d'outils pour calculer le résultat sera utilisée dans le code, voir la section Code de l'article.
Articles recommandés pour les connaissances de base connexes:
Introduction au modèle d'E / S du réseau Linux (images et texte)
Concurrence Java (multi-threading)
1. Programmation bio
1.1. Programmation bio traditionnelle
Le modèle de base de la programmation réseau est le modèle C / S, c'est-à-dire la communication entre deux processus.
Le serveur fournit des ports IP et d'écoute. Le client initie une demande de connexion via l'adresse de l'opération de connexion que le serveur souhaite écouter. Grâce à trois poignées de main, si la connexion est établie avec succès, les deux parties peuvent communiquer via des prises.
Dans le développement du modèle de blocage traditionnel de synchronisation, Serversocket est responsable de la liaison des adresses IP et du démarrage des ports d'écoute; Socket est responsable de l'initiation des opérations de connexion. Une fois la connexion réussie, les deux parties effectuent une communication de blocage synchrone via les flux d'entrée et de sortie.
Une brève description du modèle de communication Bio Server: Le serveur utilisant le modèle de communication BIO est généralement un thread accepteur indépendant responsable de l'écoute de la connexion du client. Après avoir reçu la demande de connexion client, il crée un nouveau thread pour chaque client pour le traitement des liens et ne le traite pas, puis renvoie la réponse au client via le flux de sortie, et le thread est détruit. C'est-à-dire un modèle typique d'une demande à une nouvelle nuit.
Diagramme traditionnel du modèle de communication bio:
Le plus gros problème avec ce modèle est qu'il manque de capacités de mise à l'échelle élastiques. Lorsque le nombre d'accès simultanés sur le client augmente, le nombre de threads sur le serveur est proportionnel au nombre d'accès simultanés sur le client. Les threads en Java sont également des ressources système relativement précieuses. Une fois que le nombre de threads s'est développé rapidement, les performances du système baisseront fortement. Alors que le nombre d'accès continue d'augmenter, le système finira par mourir.
Code source du serveur créé par des E / S de blocage synchrone:
package com.anxpp.io.calculator.bio; Importer java.io.ioException; import java.net.serversocket; import java.net.socket; / ** * Bio Server Source Code * @author yangtao__anxpp.com * @version 1.0 * / public final class ServerNormal {// Numéro de port par défaut privé static int default_port = 12345; // Singleton Serversocket Static STATIC SERVERSOCKET Server; // Définissez le port d'écoute en fonction des paramètres entrants. S'il n'y a pas de paramètres, appelez la méthode suivante et utilisez la valeur par défaut public static void start () lève ioException {// Utilisez la valeur par défaut start (default_port); } // Cette méthode ne sera pas accessible de manière simultanée, et il n'est pas nécessaire de considérer l'efficacité, il suffit de synchroniser la méthode directement synchronisée statique void start (int) lance ioException {if (server! = Null) return; Essayez {// Créer Serversocket via le constructeur // Si le port est légal et inactif, le serveur écoutera avec succès. Server = new serversocket (port); System.out.println ("Le serveur a été démarré, numéro de port:" + port); // Écoutez les connexions du client via une boucle sans fil // s'il n'y a pas d'accès client, il sera bloqué sur l'opération d'acceptation. while (true) {socket socket = server.accept (); // Lorsqu'il y a un nouvel accès client, le code suivant sera exécuté // Créera ensuite un nouveau thread pour gérer ce nouveau thread de liaison de socket (nouveau serverhandler (socket)). Start (); }} Enfin {// certains travaux de nettoyage nécessaires if (serveur! = null) {System.out.println ("Le serveur est fermé."); server.close (); server = null; }}}} Traitement des messages du client Code source du serveur de serveur:
package com.anxpp.io.calculator.bio; Importer java.io.bufferedReader; Importer java.io.ioException; Importer java.io.inputStreamReader; import java.io.printwriter; import java.net.socket; import com.anxpp.io.utils.calculateur; / ** * Filetage client * @author yangtao__anxpp.com * lien de socket pour un client * / classe publique ServerHandler implémente Runnable {private socket socket; Public ServerHandler (socket socket) {this.socket = socket; } @Override public void run () {BufferedReader dans = null; Printwriter out = null; essayez {in = new BuffereDReader (new inputStreamReader (socket.getInputStream ())); out = new printwriter (socket.getOutputStream (), true); Expression de chaîne; Résultat de la chaîne; tandis que (true) {// lisez une ligne via BufferedReader // Si vous avez lu la queue du flux d'entrée, renvoyez NULL et quittez la boucle // Si vous obtenez une valeur non nul, essayez de calculer le résultat et de retourner si ((expression = in.readline ()) == null) Break; System.out.println ("Le serveur a reçu un message:" + expression); try {result = calculator.cal (expression) .toString (); } catch (exception e) {result = "calculator.cal (expression) .toString ();} catch (exception e) {e.printStackTrace ();} enfin {// certains travaux de nettoyage nécessaires if (in! = null) {try {in.close ();} catch (ioException e) {e.printStackTrace ();} in = null; Null) {out.close (); Code source client créé par des E / S de blocage synchrone:
package com.anxpp.io.calculator.bio; Importer java.io.bufferedReader; Importer java.io.ioException; Importer java.io.inputStreamReader; import java.io.printwriter; import java.net.socket; / ** * Client créé en bloquant les E / O * @author yangtao__anxpp.com * @version 1.0 * / public class Client {// Numéro de port par défaut privé static int default_server_port = 12345; chaîne statique privée default_server_ip = "127.0.0.1"; public static void Send (String Expression) {send (default_server_port, expression); } public static void send (int port, chaîne expression) {System.out.println ("l'expression arithmétique est:" + expression); Douille à douille = null; BufferedReader dans = null; Printwriter out = null; essayez {socket = new socket (default_server_ip, port); dans = new BufferedReader (new inputStreamReader (socket.getInputStream ())); out = new printwriter (socket.getOutputStream (), true); out.println (expression); System.out.println ("___ Le résultat est:" + in.readline ()); } catch (exception e) {e.printStackTrace (); } Enfin {// sont le travail de nettoyage nécessaire if (in! = null) {try {in.close (); } catch (ioException e) {e.printStackTrace (); } in = null; } if (out! = null) {out.close (); out = null; } if (socket! = null) {try {socket.close (); } catch (ioException e) {e.printStackTrace (); } socket = null; }}}} Testez le code, afin de faciliter la visualisation des résultats de sortie dans la console, mettez-le dans le même programme (JVM) à exécuter:
package com.anxpp.io.calculator.bio; Importer java.io.ioException; import java.util.random; / ** * Méthode de test * @author yangtao__anxpp.com * @version 1.0 * / public class test {// tester la méthode principale publique statique void main (string [] args) lance InterruptedException {// exécuter le serveur nouveau thread (new Runnable () {@Override public Void run () {try {serverBetter e.printStackTrace ();}}}). start (); // Évitez le client exécutant le code avant le début du serveur; // Exécute les opérateurs de Client char [] = {'+', '-', '*', '/'}; Random Random = new Random (System.CurrentTimemillis ()); Nouveau thread (new Runnable () {@SuppressWarnings ("static-access") @Override public void run () {while (true) {// Random génère une expression d'arithmétique expression = random.nextint (10) + "" + opérateurs [random.nextint (4)] + (random.nextint (10) +1); Client.Send (expression); essai {essai { Thread.currentThread (). }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}, }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}, } } } } } } } } { Fil de discussion. CurrentThread (). Sleep (Random.Nextint (1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Fil de discussion. CurrentThread (). Sleep (Random.Nextint (1000)); }}}}}}}} Les résultats de l'une des courses:
Le serveur a été démarré: Numéro de port: 12345 L'expression arithmétique est: 4-2 Server reçu le message: 4-2 ___ Résultat est: 2 L'expression arithmétique est: 5-10 Server reçu le message: 0-10__ Résultat est: -5 expression arithmétique est: 0 + 6 Server reçu le message: 0 + 6__ Result est: 6 ARITHMET a reçu le message: le résultat 1/6__ est: 0.16666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666666666666666666666
À partir du code ci-dessus, il est facile de voir que le principal problème de BIO est que chaque fois qu'un nouveau client demande l'accès, le serveur doit créer un nouveau thread pour gérer ce lien, qui ne peut pas être appliqué dans des scénarios où des performances élevées et une concurrence élevée sont nécessaires (un grand nombre de nouveaux threads affecteront sérieusement les performances du serveur et même la grève).
1.2. Programmation des E / S pseudo-asynchrones
Pour améliorer ce modèle à un thread à une connexion, nous pouvons utiliser un pool de threads pour gérer ces threads (pour plus d'informations, veuillez vous référer à l'article fourni précédemment), en mettant en œuvre un modèle pour un ou plusieurs threads pour traiter les clients N (mais la couche sous-jacente utilise toujours un modèle d'E / S syschronique), qui est souvent appelée "Pseudo-asynchrone I / O Model".
Diagramme du modèle d'E / S pseudo-asynchrone:
L'implémentation est très simple. Nous avons juste besoin de remettre le nouveau thread sur la gestion du pool de thread et de changer simplement le code du serveur tout à l'heure:
package com.anxpp.io.calculator.bio; Importer java.io.ioException; import java.net.serversocket; import java.net.socket; Importer java.util.concurrent.executorService; Importer java.util.concurrent.executors; / ** * Bio Server Source Code__pseudo-Asynchronous I / O * @author yangtao__anxpp.com * @version 1.0 * / public final class serverbetter {// Numéro de port par défaut privé static int default_port = 12345; // Singleton Serversocket Static STATIC SERVERSOCKET Server; // Singleton Private Static ExecutorService ExecutorService = Exécutors.NewFixEdThreadPool (60); // Définissez le port d'écoute en fonction des paramètres entrants. S'il n'y a pas de paramètre, appelez la méthode suivante et utilisez la valeur par défaut public static void start () lève ioException {// Utilisez la valeur par défaut start (default_port); } // Cette méthode ne sera pas accessible dans un grand nombre de simultanément, et il n'est pas nécessaire de considérer l'efficacité, il suffit de synchroniser la méthode directement synchronisée statique void start (int) lance ioException {if (server! = Null) return; Essayez {// Créer Serversocket via le constructeur // Si le port est légal et inactif, le serveur écoutera avec succès. Server = new serversocket (port); System.out.println ("Le serveur a été démarré, numéro de port:" + port); // Superce la connexion du client via une boucle sans fil // s'il n'y a pas d'accès client, il sera bloqué sur l'opération d'acceptation. while (true) {socket socket = server.accept (); // Lorsqu'il y a un nouvel accès client, le code suivant sera exécuté // Créera ensuite un nouveau thread pour traiter le lien de socket exécutor-service.exécute (new ServerHandler (socket)); }} Enfin {// certains travaux de nettoyage nécessaires if (serveur! = null) {System.out.println ("Le serveur est fermé."); server.close (); server = null; }}}} Les résultats des essais sont les mêmes.
Nous savons que si nous utilisons CachedThreadPool Thread Pool (pas de limitation du nombre de threads, sinon claire, veuillez vous référer à l'article fourni au début de l'article), en fait, en plus de nous aider automatiquement à gérer les threads (réutilisation), il ressemble également à un modèle de comptage client 1: 1. À l'aide de FixedTheredpool, nous contrôlons efficacement le nombre maximum de threads, assurons le contrôle des ressources limitées du système et implémentons le modèle d'E / S pseudo-asynchrone N: M.
Cependant, comme le nombre de threads est limité, si un grand nombre de demandes simultanées se produisent, les threads dépassant le nombre maximum ne peuvent qu'attendre qu'il y ait des threads libres dans le pool de threads qui peuvent être réutilisés. Lorsque le flux d'entrée de socket est lu, il sera bloqué jusqu'à ce qu'il se produise:
Par conséquent, lorsque la lecture des données est lente (comme une grande quantité de données, une transmission de réseau lente, etc.) et de grandes quantités de concurrence, d'autres messages d'accès ne peuvent être attendus que tout le temps, ce qui est le plus grand inconvénient.
Le NIO qui sera introduit plus tard peut résoudre ce problème.
2. Programmation NIO
La nouvelle bibliothèque d'E / S Java est introduite dans le package java.nio. * Dans JDK 1.4, dans le but d'augmenter la vitesse. En fait, le «vieux» package d'E / S a été réimplémenté à l'aide de NIO, et nous pouvons en bénéficier même si nous n'utilisons pas explicitement la programmation NIO. Des améliorations de vitesse peuvent se produire à la fois dans les E / S de fichiers et les E / S de réseau, mais cet article ne traite que de ce dernier.
2.1. Introduction
Nous considérons généralement Nio comme de nouvelles E / S (également le nom officiel), car il est nouveau dans l'ancienne bibliothèque d'E / S (en fait, il a été introduit dans JDK 1.4, mais ce nom continuera d'être utilisé depuis longtemps, même s'ils sont "vieux" maintenant, donc cela nous rappelle également que nous devons le considérer attentivement lors de la dénomination) et a fait de grands changements. Cependant, il est appelé E / S non block par beaucoup de gens, c'est-à-dire des E / S non bloquantes, car cela s'appelle, il peut mieux refléter ses caractéristiques. Le NIO dans le texte suivant ne fait pas référence à toute la nouvelle bibliothèque d'E / S, mais ne bloque pas les E / S.
NIO fournit deux implémentations de canaux de socket différentes: Socketchannel et Serversocketchannel correspondant à Socket et SERVERSocket dans le modèle BIO traditionnel.
Les deux canaux nouvellement ajoutés prennent en charge les modes de blocage et de non-blocage.
L'utilisation du mode de blocage est aussi simple que le support traditionnel, mais les performances et la fiabilité ne sont pas bonnes; Le mode non bloquant est exactement le contraire.
Pour les applications à faible charge et à faible monnaie, les E / S de blocage synchrone peuvent être utilisées pour améliorer le taux de développement et une meilleure maintenance; Pour les applications à forte charge et à haute monnaie (réseau), le mode non bloquant de NIO doit être utilisé pour se développer.
Les connaissances de base seront introduites d'abord ci-dessous.
2.2. Tampon tampon
Un tampon est un objet qui contient des données à écrire ou à lire.
Dans la bibliothèque NIO, toutes les données sont traitées dans un tampon. Lors de la lecture des données, il est lu directement dans le tampon; Lors de l'écriture de données, il est également écrit dans le tampon. À tout moment, vous accédez à des données dans NIO, il fonctionne via un tampon.
Un tampon est en fait un tableau et fournit des informations telles que l'accès structuré aux données et la maintenance des emplacements de lecture et d'écriture.
Les zones de cache spécifiques sont: ByteBuffe, Charbuffer, Shortbuffer, Intbuffer, LongBuffer, FloatBuffer, DoubleBuffer. Ils implémentent la même interface: tampon.
2.3. Canal
Notre lecture et notre écriture de données doivent être passées à travers le canal, ce qui est comme un tuyau d'eau, un canal. La différence entre un canal et un flux est que le canal est bidirectionnel et peut être utilisé pour la lecture, l'écriture et les opérations de lecture et d'écriture simultanées.
Les canaux du système d'exploitation sous-jacent sont généralement complets, donc un canal complet peut mieux cartographier l'API du système d'exploitation sous-jacent qu'un flux.
Les canaux sont principalement divisés en deux catégories:
Le SERVERSOCHETCHANNEL et SOCKetChannel qui seront impliqués dans le code suivant sont tous deux des sous-classes de SelectableChannel.
2.4. Sélecteur de multiplexeur
Le sélecteur est la base de la programmation Java Nio.
Selector offre la possibilité de sélectionner les tâches prêtes: Selector interrogera constamment le canal enregistré dessus. Si un événement de lecture ou d'écriture se produit sur un canal, le canal sera à l'état prêt et sera interrogé par le sélecteur. Ensuite, l'ensemble des canaux prêts peut être obtenu via le SelectionKey pour effectuer des opérations d'E / S ultérieures.
Un sélecteur peut interroger plusieurs canaux en même temps, car le JDK utilise epoll () au lieu de l'implémentation de sélection traditionnelle, il n'y a pas de limite sur la poignée de connexion maximale 1024/2048. Ainsi, un seul fil doit être responsable du scrutin du sélecteur, et il peut accéder à des milliers de clients.
2.5. Serveur NIO
Le code semble beaucoup plus compliqué que la programmation de socket traditionnelle.
Collez simplement le code et donnez la description du code sous forme de commentaires.
Code source du serveur créé par Nio:
package com.anxpp.io.calculator.nio; public class Server {private static int default_port = 12345; Private Static ServerHandle ServerHandle; public static void start () {start (default_port); } public static synchronisé void start (int port) {if (serverhandle! = null) serverhandle.stop (); serverHandle = new ServerHandle (port); nouveau thread (serverhandle, "serveur"). start (); } public static void main (string [] args) {start (); }} ServerHandle:
package com.anxpp.io.calculator.nio; Importer java.io.ioException; Importer java.net.inetsocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; Importer java.nio.channels.serversocketchannel; import java.nio.channels.socketchannel; Importer java.util.iterator; import java.util.set; import com.anxpp.io.utils.calculateur; / ** * Nio Server * @author yangtao__anxpp.com * @version 1.0 * / public class ServerHandle implémente Runnable {private Selector Selector; SERVERSOCHAGE PRIVÉECHANNEL ServerChannel; Le booléen volatil privé a commencé; / ** * Constructeur * @param port Spécifiez le numéro de port à écouter * / public serverHandle (int port) {try {// Create Selector = Selector.Open (); // Ouvrez le canal d'écoute ServerChannel = serversOCHANNEL.Open (); // Si vrai, ce canal sera placé en mode de blocage; Si FALSE, ce canal sera placé dans le mode non bloquant ServerChannel.ConfigureBlocking (FALSE); // Activer le mode non bloquant // Le backlog du port de bind est défini sur 1024 serverChannel.socket (). Bind (new IneTSocketAddress (port), 1024); // Superce Client Connection Demande ServerChannel.Register (Selector, SelectionKey.OP_ACcept); // Marque le serveur est activé démarré = true; System.out.println ("Le serveur a été démarré, numéro de port:" + port); } catch (ioException e) {e.printStackTrace (); System.exit (1); }} public void stop () {start = false; } @Override public void run () {// Loop via le sélecteur tandis que (démarré) {try {// s'il y a un événement de lecture et d'écriture, le sélecteur est réveillé tous les 1S Selector.Select (1000); // Blocage, il ne continuera que lorsque au moins un événement enregistré se produit. // selector.select (); Set <lelectionKey> keys = Selector.SelectedKeys (); Iterator <lelectionKey> it = keys.iterator (); SELECTIONKEKEY KEY = NULL; while (it.hasnext ()) {key = it.next (); it.remove (); essayez {handleInput (key); } catch (exception e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}}} Catch (Throwable T) {t.printStackTrace (); }} // Une fois le sélecteur fermé, les ressources gérées seront automatiquement publiées si (sélecteur! = Null) try {Selector.close (); } catch (exception e) {e.printStackTrace (); }} private void handleInput (sélectionKey key) lève ioException {if (key.isvalid ()) {// Traitement le message de demande pour un nouvel accès if (key.isacceptable ()) {serversocketchannel ssc = (serversocketchannel) key.channel (); // Création d'une instance Socketchannel via Accept // Terminez cette opération de ServersocketChannel signifie terminer la poignée de main à trois voies TCP, et le lien physique TCP est officiellement établi. Socketchannel sc = ssc.accept (); // défini sur SC.C.ConfigureBlocking (FALSE); // Inscrivez-vous en tant que lecture sc.register (Selector, SelectionKey.op_read); } // lire le message if (key.isreadable ()) {socketchannel sc = (socketchannel) key.channel (); // Créer ByteBuffer et ouvrir un tampon ByteBuffer ByteBuffer de 1 m = ByteBuffer.Allocation (1024); // Lire le flux de la demande et renvoyer le nombre d'octets Read Int ReadBytes = SC.Read (Buffer); // Lire les octets et code les octets if (readBytes> 0) {// Définissez la limite actuelle du tampon en position = 0, pour les opérations de lecture ultérieures de tampon.flip (); // Créer un tableau d'octet basé sur le nombre d'octets d'octets lisibles de tampon = new byte [buffer.reMinging ()]; // Copiez le tableau d'octets lisible par le tampon dans le tableau nouvellement créé Buffer.get (octets); String expression = new String (Bytes, "UTF-8"); System.out.println ("Server a reçu un message:" + expression); // Traitement des données de données Résultat = null; try {result = calculator.cal (expression) .toString (); } catch (exception e) {result = "Erreur de calcul:" + e.getMessage (); } // Envoi du message de réponse dowrite (sc, résultat); } // pas d'octets lus et ignorer // else if (readBytes == 0); // Le lien a été fermé, libérant la ressource else if (readBytes <0) {key.cancel (); sc.close (); }}}}} // Envoi le message de réponse de réponse de manière asynchrone void dowrite (canal socketchannel, réponse de chaîne) lève ioException {// codant le message en tant que byte de tableau d'octet [] bytes = réponse.getBytes (); // Créer ByteBuffer en fonction de la capacité du tableau ByteBuffer WriteBuffer = ByteBuffer.AllOcy (Bytes.Length); // Copiez le tableau d'octets dans le tampon writebuffer.put (octets); // Flip Operation WriteBuffer.flip (); // Envoyez le tableau d'octets de Buffer Channel.Write (WriteBuffer); // ***** Le code pour le traitement "écrire demi-packet" n'est pas inclus ici}}Comme vous pouvez le voir, les principales étapes de création d'un serveur NIO sont les suivantes:
Étant donné que le message de réponse est envoyé, Socketchannel est également asynchrone et non bloquant, il ne peut donc pas être garanti que les données qui doivent être envoyées peuvent être envoyées en même temps, et il y aura un problème d'écriture d'un demi-paquet à ce moment. Nous devons enregistrer une opération d'écriture, interroger constamment le sélecteur pour envoyer les messages unsets, puis utiliser la méthode HasRemain () du tampon pour déterminer si le message est envoyé.
2.6. NIO Client
Il vaut mieux télécharger le code. Le processus ne nécessite pas trop d'explications, il est un peu similaire au code du serveur.
Client:
package com.anxpp.io.calculator.nio; CLASSE PUBLIQUE CLIENT {STATIQUE PRIVATE STATIQUE DEUNTAUT_HOST = "127.0.0.1"; private static int default_port = 12345; ClientHandle statique privé Handlehandle; public static void start () {start (default_host, default_port); } public static synchronisé void start (String ip, int port) {if (clientHandle! = null) clientHandle.stop (); clientHandle = new ClientHandle (IP, port); nouveau thread (clientHandle, "serveur"). start (); } // Envoi un message au serveur public static booléen sendmsg (String msg) lève une exception {if (msg.equals ("q")) return false; ClientHandle.SendMsg (MSG); Retour Vrai; } public static void main (string [] args) {start (); }} ClientHandle:
package com.anxpp.io.calculator.nio; Importer java.io.ioException; Importer java.net.inetsocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.socketchannel; Importer java.util.iterator; import java.util.set; / ** * Nio Client * @author yangtao__anxpp.com * @version 1.0 * / public class ClientHandle implémente Runnable {private String Host; port int privé; Selecteur privé sélecteur; Socketchannel privé Socketchannel; Le booléen volatil privé a commencé; public clientHandle (String ip, int port) {this.host = ip; this.port = port; essayez {// Créer Selector Selector = Selector.Open (); // Ouvrez le canal d'écoute socketchannel = socketchannel.open (); // Si vrai, ce canal sera placé en mode de blocage; Si faux, ce canal sera placé en mode non bloquant socketchannel.configureBlocking (false); // ouvrir le mode non bloquant démarré = true; } catch (ioException e) {e.printStackTrace (); System.exit (1); }} public void stop () {start = false; } @Override public void run () {try {doconnect (); } catch (ioException e) {e.printStackTrace (); System.exit (1); } // Loop via le sélecteur tandis que (démarré) {try {//, qu'il y ait un événement de lecture et d'écriture, le sélecteur est éveillé tous les 1S Selector.Select (1000); // Blocking, et il ne se poursuivra que lorsque au moins un événement enregistré se produit. // selector.select (); Set <lelectionKey> keys = Selector.SelectedKeys (); Iterator <lelectionKey> it = keys.iterator (); SELECTIONKEKEY KEY = NULL; while (it.hasnext ()) {key = it.next (); it.remove (); essayez {handleInput (key); } catch (exception e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}} catch (exception e) {e.printStackTrace (); System.exit (1); }} // Une fois le sélecteur fermé, les ressources gérées seront automatiquement publiées si (sélecteur! = Null) try {Selector.close (); } catch (exception e) {e.printStackTrace (); }} private void handleInput (sélectionkey key) lève ioException {if (key.isvalid ()) {socketchannel sc = (socketchannel) key.channel (); if (key.isconnectable ()) {if (sc.FinishConnect ()); else System.exit (1); } // Lisez le message if (key.isReadable ()) {// Créer ByteBuffer et ouvrir un tampon ByteBuffer ByteBuffer de 1M 1M = BYTEBUFFER.ALLOCY (1024); // Lisez le flux de code de demande et renvoyez le nombre d'octets Read Int ReadBytes = SC.Read (Buffer); // Lire les octets et code les octets if (readBytes> 0) {// Définissez la limite actuelle du tampon en position = 0, pour les opérations de lecture ultérieures de tampon.flip (); // Créer un tableau d'octet basé sur le nombre d'octets lisibles dans le tampon octet [] octets = nouveau octet [tampon.ReMinging ()]; // Copiez le tableau d'octets lisible par le tampon dans le tableau nouvellement créé Buffer.get (octets); Résultat de la chaîne = new String (Bytes, "UTF-8"); System.out.println ("Le client a reçu un message:" + Résultat); } // pas d'octets lecture est ignoré // else if (readBytes == 0); // Le lien a été fermé, libérant la ressource else if (readBytes <0) {key.cancel (); sc.close (); }}}}} // Envoi des messages void dowrite void privé de manière asynchrone (canal socketchannel, string request) lance ioException {// codant le message en tant que byte d'octet Byte [] bytes = request.getBytes (); // Création de bytebuffer en fonction de la capacité du tableau ByteBuffer WriteBuffer = byteBuffer.Allocate (Bytes.Length); // Copie du tableau d'octets dans le tampon writebuffer.put (octets); // Flip Operation WriteBuffer.flip (); // Envoyez le tableau des octets Channel.Write (WriteBuffer); // ***** Le code pour le traitement "Write Half-Packet" n'est pas inclus ici} private void doconnect () lève ioException {if (socketchannel.connect (new InetsocketDress (hôte, port))); else socketchannel.register (sélecteur, selectionKey.op_connect); } public void sendmsg (String msg) lève une exception {socketchannel.register (sélecteur, selectionKey.op_read); Dowrite (socketchannel, msg); }} 2.7. Résultats de la démonstration
Exécutez d'abord le serveur et exécutez un client au fait:
package com.anxpp.io.calculator.nio; import java.util.scanner; / ** * Méthode de test * @author yangtao__anxpp.com * @version 1.0 * / public class test {// Tester la méthode principale @SuppressWarnings ("Resource") public static void main (String [] args) lève exception {// run server server.start (); // Évitez le client exécutant le code thread.Sleep (100); // Exécuter le client client.start (); while (client.sendmsg (nouveau scanner (System.in) .NextLine ())); }} Nous pouvons également exécuter le client séparément et les effets sont les mêmes.
Résultats d'un test:
Le serveur a été démarré, numéro de port: 123451 + 2 + 3 + 4 + 5 + 6 Le serveur a reçu le message: 1 + 2 + 3 + 4 + 5 + 6 Le client a reçu le message: 211 * 2/3-4 + 5 * 6/7-8 Le serveur a reçu le message: 1 * 2 / 3-4 + 5 * 6 / 7-8 Le client a reçu le message: -7.04761904761904744
Il n'y a aucun problème à exécuter plusieurs clients.
3. Programmation AIO
Nio 2.0 présente le concept de nouveaux canaux asynchrones et fournit des implémentations de canaux de fichiers asynchrones et de canaux de douille asynchrones.
Le canal de socket asynchrone est vraiment des E / S non bloquantes asynchrones, correspondant aux E / S (AIO), dans la programmation réseau UNIX. Il ne nécessite pas trop de sélecteurs pour interroger les canaux enregistrés pour obtenir une lecture et une écriture asynchrones, simplifiant ainsi le modèle de programmation NIO.
Téléchargez simplement le code.
3.1. Code côté serveur
Serveur:
package com.anxpp.io.calculator.aio.server; / ** * AIO Server * @author yangtao__anxpp.com * @version 1.0 * / public class Server {private static int default_port = 12345; Private Static AsyncServerHandler ServerHandle; public Volatile statique long ClientCount = 0; public static void start () {start (default_port); } public static synchronisé void start (int port) {if (serverhandle! = null) return; serverHandle = new AsyncServerHandler (port); nouveau thread (serverhandle, "serveur"). start (); } public static void main (string [] args) {server.start (); }} AsyncServerHandler:
package com.anxpp.io.calculator.aio.server; Importer java.io.ioException; Importer java.net.inetsocketAddress; Importer java.nio.channels.asyncServersocketchannel; Importer java.util.concurrent.CountDownLatch; classe publique AsyncServerHandler implémente Runnable {public CountdownLatch Latch; Public AsyncServersocketChannel Channel; public asyncServerHandler (int port) {try {// Create Server Channel = AsynchrEousSerVersocketchannel.open (); // Bind Port Channel.bind (new IneTSocketAddress (port)); System.out.println ("Le serveur a été démarré, numéro de port:" + port); } catch (ioException e) {e.printStackTrace (); }} @Override public void run () {// CountdownLatch Initialisation // sa fonction: permettez au champ actuel de bloquer tout le temps avant de terminer un ensemble d'opérations exécutées // ici, laissez le champ bloquer ici pour empêcher le serveur de sortir après l'exécution // Vous pouvez également utiliser pendant (vrai) + Sleep // L'environnement de génération n'a pas de problème (1 // Connection Channel.Accept (this, new accepthandler ()); essayez {latch.await (); } catch (InterruptedException e) {e.printStackTrace (); }}} Accepthandler:
package com.anxpp.io.calculator.aio.server; import java.nio.bytebuffer; Importer java.nio.channels.asynchronoussocketchannel; Importer java.nio.channels.completionhandler; // Connectez-vous en tant que gestionnaire de classe publique ACCEPTHANDLER implémente CompletionHandler <asynchronoussocketchannel, AsyncServerHandler> {@Override public void terminé (AsynchronousSocketchannel Channel, AsyncServerHandler ServerHandler) {// Continuer d'accepter les demandes d'autres clients Server.ClientCount ++; System.out.println ("Nombre de clients connectés:" + Server.ClientCount); serverhandler.channel.accept (serverhandler, this); // Créer un nouveau tampon ByteBuffer Buffer = ByteBuffer.AllOcy (1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); }} ReadHandler:
package com.anxpp.io.calculator.aio.server; Importer java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (ioException e) {e.printStackTrace (); }}} OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2. Client side code
Client:
package com.anxpp.io.calculator.aio.client; import java.util.scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }} AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; Importer java.io.ioException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; port int privé; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (ioException e) {e.printStackTrace (); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (ioException e) {e.printStackTrace (); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (ioException e) {e.printStackTrace (); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); }} WriteHandler:
package com.anxpp.io.calculator.aio.client; Importer java.io.ioException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //Complete writing of all data if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //Read data ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("Data send failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; Importer java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("Client received result:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("Data read failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3. Test
Test:
package com.anxpp.io.calculator.aio; import java.util.scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }}我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.