Préface:
Récemment, lors de l'analyse du RPC de Hadoop (protocole d'appel de procédure à distance), un protocole qui demande des services à partir de programmes informatiques distants via le réseau sans comprendre la technologie de réseau sous-jacente. Vous pouvez vous référer à: http://baike.baidu.com/view/32726.htm), il a été constaté que la mise en œuvre du mécanisme RPC de Hadoop utilise principalement deux technologies: proxy dynamique (vous pouvez vous référer au blog: http://weixiaolu.iteye.com/blog/147774) et java nio. Afin d'analyser correctement le code source RPC de Hadoop, je pense qu'il est nécessaire d'étudier d'abord les principes et la mise en œuvre spécifique de Java Nio.
Dans ce blog, j'analyse principalement Java Nio à partir de deux directions
Table des matières:
un. La différence entre Java Nio et le blocage des E / S
1. Modèle de communication d'E / S de blocage
2. Principe Java Nio et modèle de communication 2. Implémentation du serveur Java Nio et du code client
Analyse spécifique:
un. La différence entre Java Nio et le blocage des E / S
1. Modèle de communication d'E / S de blocage
Si vous avez une certaine compréhension du blocage des E / S maintenant, nous savons que le blocage des E / S est bloqué lors de l'appel de la méthode InputStream.Read (), il attendra que les données arrivent (ou délais d'attente) avant de revenir; De même, lors de l'appel de la méthode SERVERSocket.Accept (), il bloquera jusqu'à ce qu'il y ait une connexion client avant de retourner. Une fois que chaque client se connecte, le serveur démarrera un thread pour traiter la demande du client. Le diagramme du modèle de communication du blocage des E / S est le suivant:
Si vous l'analysez soigneusement, vous constaterez certainement qu'il y a des inconvénients de blocage des E / S. Sur la base du modèle de communication d'E / S de blocage, j'ai résumé ses deux inconvénients:
1. Lorsqu'il existe de nombreux clients, un grand nombre de threads de traitement seront créés. Et chaque fil occupe de la pile et du temps de processeur
2. Le blocage peut conduire à une commutation de contexte fréquente, et la plupart des commutations de contexte peuvent être dénuées de sens.
Dans ce cas, les E / S non bloquantes ont ses perspectives d'application.
2. Principe et modèle de communication Java Nio
Java Nio a été lancé dans JDK1.4, et on peut dire qu'il s'agit à la fois de "nouvelles E / S" ou d'E / S non bloquantes. Voici comment fonctionne Java Nio:
1. Un fil dédié gère tous les événements IO et est responsable de la distribution.
2. Mécanisme axé sur les événements: déclenche lorsqu'un événement arrive, plutôt que de surveiller les événements simultanément.
3. Communication du thread: les threads communiquent via l'attente, les notifications et autres moyens. Assurez-vous que chaque commutateur de contexte est logique. Réduisez la commutation de filetage inutile.
Après avoir lu quelques informations, j'ai publié le schéma de travail de Java Nio que je comprends:
(Remarque: le flux de traitement de chaque thread est probablement des données de lecture, de décodage, de calcul du traitement, du codage et de l'envoi de réponses.)
Le serveur Java Nio n'a besoin que de démarrer un thread spécial pour gérer tous les événements IO. Comment ce modèle de communication est-il mis en œuvre? Haha, explorons son mystère ensemble. Java Nio utilise un canal bidirectionnel pour la transmission de données, plutôt qu'un flux unidirectionnel, et les événements d'intérêt peuvent être enregistrés sur le canal. Il y a quatre événements au total:
| Nom de l'événement | Valeur correspondante |
| Le serveur reçoit des événements de connexion client | Selectionkey.op_accept (16) |
| Événement de serveur de connexion client | Selectionkey.op_connect (8) |
| Lire les événements | Selectionkey.op_read (1) |
| Écrire des événements | Selectionkey.op_write (4) |
Le serveur et le client maintiennent chacun un objet qui gère un canal, que nous appelons un sélecteur, qui peut détecter des événements sur un ou plusieurs canaux. Prenons le serveur à titre d'exemple. Si un événement de lecture est enregistré sur le sélecteur du serveur, le client envoie des données au serveur à un moment donné. Lors du blocage des E / S, il appellera la méthode Read () pour bloquer les données, et le serveur NIO ajoutera un événement de lecture au sélecteur. Le thread de traitement du serveur interrogera pour accéder au sélecteur. Si un événement d'intérêt arrive lors de l'accès au sélecteur, il traitera ces événements. Si aucun événement d'intérêt n'arrive, le fil de traitement se bloque jusqu'à l'arrivée de l'événement d'intérêt. Vous trouverez ci-dessous un diagramme schématique du modèle de communication de Java Nio que je comprends:
deux. Implémentation du serveur Java Nio et du code client
Afin de mieux comprendre Java Nio, ce qui suit est une simple implémentation de code pour le serveur et le client.
Serveur:
package cn.nio; Importer java.io.ioException; Importer java.net.inetsocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; Importer java.nio.channels.serversocketchannel; import java.nio.channels.socketchannel; Importer java.util.iterator; / ** * Nio Server * @Author Small Path * / public class Nioserver {// Channel Manager Sélecteur privé Sélecteur; / ** * Obtenez un canal Serversocket et effectuez des travaux d'initialisation sur le canal * @param port belt port * @throws ioException * / public void initserver (int port) lance ioException {// Obtenez un canal Serversocket serversocketchannel serverChannel = serversocketchannel.open (); // Définissez le canal sur ServerChannel non bloquant.ConfigureBlocking (FALSE); // lier le SERVERSOCKET correspondant à ce canal à Port Port ServerChannel.Socket (). Bind (New IneTsocketAddress (port)); // Obtenez un manager de canal this.selector = selector.open (); // Ligne le gestionnaire de canal à la chaîne et enregistrez l'événement selectionkey.op_accept pour la chaîne. Après avoir enregistré l'événement, // lorsque l'événement arrivera, Selector.Select () reviendra. Si l'événement n'atteint pas Selector.Select () bloquera. ServerChannel.Register (Selector, SelectionKey.op_Accept); } / ** * Utilisez des sondages pour écouter s'il y a des événements sur le sélecteur qui doivent être traités. Si c'est le cas, il sera traité * @throws ioException * / @SuppressWarnings ("Unchecked") public void écouter () lève ioException {System.out.println ("Server-Side Start avec succès!"); // Pollage pour accéder au sélecteur tandis que (true) {// Lorsque l'événement enregistré arrive, la méthode renvoie; Sinon, la méthode continuera de bloquer sélecteur.select (); // Obtenez l'itérateur de l'élément sélectionné dans le sélecteur, et l'élément sélectionné est l'élément d'événement enregistré ITERATOR ITEM = this.selector.selectedKeys (). Iterator (); while (ite.hasnext ()) {SELECTION KEY = (SelectionKey) ite.next (); // Supprimer la touche sélectionnée pour empêcher le traitement répété de l'élément.Remove (); // Client demande l'événement de connexion if (key.isacceptable ()) {serversocketchannel server = (serversocketchannel) key .Channel (); // Obtenez le canal pour se connecter au client Socketchannel canal = server.accept (); // défini sur canal non bloquant.configureBlocking (false); // Vous pouvez envoyer des informations au client ici channel.write (bytebuffer.wrap (new String ("Envoyer un message au client"). GetBytes ())); // Une fois que la connexion avec le client a réussi, afin de recevoir les informations du client, vous devez définir les autorisations de lecture pour le canal. Channel.Register (this.selector, selectionKey.op_read); // Un événement lisible a été obtenu} else if (key.isreadable ()) {read (key); }}}} / ** * Traitement des événements qui lisent les messages envoyés par le client * @param key * @throws ioException * / public void read (SelectionKey key) lance ioException {// Le serveur peut lire les messages: obtenir le canal de socket où l'événement se produit. Socketchannel Channel = (socketchannel) key.channel (); // Créer un tampon de tampon de lecture ByteBuffer Buffer = ByteBuffer.Allocation (10); canal.read (tampon); octet [] data = buffer.array (); String msg = new String (data) .trim (); System.out.println ("Le serveur a reçu le message:" + msg); ByteBuffer OutBuffer = ByteBuffer.Wrap (msg.getBytes ()); channel.write (undbuffer); // renvoie le message au client} / ** * Démarrer le test du serveur * @throws ioException * / public static void main (String [] args) lance ioException {Nioserver Server = new Nioserver (); server.initServer (8000); server.Listen (); }} Client:
package cn.nio; Importer java.io.ioException; Importer java.net.inetsocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.socketchannel; Importer java.util.iterator; / ** * Nio Client * @Author Small Path * / public class nioclient {// Channel Manager Sélecteur privé Sélecteur; / ** * Obtenez un canal de socket et effectuez une certaine initialisation de la chaîne * @param ip L'IP du serveur connecté à * @param port le numéro de port du serveur connecté à * @Throws ioException * / public void initClient (String ip) lance ioException {// Obtenez un socket de canne de socket canketchenel (socketchannel.open (); // Définissez le canal sur canal non bloquant.configureBlocking (false); // Obtenez un manager de canal this.selector = selector.open (); // Le client se connecte au serveur. En fait, l'exécution de la méthode n'implémente pas la connexion. Vous devez appeler // utiliser channel.finishconnect (); Dans la méthode écouter () pour compléter le canal de connexion.Connect (new IneTSocketAddress (IP, port)); // Ligne le gestionnaire de canal à la chaîne et enregistrez l'événement selectionkey.op_connect pour la chaîne. Channel.Register (Selector, SelectionKey.op_Connect); } / ** * Utilisez des sondages pour écouter s'il y a des événements sur le sélecteur qui doivent être traités. Si c'est le cas, il sera traité * @throws ioException * / @SuppressWarnings ("Unchecked") public void écouter () lève ioException {// sondage pour accéder au sélecteur tandis que (true) {selector.select (); // Obtenez l'itérateur de l'élément sélectionné dans l'élément d'itérateur de sélecteur = this.selector.selectedKeys (). Iterator (); while (ite.hasnext ()) {SELECTIONKEY KEY = (SELECTIONKEY) item.next (); // Supprimer la touche sélectionnée pour empêcher le traitement répété de l'élément.Remove (); // L'événement de connexion se produit if (key.isconnectable ()) {socketchannel canal = (socketchannel) key .Channel (); // Si la connexion est connectée, complétez la connexion if (canal.isconnectionPending ()) {canal.FinishConnect (); } // défini sur canal non bloquant.configureBlocking (false); // Vous pouvez envoyer des informations sur le serveur canal.write (bytebuffer.wrap (new String ("Envoyer un message au serveur"). GetBytes ())); // Une fois que la connexion avec le serveur a réussi, afin de recevoir les informations du serveur, le canal doit être défini pour lire les autorisations. Channel.Register (this.selector, selectionKey.op_read); // Un événement lisible a été obtenu} else if (key.isreadable ()) {read (key); }}}} / ** * Traitement des événements qui lisent les messages envoyés par le serveur * @Param Key * @throws ioException * / public void read (SelectionKey Key) lance ioexception {// Identique à la méthode de lecture du serveur} / ** * Démarrer le test client * @throws ioException * / public static Void Main (String [] args) throws ioexception {nioClient Client = New Nioclient (); client.InitClient ("LocalHost", 8000); client.Listen (); }}résumé:
Enfin, l'analyse Dynamic Proxy et Java Nio est terminée. Haha, ce qui suit est d'analyser le code source du mécanisme RPC de Hadoop. L'adresse du blog est: http://weixiaolu.iteye.com/blog/1504898. Cependant, si vous avez des objections à votre compréhension de Java Nio, vous êtes invités à en discuter ensemble.
Si vous devez réimprimer, veuillez indiquer la source: http://weixiaolu.iteye.com/blog/1479656