Dieser Artikel wird von flach bis tief von der traditionellen Biografie über NIO bis AIO eingeführt und wird von einer vollständigen Code -Erklärung begleitet.
Im folgenden Code wird ein Beispiel verwendet: Der Client sendet eine Zeichenfolge der Gleichung an den Server, und der Server gibt das Ergebnis nach der Berechnung an den Client zurück.
Alle Anweisungen für den Code werden direkt als Kommentare verwendet und in den Code eingebettet, was beim Lesen des Codes einfacher zu verstehen ist. Eine Werkzeugklasse zur Berechnung des Ergebnisses wird im Code verwendet, siehe den Code -Abschnitt des Artikels.
Empfohlene Artikel für verwandte Grundkenntnisse:
Einführung in das Linux -Netzwerk -E/A -Modell (Bilder und Text)
Java-Parallelität (Multi-Threading)
1. Bioprogrammierung
1.1. Traditionelle Bio -Programmierung
Das Grundmodell der Netzwerkprogrammierung ist das C/S -Modell, dh die Kommunikation zwischen zwei Prozessen.
Der Server bietet IP- und Höranschlüsse. Der Client initiiert eine Verbindungsanforderung über die Verbindungsoperationsadresse, die der Server anhören möchte. Wenn die Verbindung erfolgreich hergestellt wird, können beide Parteien durch Sockets kommunizieren, wenn die Verbindung erfolgreich hergestellt wird.
Bei der Entwicklung des traditionellen Synchronisationsblockierungsmodells ist ServerSocket für die Bindung von IP -Adressen und das Starten von Höranschlüssen verantwortlich. Socket ist für die Initiierung von Verbindungsoperationen verantwortlich. Nachdem die Verbindung erfolgreich ist, führen beide Parteien eine synchrone Blockierungskommunikation über die Eingangs- und Ausgangsströme durch.
Eine kurze Beschreibung des BIO -Server -Kommunikationsmodells: Der Server, der das Biokommunikationsmodell verwendet, ist normalerweise ein unabhängiger Akzeptor -Thread, der für das Anhören der Verbindung des Clients verantwortlich ist. Nach Erhalt der Client -Verbindungsanforderung erstellt er einen neuen Thread für jeden Client für die Verarbeitung von Link und verarbeitet ihn nicht und gibt die Antwort dann über den Ausgabestream an den Client zurück, und der Thread wird zerstört. Das heißt, ein typisches Modell mit einem Umgang mit dem gesamten Nächten.
Traditionelles Bio -Kommunikationsmodelldiagramm:
Das größte Problem bei diesem Modell ist, dass es an elastischen Skalierungsfunktionen fehlen. Wenn die Anzahl der gleichzeitigen Zugriffe auf den Client zunimmt, ist die Anzahl der Threads auf dem Server proportional zur Anzahl der gleichzeitigen Zugriffe auf dem Client. Themen in Java sind auch relativ wertvolle Systemressourcen. Nachdem die Anzahl der Threads schnell erweitert wird, sinkt die Leistung des Systems stark. Wenn die Anzahl der Zugriffe weiter zunimmt, wird das System schließlich sterben.
Serverquellcode erstellt von Synchronous Blocking I/O:
Paket com.Anxpp.io.calculator.bio; importieren java.io.ioException; importieren java.net.serversocket; importieren java.net.socket; /** * Bio -Server -Quellcode * @Author YangtaO__Anxpp.com * @Version 1.0 */Public Final Class ServerVernormal {// Standard -Portnummer private statische int default_port = 12345; // Singleton ServerSocket Private Static ServerSocket Server; // Setzen Sie den Höranschluss gemäß den eingehenden Parametern. Wenn es keine Parameter gibt, rufen Sie die folgende Methode auf und verwenden Sie den Standardwert Public Static void start () löscht IOException {// den Standardwert Start (default_port) aus; } // Auf diese Methode wird nicht auf eine große Anzahl von gleichzeitigen Arten zugegriffen, und es besteht keine Notwendigkeit, die Effizienz zu berücksichtigen. Synchronisieren Sie einfach die Methode direkt öffentlich synchronisiertes statisches Void Start (int Port) löst IoException {if (Server! = NULL) zurück; Versuchen Sie {// ServerSocket über den Konstruktor // Wenn der Port legal und untätig ist, hört der Server erfolgreich an. Server = New ServerSocket (Port); System.out.println ("Der Server wurde gestartet, Portnummer:" + Port); // Client -Verbindungen über drahtlose Schleife anhören // Wenn kein Client -Zugriff vorhanden ist, wird der Annahmeoperation blockiert. while (true) {Socket Socket = server.accept (); // Wenn ein neuer Client -Zugriff vorhanden ist, wird der folgende Code ausgeführt // Dann erstellen Sie einen neuen Thread, um diesen Socket -Link neuen Thread (neuer ServerHandler (Socket)) zu verarbeiten. Start (); }} endlich {// einige notwendige Reinigungsarbeiten if (Server! = NULL) {System.out.println ("Der Server ist geschlossen."); server.close (); server = null; }}}} Client Message Processing Thread Serverhandler Quellcode:
Paket com.Anxpp.io.calculator.bio; Import Java.io.BufferedReader; importieren java.io.ioException; importieren java.io.inputStreamReader; Import Java.io.printwriter; importieren java.net.socket; import com.anxpp.io.utils.calculator; / *** Client -Thread* @author yangtao__anxpp.com* Socket -Link für einen Client*/ öffentliche Klasse ServerHandler implementiert Runnable {private Socket Socket; public ServerHandler (Socket Socket) {this.socket = Socket; } @Override public void run () {bufferedReader in = null; Printwriter out = null; try {in = new bufferedReader (neuer InputStreamReader (socket.getInputStream ())); out = neuer Printwriter (Socket.GetOutputStream (), True); String -Ausdruck; String -Ergebnis; Während (true) {// eine Zeile durch BufferedReader lesen // Wenn Sie den Schwanz des Eingabestreams gelesen haben, geben Sie Null zurück und beenden Sie die Schleife //, wenn Sie einen Nicht-Null-Wert erhalten, versuchen Sie, das Ergebnis zu berechnen und zurückzugeben, wenn ((Ausdruck = in.Readline ()) == NULL) Break; System.out.println ("Der Server hat eine Nachricht erhalten:" + Ausdruck); try {result = rechner.cal (Expression) .ToString (); } catch (Exception e) {result = "rechner.cal (Expression) .ToString ();} catch (exception e) {e.printstacktrace ();} schließlich {// einige notwendige Aufräumarbeiten if (in! = null) {try {in null) {out.close (); Client -Quellcode erstellt von Synchronous Blocking I/O:
Paket com.Anxpp.io.calculator.bio; Import Java.io.BufferedReader; importieren java.io.ioException; importieren java.io.inputStreamReader; Import Java.io.printwriter; importieren java.net.socket; /** * Client erstellt durch Blockieren von i/o * @author yangtao__anxpp.com * @VERSION 1.0 */public class Client {// Standard -Portnummer private statische int default_server_port = 12345; private statische String default_server_ip = "127.0.0.1"; public static void send (String Ausdruck) {send (default_server_port, Ausdruck); } public static void send (int port, String -Expression) {System.out.println ("Arithmetische Ausdruck ist:" + Ausdruck); Socket Socket = Null; BufferedReader in = null; Printwriter out = null; try {socket = new Socket (default_server_ip, port); in = neuer BufferedReader (neuer InputStreamReader (Socket.getInputStream ())); out = neuer Printwriter (Socket.GetOutputStream (), True); out.println (Ausdruck); System.out.println ("___ Das Ergebnis ist:" + in.readline ()); } catch (Ausnahme e) {e.printstacktrace (); } endlich {// sind die notwendige Reinigungsarbeit, wenn (in! = null) {try {in.close (); } catch (ioException e) {e.printstacktrace (); } in = null; } if (out! = null) {out.close (); out = null; } if (socket! = null) {try {socket.close (); } catch (ioException e) {e.printstacktrace (); } socket = null; }}}} Testen Sie den Code, um die Anzeige der Ausgabeergebnisse in der Konsole zu erleichtern, und geben Sie ihn in das gleiche Programm (JVM) ein, um auszuführen:
Paket com.Anxpp.io.calculator.bio; importieren java.io.ioException; import Java.util.random; /** * Testmethode * @Author yangtao__anxpp.com * @Version 1.0 */public class test {// Testen Sie die Hauptmethode public static void main (string [] args) löst die InterruptedException aus {// den Server -New Thread (New Runnable () {@override public void run () {try {try {) {server {server betbet) aus. e.printstacktrace ();}}}). start (); // Vermeiden Sie den Client, der den Code ausführt, bevor der Server startet. // Die Client-Zeichen-Operatoren ausführen [] = {'+', '-', '*', '/'}; Randal random = new random (system.currentTimemillis ()); neuer Thread (neuer Runnable () {@Suppresswarnings ("static-access") @Override public void run () {while (true) {// Random erzeugt arithmetische Ausdrucksstring-Ausdruck = random.Nextint (10)+""+Operatoren [Random.Nextinta (4)]+(Random.NEXT (10). Thread.CurrentThread (). } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Faden. CurrentThread (). Sleep (random.Nextint (1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { Faden. CurrentThread (). Sleep (random.Nextint (1000)); }}}}}}}} Die Ergebnisse eines der Läufe:
Der Server wurde gestartet, Portnummer: 12345 Arithmetisches Ausdruck ist: 4-2 Server Empfangen Sie die Nachricht: 4-2 ___ Ergebnis ist: 2 Arithmetische Ausdruck ist: 5-10 Server Empfangen Sie die Meldung: 5-10__ Ergebnis ist: -5 Arithmetische Ausdruck: 0-9 Server Empfangen Sie die Nachricht: 0-9__ Ergebnis: -9 Arithhmetik Ausdruck: 0+6 Server. Empfangen Sie die Nachricht: 1/6__ Ergebnis ist: 0,16666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666
Aus dem obigen Code ist es leicht zu erkennen, dass das Hauptproblem der Biografie darin besteht, dass der Server bei einem neuen Client den Zugriff anfordert, der Server einen neuen Thread erstellen muss, um diesen Link zu verarbeiten, der nicht in Szenarien angewendet werden kann, in denen eine hohe Leistung und eine hohe Parallelität erforderlich sind (eine große Anzahl neuer Threads beeinflussen die Serverleistung und sogar einen Streik ernsthaft).
1.2. Pseudo-asynchrones E/A-Programmieren
Um dieses Modell mit einem Konnektion zu verbessern, können wir einen Thread-Pool verwenden, um diese Threads zu verwalten (für weitere Informationen finden Sie in dem zuvor angegebenen Artikel ein Modell für ein oder mehrere Threads, um N-Clients zu verarbeiten (aber die zugrunde liegende Ebene verwendet immer noch ein synchrones Blockierungs-I/O).
Pseudo-asynchrones E/A-Modelldiagramm:
Die Implementierung ist sehr einfach. Wir müssen nur den neuen Thread übergeben, um den Thread -Pool -Management zu erzielen, und gerade jetzt den Servercode ändern:
Paket com.Anxpp.io.calculator.bio; importieren java.io.ioException; importieren java.net.serversocket; importieren java.net.socket; import Java.util.concurrent.executorService; import Java.util.concurrent.executors; /** * Bio-Server-Quellcode__Pseudo-asynchronous i/o * @Author yangta __anxpp.com * @Version 1.0 */public Final Class Serverbetter {// Standard-Portnummer private statische statische int default_port = 12345; // Singleton ServerSocket Private Static ServerSocket Server; // Singleton Private Static ExecutorService Executorservice = Executors.NewFixedThreadpool (60); // Setzen Sie den Höranschluss gemäß den eingehenden Parametern. Wenn es keinen Parameter gibt, rufen Sie die folgende Methode auf und verwenden Sie den Standardwert Public Static void start () löscht IOException {// den Standardwert Start (default_port) aus; } // auf diese Methode wird nicht in einer großen Anzahl von gleichzeitig zugegriffen, und es besteht keine Notwendigkeit, die Effizienz zu berücksichtigen. Synchronisieren Sie die Methode einfach die öffentliche synchronisierte statische Void Start (int Port) löst IOException {if (server! = Null) zurück; Versuchen Sie {// ServerSocket über den Konstruktor // Wenn der Port legal und untätig ist, hört der Server erfolgreich an. Server = New ServerSocket (Port); System.out.println ("Der Server wurde gestartet, Portnummer:" + Port); // Die Clientverbindung über drahtlose Schleife überwachen // Wenn kein Client -Zugriff vorhanden ist, wird sie für den Akzeptanzvorgang blockiert. while (true) {Socket Socket = server.accept (); // Wenn ein neuer Client -Zugriff vorhanden ist, wird der folgende Code ausgeführt // Dann erstellen Sie einen neuen Thread, um den Socket -Link -ExecutorService.execute (New ServerHandler (Socket)) zu verarbeiten. }} endlich {// einige notwendige Reinigungsarbeiten if (Server! = NULL) {System.out.println ("Der Server ist geschlossen."); server.close (); server = null; }}}} Die Testläufergebnisse sind gleich.
Wir wissen, dass, wenn wir CachedThreadpool -Thread -Pool verwenden (nicht die Anzahl der Threads einschränken, wenn nicht klar, bitte siehe Artikel, die zu Beginn des Artikels angegeben sind). Tatsächlich sieht es auch wie ein 1: 1 -Client: Thread Count -Modell aus. Unter Verwendung von BesteThreadpool steuern wir die maximale Anzahl von Threads effektiv, stellen die Kontrolle begrenzter Ressourcen des Systems sicher und implementieren das N: M-Pseudo-asynchronische E/A-Modell.
Da jedoch die Anzahl der Threads begrenzt ist, können bei einer großen Anzahl von gleichzeitigen Anforderungen die maximalen Zahl nur warten, bis sich kostenlose Threads im Thread -Pool wiederverwendet werden können. Wenn der Socket -Eingangsstrom gelesen wird, wird er blockiert, bis er auftritt:
Wenn das Lesen von Daten langsam (z. B. eine große Datenmenge, langsame Netzwerkübertragung usw.) und große Mengen an Parallelität ist, können daher auch andere Zugriffsnachrichten ständig gewartet werden, was der größte Nachteil ist.
Die NIO, die später eingeführt wird, kann dieses Problem lösen.
2. NIO -Programmierung
Die New Java -E/A -Bibliothek wird in das Java.nio eingeführt.* Paket in JDK 1.4 mit zunehmender Geschwindigkeit. Tatsächlich wurde das "alte" I/A -Paket mit NIO neu implementiert, und wir können davon profitieren, auch wenn wir NiO -Programmierung nicht explizit verwenden. Geschwindigkeitsverbesserungen können sowohl in der Datei -E/A- als auch in der Netzwerk -E/A auftreten. In diesem Artikel wird jedoch nur in letzterem erörtert.
2.1. Einführung
Wir betrachten NIO im Allgemeinen als neue E/O (auch den offiziellen Namen), da es in der alten E/A -Bibliothek neu ist (tatsächlich wurde sie in JDK 1.4 eingeführt, aber dieses Substantiv wird weiterhin lange verwendet, auch wenn sie "alt" sind, und es erinnert uns auch daran, dass wir es bei der Benennung von großer Bedeutung haben müssen, und haben sie auch vorgeschrieben, und haben große Änderungen vorgenommen. Es wird jedoch von vielen Menschen, dh nicht blockierende E/O, als nicht blockierter E/O bezeichnet, da dies genannt wird, es kann seine Eigenschaften besser widerspiegeln. Der NIO im folgenden Text bezieht sich nicht auf die gesamte neue E/A -Bibliothek, blockiert jedoch nicht E/O.
NIO bietet zwei verschiedene Socket -Kanalimplementierungen: Socketchannel und ServerSocketchannel, die im herkömmlichen Bio -Modell Socket und ServerSocket entsprechen.
Beide neu hinzugefügten Kanäle unterstützen Blockier- und nicht blockierende Modi.
Die Verwendung des Blockierungsmodus ist so einfach wie die traditionelle Unterstützung, aber die Leistung und Zuverlässigkeit ist nicht gut. Der nicht blockierende Modus ist genau das Gegenteil.
Bei niedrigen Anwendungen mit niedriger Konsequenz, synchroner Blockierungs-E/A können Anwendungen zur Verbesserung der Entwicklungsrate und einer besseren Wartung verwendet werden. Bei hochladigen, hochverwöhnlichen Anwendungen (Netzwerken) sollte der nicht blockierende NIO-Modus zur Entwicklung verwendet werden.
Das Grundwissen wird zuerst unten eingeführt.
2.2. Pufferpuffer
Ein Puffer ist ein Objekt, das einige Daten enthält, die geschrieben oder ausgelesen werden sollen.
In der NIO -Bibliothek werden alle Daten in einem Puffer verarbeitet. Beim Lesen von Daten wird es direkt in den Puffer gelesen. Beim Schreiben von Daten wird es auch in den Puffer geschrieben. Wenn Sie in NIO auf Daten zugreifen, wird sie über einen Puffer betrieben.
Ein Puffer ist tatsächlich ein Array und bietet Informationen wie strukturierten Zugriff auf Daten und die Wartung von Lese- und Schreiborten.
Die spezifischen Cache -Bereiche sind: Bytebuffe, Charbuffer, Shortbuffer, Intbuffer, Longbuffer, FloatBuffer, DoubleBuffer. Sie implementieren dieselbe Schnittstelle: Puffer.
2.3. Kanal
Unser Lesen und Schreiben von Daten muss durch den Kanal geleitet werden, der wie ein Wasserrohr ist, ein Kanal. Der Unterschied zwischen einem Kanal und einem Stream besteht darin, dass der Kanal bidirektional ist und zum Lesen, Schreiben und gleichzeitigen Lesen und Schreiben verwendet werden kann.
Die Kanäle des zugrunde liegenden Betriebssystems sind im Allgemeinen Vollduplex, sodass ein Full-Duplex-Kanal die API des zugrunde liegenden Betriebssystems besser abbilden kann als ein Stream.
Kanäle sind hauptsächlich in zwei Kategorien unterteilt:
Die ServerSocketchannel und Socketchannel, die in den folgenden Code beteiligt sind, sind beide Unterklassen von SelectableChannel.
2.4. Multiplexer -Selektor
Der Selektor ist die Grundlage für die Java -Nio -Programmierung.
Der Selector bietet die Möglichkeit, Bereitschaftsaufgaben auszuwählen: Der Selektor stellt den darauf registrierten Kanal ständig ab. Wenn ein Lese- oder Schreibereignis auf einem Kanal stattfindet, befindet sich der Kanal im rede Zustand und wird vom Selektor befragt. Anschließend kann der Satz Bereitschaftskanäle über den SelectionKey erhalten werden, um nachfolgende E/A -Operationen auszuführen.
Ein Selektor kann mehrere Kanäle gleichzeitig befragen, da der JDK EPOLL () anstelle der herkömmlichen Auswahlimplementierung verwendet. Es gibt keine Grenze für den maximalen Verbindungsgriff 1024/2048. Daher muss nur ein Thread für die Wahl des Selektors verantwortlich sein, und er kann auf Tausende von Kunden zugreifen.
2.5. NIO -Server
Der Code scheint viel komplizierter zu sein als herkömmliche Socket -Programmierung.
Fügen Sie einfach den Code ein und geben Sie die Codebeschreibung in Form von Kommentaren an.
Serverquellcode von NIO erstellt:
Paket com.Anxpp.io.calculator.nio; öffentlicher Klassenserver {private static int default_port = 12345; private static serverHandle serverHandle; public static void start () {start (default_port); } public static synchronisierte void start (int port) {if (serverhandle! = null) serverHandle.stop (); ServerHandle = New ServerHandle (Port); neuer Thread (ServerHandle, "Server"). start (); } public static void main (String [] args) {start (); }} Serverhandle:
Paket com.Anxpp.io.calculator.nio; importieren java.io.ioException; importieren java.net.inetsocketaddress; Import Java.nio.ByTebuffer; Import Java.nio.Channels.SelectionKey; Import Java.nio.Channels.Selector; import Java.nio.channels.serversocketchannel; import Java.nio.channels.socketchannel; Import Java.util.iterator; Java.util.set importieren; import com.anxpp.io.utils.calculator; / ** * NIO Server * @Author yangtao__anxpp.com * @Version 1.0 */ public class ServerHandle implements Runnable {private selektor Selector; private serversSocketchannel serverchannel; Der private volatile Boolean begann; /*** Konstruktor* @param Port Geben Sie die Portnummer an*/public serverhandle (int port) {try {// selector = selector.open (); // Öffnen Sie die Hörkanal -Serverchannel = ServerSocketchannel.open (); // Wenn wahr, wird dieser Kanal im Blockierungsmodus platziert. Wenn dies falsch ist, wird dieser Kanal in nicht blockierende Modus serverchannel.ConfigureBlocking (false) platziert. // Nicht-Blocking-Modus aktivieren // Bindungs-Port-Rückstand ist auf 1024 serverchannel.socket (). Bind (New InetSocketaddress (Port), 1024) eingestellt; // Client -Verbindungsanforderung serverchannel.register überwachen (Selector, SelectionKey.op_accept); // markieren Der Server ist aktiviert gestartet = true; System.out.println ("Server wurde gestartet, Portnummer:" + Port); } catch (ioException e) {e.printstacktrace (); System.exit (1); }} public void stop () {start = false; } @Override public void run () {// Schleifen Sie durch den Selektor, während (gestartet) {try {// Ob ein Lese- und Schreibereignis vorhanden ist, der Selektor wird auf jeden 1S -Selektor aufgewacht. Select (1000); // Blockierung wird nur so weitergehen, wenn mindestens ein registriertes Ereignis stattfindet. // selector.select (); Set <SelectionKey> keys = selector.selectedKeys (); Iterator <SelectionKey> iT = Keys.Iderator (); SelectionKey Key = null; while (it.hasnext ()) {key = it.next (); it.remove (); Versuchen Sie {HandleInput (Schlüssel); } catch (Ausnahme e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}}} catch (throwable t) {t.printstacktrace (); }} // Nach der Schließung des Selektors werden die verwalteten Ressourcen automatisch veröffentlicht, wenn (selektor! = Null) try {selector.close (); } catch (Ausnahme e) {e.printstacktrace (); }} private void handleInput (SelectionKey -Schlüssel) löscht IOException {if (key.isvalid ()) {// Die Anforderungsnachricht für einen neuen Zugriff if (key.isacceptable ()) {ServerSocketAntel SSC = (ServerSocketchannel) key.channel () verarbeiten; // Erstellen einer Socketchannel-Instanz über die Akzeptanz von ServerSocketchannel // Durch Abschluss dieser Operation wird das TCP-Drei-Wege-Handshake abgeschlossen, und der physische Link der TCP wird offiziell festgelegt. Socketchannel sc = ssc.accept (); // auf nicht blockierende sc.configureBlocking (false) einstellen; // Registrieren Sie sich als Lesen sc.register (selector, selectionKey.op_read); } // Nachricht lesen if (key.isreadable ()) {Socketchannel sc = (socketchannel) key.channel (); // ByteBuffer erstellen und einen 1m Buffer ByteBuffer Buffer = bytebuffer.alcode (1024) eröffnen; // Anforderungsstrom lesen und die Anzahl der Bytes zurückgeben, die int bytes = sc.Read (Puffer) gelesen werden; // Bytes lesen und die Bytes codieren, wenn (readBytes> 0) {// die aktuelle Grenze des Puffers auf Position = 0 für nachfolgende Lesevorgänge von buffer.flip () festlegen; // Erstellen Sie ein Byte -Array basierend auf der Anzahl der lesbaren Bytes -Bytes = New Byte [buffer.Remaining ()]; // Kopieren Sie das buffer -lesbare Byte -Array in das neu erstellte Array Buffer.get (Bytes); String Expression = new String (Bytes, "UTF-8"); System.out.println ("Server empfangen eine Nachricht:" + Ausdruck); // Datenstring -Verarbeitungsstring ergebnis = null; try {result = rechner.cal (Expression) .ToString (); } catch (Ausnahme e) {result = "Berechnungsfehler:" + e.getMessage (); } // die Antwortmeldung Dowrite (sc, Ergebnis) senden; } // keine Bytes lesen und ignorieren // else if (readBytes == 0); // Der Link wurde geschlossen, wobei die Ressource sonst befreien wird, wenn (readBytes <0) {key.cancel (); sc.close (); }}}}} // Die Antwortnachricht asynchron private void dowrite (Socketchannel -Kanal, String -Antwort) löscht IOException {// die Nachricht als Byte -Array -Byte [] bytes = response.getBytes () aus. // bytebuffer erstellen gemäß der Array -Kapazität byteBuffer writeBuffer = bytebuffer.AlleLate (bytes.length); // das Byte -Array in den Puffer writeBuffer.put (Bytes) kopieren; // Flip -Operation writeBuffer.flip (); // das Byte -Array von Buffer Channel.write (writeBuffer) senden; // ***** Der Code für die Verarbeitung von "Halbpaket schreiben" ist hier nicht enthalten}}}Wie Sie sehen können, sind die Hauptschritte zum Erstellen eines NIO -Servers wie folgt:
Da die Antwortmeldung gesendet wird, ist Socketchannel auch asynchron und nicht blockiert. Es kann also nicht garantiert werden, dass die Daten, die gesendet werden müssen, gleichzeitig gesendet werden können, und es wird ein Problem beim Schreiben eines halben Pakets zu diesem Zeitpunkt geben. Wir müssen einen Schreibvorgang registrieren, den Selektor ständig befragen, um die nicht bereitgestellten Nachrichten zu senden, und dann die HasRemain () -Methode des Puffers verwenden, um festzustellen, ob die Nachricht gesendet wird.
2.6. NIO -Client
Es ist besser, den Code einfach hochzuladen. Der Prozess erfordert nicht zu viel Erläuterungen, er ist dem Servercode etwas ähnlich.
Kunde:
Paket com.Anxpp.io.calculator.nio; öffentliche Klasse Client {private statische String default_host = "127.0.0.1"; private static int default_port = 12345; private statische ClientHandle ClientHandle; public static void start () {start (default_host, default_port); } public static synchronisierte void start (String ip, int port) {if (ClientHandle! = null) clientHandle.stop (); ClientHandle = New ClientHandle (IP, Port); neuer Thread (ClientHandle, "Server"). start (); } // eine Nachricht an den Server senden public static boolean sendMsg (String msg) löst Ausnahme aus {if (msg.equals ("q")) return false; ClientHandle.Sendmsg (MSG); zurückkehren; } public static void main (String [] args) {start (); }} ClientHandle:
Paket com.Anxpp.io.calculator.nio; importieren java.io.ioException; importieren java.net.inetsocketaddress; Import Java.nio.ByTebuffer; Import Java.nio.Channels.SelectionKey; Import Java.nio.Channels.Selector; import Java.nio.channels.socketchannel; Import Java.util.iterator; Java.util.set importieren; / ** * nio client * @author yangtao__anxpp.com * @Version 1.0 */ Public Class ClientHandle implements Runnable {private String -Host; privater Int -Port; Privatauswahlauswahl; private Socketchannel Socketchannel; Der private volatile Boolean begann; public ClientHandle (String ip, int port) {this.host = ip; this.port = port; try {// Selector selector = selector.open () erstellen; // Öffnen Sie den Hörkanal socketchannel = socketchannel.open (); // Wenn wahr, wird dieser Kanal im Blockierungsmodus platziert. Wenn dies falsch ist, wird dieser Kanal im nicht blockierenden Modus Socketchannel.ConfigureBlocking (Falsch) platziert; // Nicht-Blocking-Modus geöffnet = true; } catch (ioException e) {e.printstacktrace (); System.exit (1); }} public void stop () {start = false; } @Override public void run () {try {doconnect (); } catch (ioException e) {e.printstacktrace (); System.exit (1); } // Schleifen Sie durch den Selektor, während (gestartet) {try {// unabhängig davon, ob es ein Lese- und Schreibereignis gibt, der Selektor wird jeden 1S -Selektor geweckt. Select (1000); // Blockieren, und es wird nur so weitergehen, wenn mindestens ein registriertes Ereignis stattfindet. // selector.select (); Set <SelectionKey> keys = selector.selectedKeys (); Iterator <SelectionKey> iT = Keys.Iderator (); SelectionKey Key = null; while (it.hasnext ()) {key = it.next (); it.remove (); Versuchen Sie {HandleInput (Schlüssel); } catch (Ausnahme e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). close (); }}}}}}}} catch (exception e) {e.printstacktrace (); System.exit (1); }} // Nach der Schließung des Selektors werden die verwalteten Ressourcen automatisch veröffentlicht, wenn (selektor! = Null) try {selector.close (); } catch (Ausnahme e) {e.printstacktrace (); }} private void handleInput (SelectionKey -Schlüssel) löst IOException {if (key.isvalid ()) {Socketchannel sc = (socketchannel) key.channel (); if (key.isconnectable ()) {if (sc.finishConnect ()); else system.exit (1); } // Lesen Sie die Meldung if (key.isreadable ()) {// Bytebuffer erstellen und öffnen Sie einen 1M -Buffer -ByteBuffer buffer = bytebuffer.Allecode (1024); // Lesen Sie den Anforderungscode -Stream und senden Sie die Anzahl der Lesebytes int ReadBytes = sc.read (Puffer) zurück. // Bytes lesen und die Bytes codieren, wenn (readBytes> 0) {// die aktuelle Grenze des Puffers auf Position = 0 für nachfolgende Lesevorgänge von buffer.flip () festlegen; // Byte -Array erstellen basierend auf der Anzahl der lesbaren Bytes im Buffer -Byte [] bytes = new Byte [buffer.Remaining ()]; // Kopieren Sie das buffer -lesbare Byte -Array in das neu erstellte Array Buffer.get (Bytes); String result = new String (Bytes, "UTF-8"); System.out.println ("Client empfangen eine Nachricht:" + Ergebnis); } // Keine Bytes gelesen wird ignoriert // else if (readBytes == 0); // Der Link wurde geschlossen, wobei die Ressource sonst befreien wird, wenn (readBytes <0) {key.cancel (); sc.close (); }}}}} // Nachrichten asynchron private void dowrite (Socketchannel -Kanal, String -Anforderung) aus löscht IOException {// die Nachricht als Byte -Array -Byte [] bytes = Anforderungs.getBytes () aus. // ByteBuffer erstellen basierend auf dem Array -Kapazitäts -ByteBuffer writeBuffer = bytebuffer.alcode (bytes.length); // Kopieren des Byte -Arrays in den Puffer WriteBuffer.put (Bytes); // Flip -Operation writeBuffer.flip (); // Senden Sie den Byte Array Channel.write (writeBuffer); // ***** Der Code für die Verarbeitung "Write Half-Packet" ist hier nicht enthalten} private void doconnect () löst IOException {if (socketchannel.connect (New InetSocketaddress (Host, Port))) aus; sonst socketchannel.register (selector, selectionKey.op_connect); } public void sendsg (String msg) löst Ausnahme aus {Socketchannel.register (Selector, SelectionKey.op_read); Dowrite (Socketchannel, MSG); }} 2.7. Demonstrationsergebnisse
Führen Sie zunächst den Server aus und führen Sie übrigens einen Client aus:
Paket com.Anxpp.io.calculator.nio; import Java.util.scanner; /** * Testmethode * @Author yangtao__anxpp.com * @version 1.0 */public class test {// testen Sie die Hauptmethode @SuppressWarnings ("Ressource") public static void main (String [] args) löst die Ausnahme {// run server.start (); // Vermeiden Sie den Client, der den Code -Thread ausführt. Sleep (100); // Client Client.start () ausführen; while (client.sendmsg (neuer Scanner (System.in) .Nextline ())); }} Wir können den Kunden auch separat ausführen, und die Effekte sind gleich.
Ergebnisse eines Tests:
Der Server wurde gestartet, Portnummer: 123451+2+3+4+5+6 Der Server empfing die Nachricht: 1+2+3+4+5+6 Der Client empfing die Nachricht: 211*2/3-4+5*6/7-8 Der Server empfing die Meldung: 1*2/3-4+5*6/7-8 Die Client empfangen die Nachricht: -7.0476190476190474:
Es gibt kein Problem mit mehreren Kunden.
3. AIO -Programmierung
NIO 2.0 führt das Konzept neuer asynchroner Kanäle ein und bietet Implementierungen asynchroner Dateikanäle und asynchroner Sockelkanäle.
Der Asynchronous-Socket-Kanal ist wirklich asynchroner nicht blockierender E/O, was der Ereignisantriebs-E/A (AIO) in der UNIX-Netzwerkprogrammierung entspricht. Es erfordert nicht zu viele Selektoren, die registrierten Kanäle zu befragen, um asynchrones Lesen und Schreiben zu erreichen und so das NIO -Programmiermodell zu vereinfachen.
Laden Sie einfach den Code hoch.
3.1. Server -Seitencode
Server:
Paket com.Anxpp.io.calculator.aio.server; / ** * AIO Server * @Author yangtao__anxpp.com * @Version 1.0 */ öffentlicher Klasse Server {private static int default_port = 12345; private statische Asyncserverhandler ServerHandle; öffentliches volatiles statisches Long ClientCount = 0; public static void start () {start (default_port); } public static synchronisierte void start (int port) {if (serverhandle! = null) return; serverhandle = new AsyncServerHandler (Port); neuer Thread (ServerHandle, "Server"). start (); } public static void main (String [] args) {server.start (); }} Asyncserverhandler:
Paket com.Anxpp.io.calculator.aio.server; importieren java.io.ioException; importieren java.net.inetsocketaddress; importieren java.nio.channels.asyncServersocketchannel; Import Java.util.Concurrent.Countdownlatch; öffentliche Klasse AsyncServerHandler implementiert Runnable {public Countdownlatch Latch; public asyncserversocketchannel -Kanal; public asyncserverhandler (int port) {try {// Server Channel = asynchronoServerSocketchannel.open (); // Port -Kanal binden. System.out.println ("Server wurde gestartet, Portnummer:" + Port); } catch (ioException e) {e.printstacktrace (); }} @Override public void run () {// Countdownlatch -Initialisierung // seine Funktion: Lassen Sie das aktuelle Feld ständig blockieren, bevor eine Reihe von Operationen ausgeführt werden. // Connection Channel.accept (this, New AcceptHandler ()); try {latch.await (); } catch (interruptedException e) {e.printstacktrace (); }}} AcceptHandler:
Paket com.Anxpp.io.calculator.aio.server; Import Java.nio.ByTebuffer; import Java.nio.channels.asynchronousSocketchannel; importieren java.nio.channels.completionHandler; // Verbindung als Handler öffentliche Klasse AcceptHandler implementiert CompletionHandler <AsynchronousSocketchannel, Asyncserverhandler> {@Override public void abgeschlossen (AsynchronousSocketchannel -Kanal, AsyncServerhandler ServerHandler) {// Antrag weiterhin Anfragen von anderen Clients -Server annehmen. System.out.println ("Anzahl der verbundenen Clients:" + server.clientCount); ServerHandler.Channel.accept (ServerHandler, this); // Erstellen Sie einen neuen Buffer -ByteBuffer Buffer = byteBuffer.alcode (1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); }} ReadHandler:
package com.anxpp.io.calculator.aio.server; importieren java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (ioException e) {e.printstacktrace (); }}} OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2、Client端代码
Client:
package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); zurückkehren; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }} AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; importieren java.io.ioException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; privater Int -Port; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (ioException e) {e.printstacktrace (); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (ioException e) {e.printstacktrace (); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (ioException e) {e.printstacktrace (); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); }} WriteHandler:
package com.anxpp.io.calculator.aio.client; importieren java.io.ioException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //Complete writing of all data if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //Read data ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("Data send failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; importieren java.io.ioException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("Client received result:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("Data read failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3. Prüfen
Prüfen:
package com.anxpp.io.calculator.aio; import java.util.Scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }}我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}Das obige ist der gesamte Inhalt dieses Artikels. Ich hoffe, es wird für das Lernen aller hilfreich sein und ich hoffe, jeder wird Wulin.com mehr unterstützen.