Vorwort:
Kürzlich bei der Analyse von Hadoops RPC (Remote Procedure Call Protocol), ein Protokoll, das Dienste von Remote -Computerprogrammen über das Netzwerk anfordert, ohne die zugrunde liegende Netzwerktechnologie zu verstehen. Sie können sich unter: http://baike.baidu.com/view/32726.htm) beziehen. Es wurde festgestellt, dass die Implementierung des RPC -Mechanismus von Hadoop hauptsächlich zwei Technologien verwendet: dynamisches Proxy (Sie können sich auf den Blog beziehen: http://weixiaolyey.com/blog/147774) und JAVAILIA. Um den RPC -Quellcode von Hadoop korrekt zu analysieren, ist es denke, dass es notwendig ist, die Prinzipien und die spezifische Umsetzung von Java Nio zuerst zu untersuchen.
In diesem Blog analysiere ich Java Nio hauptsächlich aus zwei Richtungen
Inhaltsverzeichnis:
eins. Der Unterschied zwischen Java Nio und Blockierung i/o
1. Blockieren von E/A -Kommunikationsmodell
2. Java NIO -Prinzip und Kommunikationsmodell 2. Java NIO Server- und Client -Code -Implementierung
Spezifische Analyse:
eins. Der Unterschied zwischen Java Nio und Blockierung i/o
1. Blockieren von E/A -Kommunikationsmodell
Wenn Sie jetzt ein gewisses Verständnis für das Blockieren von I/O haben, wissen wir, dass die Blockierung der I/A beim Aufrufen der Methode "InputStream.read () blockiert wird, bis die Daten vor der Rückkehr eintrifft (oder Zeitüberschreitungen). In ähnlicher Weise blockiert es beim Aufrufen der Methode serversocket.accept (), bis es vor der Rückkehr eine Clientverbindung gibt. Nachdem sich jeder Client verbindet, startet der Server einen Thread, um die Anfrage des Clients zu verarbeiten. Das Kommunikationsmodelldiagramm des Blockierens von E/A lautet wie folgt:
Wenn Sie es sorgfältig analysieren, werden Sie auf jeden Fall feststellen, dass es einige Nachteile des Blockierens von i/o gibt. Basierend auf dem Blockierungs -I/A -Kommunikationsmodell habe ich seine beiden Nachteile zusammengefasst:
1. Wenn es viele Clients gibt, wird eine große Anzahl von Verarbeitungsfäden erstellt. Und jeder Faden nimmt Stapelraum und eine CPU -Zeit ein
2. Blockieren kann zu einem häufigen Kontextschalter führen, und die meisten Kontextschaltungen können bedeutungslos sein.
In diesem Fall hat nicht blockierende I/O ihre Anwendungsaussichten.
2. Java Nio -Prinzip und Kommunikationsmodell
Java Nio wurde in JDK1.4 begonnen, und es kann gesagt werden, dass es sowohl "neuer I/O" als auch nicht blockierender I/O ist. So funktioniert Java Nio:
1. Ein dedizierter Thread verarbeitet alle IO -Ereignisse und ist für die Verteilung verantwortlich.
2. Ereignisorientierter Mechanismus: Trigger, wenn ein Ereignis eintrifft, anstatt Ereignisse gleichzeitig zu überwachen.
3. Thread -Kommunikation: Threads kommunizieren durch Warten, Benachrichtigungen und andere Mittel. Stellen Sie sicher, dass jeder Kontextschalter Sinn macht. Reduzieren Sie unnötige Gewindeschaltung.
Nachdem ich einige Informationen gelesen hatte, habe ich das schematische Diagramm von Java Nio gepostet, das ich verstehe:
(Hinweis: Der Verarbeitungsfluss jedes Threads liest wahrscheinlich Daten, Dekodierung, Computerverarbeitung, Codierung und Senden von Antworten.)
Der Java Nio -Server muss nur einen speziellen Thread starten, um alle IO -Ereignisse zu verarbeiten. Wie wird dieses Kommunikationsmodell implementiert? Haha, lass uns gemeinsam sein Geheimnis erforschen. Java Nio verwendet einen Zwei-Wege-Kanal für die Datenübertragung und nicht für einen Einwegstrom, und Ereignisse von Interesse können auf dem Kanal registriert werden. Insgesamt gibt es vier Ereignisse:
| Ereignisname | Entsprechender Wert |
| Der Server empfängt Clientverbindungsereignisse | SelectionKey.op_accept (16) |
| Clientverbindungsserverereignis | SelectionKey.op_connect (8) |
| Ereignisse lesen | SelectionKey.op_read (1) |
| Schreiben Sie Ereignisse | SelectionKey.op_write (4) |
Der Server und der Client verwalten jeweils ein Objekt, das einen Kanal verwaltet, den wir einen Selektor nennen, mit dem Ereignisse auf einem oder mehreren Kanälen erfasst werden können. Nehmen wir den Server als Beispiel. Wenn ein Leseereignis auf dem Selektor des Servers registriert ist, sendet der Client irgendwann einige Daten an den Server. Beim Blockieren von E/O wird die Read () -Methode aufgerufen, um die Daten zu blockieren, und der NIO -Server fügt dem Selektor ein Read -Ereignis hinzu. Der Verarbeitungs -Thread des Servers wird zum Zugriff auf den Selektor gefragt. Wenn ein Interesseereignis beim Zugriff auf den Selektor ankommt, wird diese Ereignisse verarbeitet. Wenn kein Interesseereignis ankommt, blockiert der Verarbeitungs -Thread, bis das Interesseereignis eintrifft. Im Folgenden finden Sie ein schematisches Diagramm des Kommunikationsmodells von Java nio, das ich verstehe:
zwei. Java NIO Server- und Client -Code -Implementierung
Um Java Nio besser zu verstehen, ist Folgendes eine einfache Code -Implementierung für Server und Client.
Server:
Paket cn.nio; importieren java.io.ioException; importieren java.net.inetsocketaddress; Import Java.nio.ByTebuffer; Import Java.nio.Channels.SelectionKey; Import Java.nio.Channels.Selector; import Java.nio.channels.serversocketchannel; import Java.nio.channels.socketchannel; Import Java.util.iterator; /*** NIO Server* @Author Small Path*/Public Class NioServer {// Channel Manager Private Selector Selector; / ** * einen ServerSocket -Kanal erhalten und eine Initialisierungsarbeit auf dem Kanal * @Param Port gebundene Port * @throws ioException */ public void InitServer (int port) löscht ioException {// einen ServerSocket Channel ServerSocketAntel Serverchannel = ServerSocketAndannel (); // Setzen Sie den Kanal auf nicht blockierende serverchannel.configureBlocking (false); // Binden Sie das ServerSocket, das diesem Kanal entspricht, an die Port -Port -Serverchannel.socket (). BIND (New InetSocketaddress (Port)); // einen Channel Manager this.selector = selector.open () erhalten; // Binden Sie den Kanalmanager an den Kanal und registrieren Sie das Ereignis von SelectionKey.op_accept für den Kanal. Nach der Registrierung des Ereignisses kehrt Selector.Select () zurück, wenn das Ereignis ankommt. Wenn das Ereignis nicht Selector erreicht. Select () blockiert. serverchannel.register (selector, selectionKey.op_accept); } /*** Verwenden Sie die Umfrage, um zuzuhören, ob es Ereignisse auf dem Selektor gibt, die verarbeitet werden müssen. Wenn ja, wird es verarbeitet * @throws ioException */ @Suppresswarnings ("Deaktiviert") public void hören () löst ioException {System.out.println ("Server-Seite erfolgreich!"). // Umfragen zum Zugriff auf Selector (true) {// Wenn das registrierte Ereignis ankommt, gibt die Methode zurück; Andernfalls blockiert die Methode den Selector.Select (); // Holen Sie sich den Iterator des ausgewählten Elements im Selektor, und der ausgewählte Element ist der registrierte Ereignis -Iterator item = this.selector.SelectedKeys (). Iterator (); while (ite.hasnext ()) {SelectionKey key = (selectionKey) ite.next (); // den ausgewählten Schlüssel löschen, um eine wiederholte Verarbeitung von item.remove () zu verhindern; // Client fordert das Verbindungsereignis an, wenn (key.isocceptable ()) {ServerSocketchannel Server = (ServerSocketchannel). CHANNEL (); // den Kanal erhalten, um eine Verbindung zum Client -SocketAntel Channel = server.accept () herzustellen; // auf nicht blockierende Kanal einstellen. // Sie können Informationen hier Kanal.Write (byteBuffer.wrap (neuer String ("String (" eine Nachricht an den Client senden "). GetByTes ()) senden. // Nachdem die Verbindung mit dem Kunden erfolgreich ist, müssen Sie die Leseberechtigungen für den Kanal festlegen, um die Informationen des Kunden zu erhalten. Channel.register (this.Selector, SelectionKey.op_read); // Ein lesbares Ereignis wurde erhalten} else if (key.isreadable ()) {read (key); }}}} / ** * Verarbeitung von Ereignissen mit dem Lesen von Nachrichten, die vom Client gesendet wurden Socketchannel Channel = (Socketchannel) key.channel (); // Erstellen Sie einen Lesepuffer -ByteBuffer Buffer = byteBuffer.alcode (10); Channel.Read (Puffer); byte [] data = buffer.Array (); String msg = new String (Daten) .trim (); System.out.println ("Der Server empfielt die Nachricht:"+msg); ByteBuffer outbuffer = bytebuffer.wrap (msg.getBytes ()); Channel.Write (Outbuffer); // Senden Sie die Nachricht an den Client}/ *** Start Server -Test* @throws ioException*/ public static void main (String [] args) löst IOException {NioServer server = new NioServer () aus; server.initServer (8000); server.Listen (); }} Kunde:
Paket cn.nio; importieren java.io.ioException; importieren java.net.inetsocketaddress; Import Java.nio.ByTebuffer; Import Java.nio.Channels.SelectionKey; Import Java.nio.Channels.Selector; import Java.nio.channels.socketchannel; Import Java.util.iterator; /*** NIO Client* @Author Small Path*/Public Class Nioclient {// Channel Manager Private Selector Selector; / ** * Erhalten Sie einen Socket -Kanal und führen Sie die Initialisierung des Kanals ab * @param IP Die IP des Servers, das mit * @param Port angeschlossen ist. Die Portnummer des Servers, das mit * @Throws ioException */ public void initclient (String ip, int port) angeschlossen ist, löst IoException {// einen Socket Channel -SocketAntel -Channel = socketchannel.open (). // Setzen Sie den Kanal auf nicht blockierende Kanal.ConfigureBlocking (false); // einen Channel Manager this.selector = selector.open () erhalten; // Der Client stellt eine Verbindung zum Server her. Tatsächlich implementiert die Methodenausführung die Verbindung nicht. Sie müssen anrufen // Channel.finishConnect () verwenden; in der Methode hören () zum Vervollständigen des Verbindungskanals.Connect (New InetSocketaddress (IP, Port)); // Binden Sie den Channel -Manager an den Kanal und registrieren Sie das Ereignis selectionKey.op_connect für den Kanal. Channel.register (Selector, SelectionKey.op_connect); } /*** Verwenden Sie die Umfrage, um zuzuhören, ob es Ereignisse auf dem Selektor gibt, die verarbeitet werden müssen. In diesem Fall wird es verarbeitet * @throws ioException */ @Suppresswarnings ("Deaktiviert") public void hören () löst ioException {// ab, um auf den Selektor zuzugreifen, während (true) {selector.select (); // den Iterator für das ausgewählte Element im Selector Iterator item = this.selector.selectedKeys (). Iterator () erhalten; while (ite.hasnext ()) {SelectionKey key = (SelectionKey) item.Next (); // den ausgewählten Schlüssel löschen, um eine wiederholte Verarbeitung von item.remove () zu verhindern; // Das Verbindungsereignis erfolgt auf (key.isconnectable ()) {Socketchannel Channel = (Socketchannel) .Channel (); // Wenn eine Verbindung hergestellt wird, vervollständigen Sie die Verbindung, wenn (Channel.isconnectionPending ()) {Channel.finishConnect (); } // auf nicht blockierende Kanal einstellen.ConfigureBlocking (false); // Sie können Informationen an den Server -Kanal senden. // Nachdem die Verbindung mit dem Server erfolgreich ist, muss der Kanal für die Leseberechtigungen festgelegt werden, um die Serverinformationen zu erhalten. Channel.register (this.Selector, SelectionKey.op_read); // Ein lesbares Ereignis wurde erhalten} else if (key.isreadable ()) {read (key); } } } } /** * Processing events that read messages sent by the server* @param key * @throws IOException */ public void read(SelectionKey key) throws IOException{ //Same as the server's read method} /** * Start client test* @throws IOException */ public static void main(String[] args) throws IOException { NIOClient client = new Nioclient (); Client.initclient ("Localhost", 8000); Client.Listen (); }}Zusammenfassung:
Schließlich wird die dynamische Proxy- und Java -Nio -Analyse abgeschlossen. Haha, Folgendes ist die Analyse des Quellcode des RPC -Mechanismus von Hadoop. Die Blog -Adresse lautet: http://weixiaolu.iteye.com/blog/1504898. Wenn Sie jedoch Einwände gegen Ihr Verständnis von Java Nio haben, können Sie gerne gemeinsam besprechen.
Wenn Sie nachdrucken müssen, geben Sie bitte die Quelle an: http://weixiaolu.iteye.com/blog/1479656