この記事では、伝統的なバイオからNIO、AIOまで浅いものから深いものに紹介され、完全なコードの説明が伴います。
例は次のコードで使用されます。クライアントは方程式の文字列をサーバーに送信し、サーバーは計算後に結果をクライアントに返します。
コードのすべての手順はコメントとして直接使用され、コードに埋め込まれています。これは、コードを読むときに理解しやすくなります。結果を計算するためのツールクラスは、コードで使用されます。記事のコードセクションを参照してください。
関連する基本知識に関する推奨記事:
LinuxネットワークI/Oモデルの紹介(写真とテキスト)
Javaの並行性(マルチスレッド)
1。バイオプログラミング
1.1。従来のバイオプログラミング
ネットワークプログラミングの基本モデルは、C/Sモデル、つまり2つのプロセス間の通信です。
サーバーはIPおよびリスニングポートを提供します。クライアントは、サーバーが聞きたい接続操作アドレスを介して接続要求を開始します。 3つのハンドシェイクを通じて、接続が正常に確立された場合、両当事者はソケットを介して通信できます。
従来の同期ブロッキングモデルの開発において、ServersocketはIPアドレスをバインドし、リスニングポートを開始する責任があります。ソケットは、接続操作の開始を担当します。接続が成功した後、両当事者は、入力ストリームと出力ストリームを介した同期ブロック通信を実施します。
バイオサーバー通信モデルの簡単な説明:バイオ通信モデルを使用するサーバーは、通常、クライアントの接続を聞くことを担当する独立したアクセプタースレッドです。クライアント接続要求を受信した後、リンク処理のためにクライアントごとに新しいスレッドを作成し、処理に失敗し、出力ストリームを介してクライアントへの返信を返し、スレッドが破壊されます。つまり、典型的な1回のリケストからリプトーな終戦モデルです。
従来のバイオコミュニケーションモデル図:
このモデルの最大の問題は、弾性スケーリング機能がないことです。クライアントの同時アクセスの数が増加すると、サーバー上のスレッドの数は、クライアントの同時アクセスの数に比例します。 Javaのスレッドも比較的貴重なシステムリソースです。スレッドの数が急速に拡大すると、システムのパフォーマンスが急激に低下します。アクセスの数が増え続けると、システムは最終的に死にます。
同期ブロッキングI/Oによって作成されたサーバーソースコード:
パッケージcom.anxpp.io.calculator.bio; java.io.ioexceptionをインポートします。 java.net.serversocketをインポートします。 java.net.socketをインポートします。 /** *バイオサーバーソースコード * @author yangtao__anxpp.com * @version 1.0 */public final class servernormal {//デフォルトポート番号プライベートStatic int default_port = 12345; // Singleton ServersocketプライベートStatic Serversocket Server; //着信パラメーターに従ってリスニングポートを設定します。パラメーターがない場合は、次のメソッドを呼び出し、デフォルト値public static void start()を使用します。 } //このメソッドは多数の同時の方法でアクセスされず、効率を考慮する必要はありません。メソッドを直接同期するだけで、パブリック同期した静的ボイドスタート(int port)がioException {(server!= null)return; {//コンストラクターを介してサーバーソケットを作成してください//ポートが合法でアイドル状態の場合、サーバーは正常にリッスします。 server = new Serversocket(port); System.out.println( "サーバーが開始されました、ポート番号:" +ポート); //ワイヤレスループを介してクライアント接続を聞く//クライアントアクセスがない場合は、Accept操作でブロックされます。 while(true){socket socket = server.accept(); //新しいクライアントアクセスがある場合、次のコードが実行されます//このソケットリンクnewスレッド(new ServerHandler(socket))を処理する新しいスレッドを作成します。start(); }}最後に{//必要なクリーニング作業(サーバー!= null){system.out.println( "サーバーが閉じている。"); server.close();サーバー= null; }}}}クライアントメッセージ処理スレッドサーバーハンドラーソースコード:
パッケージcom.anxpp.io.calculator.bio; java.io.bufferedreaderをインポートします。 java.io.ioexceptionをインポートします。 java.io.inputStreamReaderをインポートします。 java.io.printwriterをインポートします。 java.net.socketをインポートします。 com.anxpp.io.utils.calculatorをインポートします。 / ***クライアントスレッド* @author yangtao__anxpp.com*クライアント用ソケットリンク*/ public class serverhandlerはrunnable {private socket socket; public serverhandler(socket socket){this.socket = socket; } @Override public void run(){bufferedreader in = null; printwriter out = null; try {in = new BufferedReader(new inputStreamReader(socket.getInputStream()); out = new PrintWriter(socket.getOutputStream()、true);文字列式;文字列結果; while(true){// bufferedReaderを介して行を読む//入力ストリームのテールを読み取った場合は、nullを返してループを出て、非ヌル値を取得した場合は、結果を計算して、(expression = in.readline())== null)breakを返します。 System.out.println( "サーバーはメッセージを受信しました:" + expression); try {result = calculature.cal(expression).toString(); } catch(Exception e){result = "calculature.cal(expression).toString();} catch(exception e){e.printstacktrace();}最後に{//必要なクリーンアップ作業(in!= null){try {in.close();} catch(ioexception e){e.printstacktrace(); null){out out = null;同期ブロッキングI/Oによって作成されたクライアントソースコード:
パッケージcom.anxpp.io.calculator.bio; java.io.bufferedreaderをインポートします。 java.io.ioexceptionをインポートします。 java.io.inputStreamReaderをインポートします。 java.io.printwriterをインポートします。 java.net.socketをインポートします。 /** * @author yangtao__anxpp.com * @version 1.0 */public class client {//デフォルトのポート番号private static int default_server_port = 12345; private static string default_server_ip = "127.0.0.1"; public static void send(string expression){send(default_server_port、expression); } public static void send(int port、string expression){system.out.println( "算術式IS:" +式);ソケットソケット= null; bufferedreader in = null; printwriter out = null; try {socket = new Socket(default_server_ip、port); in = new BufferedReader(new inputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream()、true); out.println(式); system.out.println( "___結果は次のとおりです。" + in.readline()); } catch(Exception e){e.printstacktrace(); }最後に{//必要なクリーニング作業は(in!= null){try {in.close(); } catch(ioexception e){e.printstacktrace(); } in = null; } if(out!= null){out.close(); out = null; } if(socket!= null){try {socket.close(); } catch(ioexception e){e.printstacktrace(); } socket = null; }}}}コードをテストして、コンソールで出力結果を表示するのを容易にするために、同じプログラム(JVM)に入れて実行します。
パッケージcom.anxpp.io.calculator.bio; java.io.ioexceptionをインポートします。 java.util.randomをインポートします。 /** *テスト方法 * @Author Yangtao__anxpp.com * @version 1.0 */public class test {// public static void main(string [] args)throws interrutedexception {// new runnable(){@override public void run(){try {serverbetter.start() e.printstacktrace(); //サーバーが開始される前にクライアントがコードを実行することを避けます。 //クライアントchar operators [] = {'+'、 ' - '、 '*'、 '/'}を実行します。 RANDOM RANDOM = new Random(System.CurrentTimeMillis());新しいスレッド(new runnable(){@suppresswarnings( "static-access")@override public void run(){//ランダム生成算術式文字列式= nextint(10)+"+operators [random.nextint(4)]+(random.nextint(10)+1); client.s. thread.currentThread()。sleep(1000)); }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}ティー。 }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}ティー。 } } } } } } } } { 糸。 currentThread()。sleep(random.nextint(1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { 糸。 currentThread()。sleep(random.nextint(1000)); }}}}}}}}}実行の1つの結果:
サーバーは開始されました、ポート番号:12345算術式は次のとおりです。4-2サーバーはメッセージを受信しました:4-2 ___結果は次のとおりです:2算術式はメッセージを受け取りました:5-10 __結果は次のとおりです:-5算術式はメッセージです:0-9サーバーはメッセージを受け取りました:0-9 __結果はメッセージです:0+6サーバーメッセージを受け取りました:1/6__結果は次のとおりです。 0.166666666666666666666666666666666666666666666666666666666666666666666666666666666666666666 66666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666年66666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666年66666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666年
上記のコードから、バイオの主な問題は、新しいクライアントがアクセスを要求するときはいつでも、サーバーがこのリンクを処理するために新しいスレッドを作成する必要があることを簡単に確認できます。
1.2。擬似同期I/Oプログラミング
この1つの接続1つのスレッドモデルを改善するために、スレッドプールを使用してこれらのスレッドを管理します(詳細については、前述の記事を参照してください)。1つ以上のスレッドのモデルを実装してNクライアントを処理する(ただし、基礎となるレイヤーは、依然として同期ブロッキングI/Oを使用します)。
擬似同期I/Oモデル図:
実装は非常に簡単です。新しいスレッドを引き渡してプール管理をスレッドするだけで、今すぐサーバーコードを変更するだけです。
パッケージcom.anxpp.io.calculator.bio; java.io.ioexceptionをインポートします。 java.net.serversocketをインポートします。 java.net.socketをインポートします。 java.util.concurrent.executorserviceをインポートします。 java.util.concurrent.executorsをインポートします。 /** * BIO SERVER SOURCE CODE__PSEUDO-ASYNCHRONOUS I/O * @AUTHOR YANGTAO__ANXPP.com * @version 1.0 */public Final Class ServerBetter {//デフォルトのポート番号プライベートStatic Int Default_port = 12345; // Singleton ServersocketプライベートStatic Serversocket Server; // singleton private static executorservice executorservice = executors.newfixedthreadpool(60); //着信パラメーターに従ってリスニングポートを設定します。パラメーターがない場合は、次のメソッドを呼び出し、デフォルト値public static void start()を使用します。 } //このメソッドには同時にアクセスされず、効率を考慮する必要はありません。メソッドを直接同期するだけで、パブリック同期した静的ボイドスタート(intポート)がioException {if(server!= null)return; {//コンストラクターを介してサーバーソケットを作成してください//ポートが合法でアイドル状態の場合、サーバーは正常にリッスします。 server = new Serversocket(port); System.out.println( "サーバーが開始されました、ポート番号:" +ポート); //ワイヤレスループを介してクライアント接続をスーパーしますwhile(true){socket socket = server.accept(); //新しいクライアントアクセスがある場合、次のコードが実行されます//ソケットリンクexecutorservice.execute(new ServerHandler(Socket))を処理する新しいスレッドを作成します。 }}最後に{//必要なクリーニング作業(サーバー!= null){system.out.println( "サーバーが閉じている。"); server.close();サーバー= null; }}}}テストの実行結果は同じです。
実際、スレッドの管理(再利用)を自動的に支援することに加えて、CachedThreadPoolスレッドプールを使用すると、記事の冒頭で提供されている記事を参照してください)を参照してください。固定ThreadPoolを使用して、スレッドの最大数を効果的に制御し、システムの限られたリソースの制御を確保し、N:Mの擬似同期I/Oモデルを実装します。
ただし、スレッドの数が限られているため、多数の同時リクエストが発生した場合、最大数を超えるスレッドは、再利用できるスレッドプールに自由なスレッドがあるまで待機できます。ソケット入力ストリームが読み取られると、発生するまでブロックされます。
したがって、データの読み取りが遅い場合(データの大量、ネットワーク伝送の遅いなど)、および大量の並行性の場合、他のアクセスメッセージは常に待機することができます。これは最大の欠点です。
後で導入されるNIOは、この問題を解決できます。
2。NIOプログラミング
新しいJava I/Oライブラリは、java.nio。*パッケージにJDK 1.4に導入され、速度を上げることを目的としています。実際、「古い」I/OパッケージはNIOを使用して再実装されており、NIOプログラミングを明示的に使用していなくても、利益を得ることができます。速度の改善は、ファイルI/OとネットワークI/Oの両方で発生する可能性がありますが、この記事では後者のみについて説明します。
2.1。導入
NIOは一般的に新しいI/O(公式名も)と考えています。これは、古いI/Oライブラリにとって新しいものであるためです(実際にはJDK 1.4で導入されていますが、この名詞は今も「古い」ものであっても長い間使用され続けます。ただし、多くの人、つまり非ブロックI/Oによって非ブロックI/Oと呼ばれます。これは呼ばれるため、その特性をよりよく反映できるためです。次のテキストのNIOは、新しいI/Oライブラリ全体を参照していませんが、I/Oをブロックしていません。
NIOは、従来のバイオモデルのソケットとサーバーソケットに対応するSocketchannelとServersocketchannelの2つの異なるソケットチャネルの実装を提供します。
新しく追加されたチャネルの両方が、ブロッキングモードと非ブロッキングモードをサポートしています。
ブロッキングモードの使用は従来のサポートと同じくらい簡単ですが、パフォーマンスと信頼性は良くありません。非ブロッキングモードはまったく逆です。
低負荷、低電流アプリケーションの場合、同期ブロッキングI/Oを使用して、開発率とより良いメンテナンスを改善できます。高負荷、高電流(ネットワーク)アプリケーションの場合、NIOの非ブロッキングモードを使用して開発する必要があります。
基本的な知識を以下に最初に紹介します。
2.2。バッファバッファー
バッファーとは、書き込まれるデータを含むオブジェクトです。
NIOライブラリでは、すべてのデータがバッファで処理されます。データを読み取るときは、バッファーに直接読み取られます。データを作成するときは、バッファーにも書き込まれます。 NIOのデータにアクセスするたびに、バッファーを介して動作します。
バッファーは実際には配列であり、データへの構造化されたアクセスや読み取りおよび書き込みの場所の維持などの情報を提供します。
特定のキャッシュ領域は、Bytebuffe、Charbuffer、ShortBuffer、Intbuffer、Longbuffer、Floatbuffer、doubleBufferです。同じインターフェイスを実装します:バッファー。
2.3。チャネル
データの読み取りと執筆は、水道管、チャネルのようなチャネルを通過する必要があります。チャネルとストリームの違いは、チャネルが双方向であり、読み取り、書き込み、および書き込み操作に使用できることです。
基礎となるオペレーティングシステムのチャネルは一般に全二重です。したがって、フルダップレックスチャネルは、ストリームよりも基礎となるオペレーティングシステムのAPIをより適切にマッピングできます。
チャネルは、主に2つのカテゴリに分かれています。
次のコードに関与するServersocketchannelとSocketchannelは、どちらもSefterableChannelのサブクラスです。
2.4。 Multiplexerセレクター
セレクターは、Java NIOプログラミングの基礎です。
Selectorは、Ready Taskを選択する機能を提供します。Selectorは、登録されているチャネルを常にポーリングします。チャネルで読み取りまたは書き込みイベントが発生した場合、チャネルは準備ができており、セレクターによって投票されます。次に、selectionキーを通じて一連の準備チャネルを取得して、後続のI/O操作を実行できます。
Selectorは、JDKが従来の選択実装の代わりにEpoll()を使用するため、複数のチャネルを同時に投票できます。最大接続ハンドル1024/2048に制限はありません。したがって、セレクターのポーリングに責任を負う必要があるスレッドは1つだけで、何千ものクライアントにアクセスできます。
2.5。 NIOサーバー
このコードは、従来のソケットプログラミングよりもはるかに複雑なようです。
コードを貼り付けて、コメントの形式でコードの説明を提供するだけです。
NIOによって作成されたサーバーソースコード:
パッケージcom.anxpp.io.calculator.nio;パブリッククラスサーバー{private static int default_port = 12345;プライベートStatic ServerHandle ServerHandle; public static void start(){start(default_port); } public static同期void start(int port){if(serverhandle!= null)serverhandle.stop(); serverHandle = new ServerHandle(port);新しいスレッド(serverHandle、 "server")。start(); } public static void main(string [] args){start(); }} ServerHandle:
パッケージcom.anxpp.io.calculator.nio; java.io.ioexceptionをインポートします。 java.net.inetsocketAddressをインポートします。 java.nio.bytebufferをインポートします。 java.nio.channels.selectionKeyをインポートします。 java.nio.channels.selectorをインポートします。 java.nio.channels.serversocketchannelをインポートします。 java.nio.channels.socketchannelをインポートします。 java.util.iteratorをインポートします。 java.util.setをインポートします。 com.anxpp.io.utils.calculatorをインポートします。 / ** * nio server * @author yangtao__anxpp.com * @version 1.0 */ public class serverhandle runnable {private selector selector;プライベートServerSocketchannel ServerChannel;私的な揮発性ブール奏者が開始されました。 /** constructor* @paramポートは、聴取するポート番号を指定します*/public serverhandle(int port){try {// create selector = selector.open(); //リスニングチャネルServerChannel = serversocketchannel.open()を開きます。 // trueの場合、このチャネルはブロッキングモードに配置されます。 falseの場合、このチャネルは非ブロッキングモードServerChannel.ConfigureBlocking(false)に配置されます。 //非ブロッキングモードを有効にする//バインドポートバックログは1024 ServerChannel.Socket()。バインド(new inetsocketAddress(port)、1024)に設定されています。 // Superce Client Connection Request ServerChannel.register(selector、selectionkey.op_accept); //マークサーバーが有効になりました= true; System.out.println( "サーバーが開始され、ポート番号:" +ポート); } catch(ioexception e){e.printstacktrace(); System.Exit(1); }} public void stop(){start = false; } @Override public void run(){// selector while(start){try {//読み取りおよび書き込みイベントがあるかどうか、セレクターは1S selector.select(1000)ごとに起きます。 //ブロックすると、少なくとも1つの登録イベントが発生した場合にのみ継続されます。 // selector.select(); set <selectionKey> keys = selector.selectedKeys(); iterator <selectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasnext()){key = it.next(); it.remove(); try {handleInput(key); } catch(例外e){if(key!= null){key.cancel(); if(key.channel()!= null){key.channel()。close(); }}}}}}}}}} catch(throwable t){t.printstacktrace(); }} //セレクターが閉じた後、マネージドリソースが自動的にリリースされます(selector!= null)try {selector.close(); } catch(Exception e){e.printstacktrace(); }} private void handleInput(selectionkey key)throws ioexception {if(key.isvalid()){//新しいアクセスのリクエストメッセージの処理if(key.isaceptable()){serversocketchannel ssc =(serversocketchannel)key.channel(); // ServersocketchannelのAcceptを介してSocketchannelインスタンスを作成する//この操作を完了すると、TCP 3方向の握手が完了し、TCP Physical Linkが公式に確立されます。 socketchannel sc = ssc.accept(); //非ブロッキングSC.CONFIGUREBLOCKING(false)に設定します。 // recolts sc.register(selector、selectionkey.op_read); } //メッセージを読むif(key.isreadable()){socketchannel sc =(socketchannel)key.channel(); // bytebufferを作成し、1Mバッファーを開きますbytebufferバッファー= bytebuffer.allocate(1024); //リクエストストリームを読み取り、バイト数を返すint readbytes = sc.read(buffer); //バイトを読み取り、バイトのif(readbytes> 0){//バッファーの現在の制限を位置= 0に設定します。 //バッファーの数に基づいてバイト配列を作成します。 // Buffer Readableバイト配列を新しく作成した配列buffer.get(bytes)にコピーします。文字列式= new String(bytes、 "utf-8"); system.out.println( "サーバーがメッセージを受信しました:" + expression); //データ文字列result = nullを処理します。 try {result = calculature.cal(expression).toString(); } catch(例外e){result = "計算エラー:" + e.getmessage(); } //返信メッセージdowrite(sc、result)を送信します。 } //バイトなし読み取りと無視// else if(readbytes == 0); //リンクが閉じられており、リソースを解放してください。 sc.close(); }}}}} //返信メッセージを送信します。 //配列容量に従ってbytebufferを作成するbytebuffer writebuffer = bytebuffer.allocate(bytes.length); //バイテ配列をバッファにコピーしますwritebuffer.put(bytes); // flip操作writebuffer.flip(); // buffer channel.write(writebuffer)のバイト配列を送信します。 // *****「ハーフパケットの書き込み」を処理するためのコードはここに含まれていません}}ご覧のとおり、NIOサーバーを作成するための主な手順は次のとおりです。
応答メッセージが送信されるため、Socketchannelは非同期および非ブロッキングでもあるため、送信する必要があるデータを一度に送信できることを保証することはできません。書き込み操作を登録し、セレクターを絶えず投票して、アンセントメッセージを送信し、バッファのhasremain()メソッドを使用して、メッセージが送信されるかどうかを判断する必要があります。
2.6。 NIOクライアント
コードをアップロードするだけです。このプロセスはあまり説明を必要としません。サーバーコードに少し似ています。
クライアント:
パッケージcom.anxpp.io.calculator.nio;パブリッククラスクライアント{private static string default_host = "127.0.0.1"; private static int default_port = 12345;プライベートStatic ClientHandle ClientHandle; public static void start(){start(default_host、default_port); } public static同期void start(string ip、int port){if(clienthandle!= null)clienthandle.stop(); clientHandle = new ClientHandle(IP、ポート);新しいスレッド(clienthandle、 "server")。start(); } //サーバーにメッセージを送信しますpublic static boolean sendmsg(string msg)throws exception {if(msg.equals( "q"))return false; clienthandle.sendmsg(msg); trueを返します。 } public static void main(string [] args){start(); }} clienthandle:
パッケージcom.anxpp.io.calculator.nio; java.io.ioexceptionをインポートします。 java.net.inetsocketAddressをインポートします。 java.nio.bytebufferをインポートします。 java.nio.channels.selectionKeyをインポートします。 java.nio.channels.selectorをインポートします。 java.nio.channels.socketchannelをインポートします。 java.util.iteratorをインポートします。 java.util.setをインポートします。 / ** * nio client * @author yangtao__anxpp.com * @version 1.0 */ public class clienthandle explmention runnable {private string host;プライベートインターポート;プライベートセレクターセレクター。 Private Socketchannel Socketchannel;私的な揮発性ブール奏者が開始されました。 public clienthandle(string ip、int port){this.host = ip; this.port = port; try {// selector selector = selector.open(); //リスニングチャンネルを開きましたsocketchannel = socketchannel.open(); // trueの場合、このチャネルはブロッキングモードに配置されます。 falseの場合、このチャネルは非ブロッキングモードSocketchannel.configureBlocking(false); //非ブロッキングモードを開くstrue = trueに配置されます。 } catch(ioexception e){e.printstacktrace(); System.Exit(1); }} public void stop(){start = false; } @Override public void run(){try {doconnect(); } catch(ioexception e){e.printstacktrace(); System.Exit(1); } // selectorを介してループwhile(start){try {//読み取りおよび書き込みイベントがあるかどうかに関係なく、セレクターは1Sセレクターごとに目覚めます。Select(1000); //ブロック、そして少なくとも1つの登録イベントが発生した場合にのみ継続されます。 // selector.select(); set <selectionKey> keys = selector.selectedKeys(); iterator <selectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasnext()){key = it.next(); it.remove(); try {handleInput(key); } catch(例外e){if(key!= null){key.cancel(); if(key.channel()!= null){key.channel()。close(); }}}}}}}}} catch(Exception e){e.printstacktrace(); System.Exit(1); }} //セレクターが閉じた後、マネージドリソースが自動的にリリースされます(selector!= null)try {selector.close(); } catch(Exception e){e.printstacktrace(); }} private void handleInput(selectionkey key)throws ioexception {if(key.isvalid()){socketchannel sc =(socketchannel)key.channel(); if(key.isconnectable()){if(sc.finishconnect()); else system.exit(1); } //メッセージを読み取ります(key.isreadable()){// bytebufferを作成し、1mバッファーbytebufferバッファー= bytebuffer.allocate(1024); //リクエストコードストリームを読み取り、int readbytes = sc.read(buffer)を読むバイト数を返します。 //バイトを読み取り、バイトのif(readbytes> 0){//バッファーの現在の制限を位置= 0に設定します。 //バッファバイトの読み取り可能なバイトの数に基づいてバイト配列を作成します[] bytes = new byte [buffer.remaining()]; // Buffer Readableバイト配列を新しく作成した配列buffer.get(bytes)にコピーします。 string result = new String(bytes、 "utf-8"); System.out.println( "クライアントはメッセージを受信しました:" + result); } //読み取りは無視されません// else if(readbytes == 0); //リンクが閉じられており、リソースを解放してください。 sc.close(); }}}}} //メッセージを送信します。 //配列容量に基づいてbytebufferの作成bytebuffer writebuffer = bytebuffer.allocate(bytes.length); //バッテ配列をバッファにコピーするwritebuffer.put(bytes); // flip操作writebuffer.flip(); //バイト配列チャンネルを送信します。Write(WriteBuffer); // *****「write half-packet」を処理するためのコードはここに含まれていません} private void doconnect()throws ioexception {if(socketchannel.connect(new inetsocketAddress(host、port))); else socketchannel.register(selector、selectionkey.op_connect); } public void sendmsg(string msg)スロー例外{socketchannel.register(selector、selectionkey.op_read); Dowrite(Socketchannel、MSG); }} 2.7。デモンストレーションの結果
最初にサーバーを実行し、ところでクライアントを実行します。
パッケージcom.anxpp.io.calculator.nio; Java.util.scannerをインポートします。 /**テスト方法 * @author yangtao__anxpp.com * @version 1.0 */public class test {//メインメソッド@suppresswarnings( "resource")public static void main(string [] args)throws exception {// run server.start(); //クライアントがコードスレッドを実行するのを避けます。スリープ(100); // client client client.start()を実行します。 while(client.sendmsg(new scanner(system.in).nextline())); }}また、クライアントを個別に実行することもできますが、効果は同じです。
テストの結果:
サーバーは開始され、ポート番号:123451+2+3+4+5+6サーバーはメッセージを受け取りました:1+2+3+4+5+6クライアントはメッセージを受け取りました:211*211*211*211*2/3-4+5*6/7-8クライアントはメッセージを受け取りました:-7.047619047190474
複数のクライアントを実行するのに問題はありません。
3。AIOプログラミング
NIO 2.0は、新しい非同期チャネルの概念を導入し、非同期ファイルチャネルと非同期ソケットチャネルの実装を提供します。
非同期ソケットチャネルは、UNIXネットワークプログラミングのイベント駆動型I/O(AIO)に対応する、本当に非同期非ブロッキングI/Oです。非同期の読み取りと書き込みを達成するために登録されたチャネルを投票するためにあまりにも多くのセレクターを必要としないため、NIOプログラミングモデルが簡素化されます。
コードをアップロードするだけです。
3.1。サーバーサイドコード
サーバ:
パッケージcom.anxpp.io.calculator.aio.server; / ** * AIO SERVER * @Author Yangtao__anxpp.com * @version 1.0 */ public class server {private static int default_port = 12345;プライベートStatic Asyncserverhandler ServerHandle;公共揮発性静的long clientCount = 0; public static void start(){start(default_port); } public static同期void start(int port){if(serverhandle!= null)return; serverHandle = new Asyncserverhandler(ポート);新しいスレッド(serverHandle、 "server")。start(); } public static void main(string [] args){server.start(); }} asyncserverhandler:
パッケージcom.anxpp.io.calculator.aio.server; java.io.ioexceptionをインポートします。 java.net.inetsocketAddressをインポートします。 java.nio.channels.asyncserversocketchannelをインポートします。 java.util.concurrent.countdownlatchをインポートします。パブリッククラスAsyncserverhandlerは実行可能{public countdownlatchラッチ; Public AsyncserversocketchenterChannel; public asyncserverhandler(int port){try {//サーバーチャネル= asynchronousserversocketchannel.open(); //ポートチャンネルをバインドします。 System.out.println( "サーバーが開始され、ポート番号:" +ポート); } catch(ioexception e){e.printstacktrace(); }} @Override public void run(){// CountDownLatch Initialization //その関数:現在のフィールドが常に実行される前に常にブロックすることを許可します//ここでフィールドブロックをブロックして、実行後にサーバーが終了しないようにします// //接続Channel.accept(this、new accepthandler()); try {latch.await(); } catch(arturnedexception e){e.printstacktrace(); }}} accepthandler:
パッケージcom.anxpp.io.calculator.aio.server; java.nio.bytebufferをインポートします。 java.nio.channels.asynchronoussocketchannelをインポートします。 java.nio.channels.compleditionhandlerをインポートします。 //ハンドラーとして接続するパブリッククラスACCEPTHENDLERを実装しているRecture Handler <asynchronoussocketchannel、asyncserverhandler> {@override public void Complete(asynchronoussocketchannelチャンネル、asyncserverhandler serverhandler){//他のクライアントサーバーからのリクエストを受け入れ続けます。 System.out.println( "接続クライアントの数:" + server.clientCount); ServerHandler.Channel.Accept(ServerHandler、this); //Create a new Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); }} ReadHandler:
package com.anxpp.io.calculator.aio.server; java.io.ioexceptionをインポートします。 import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); }}} OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2。 Client side code
クライアント:
package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); trueを返します。 } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }} AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; java.io.ioexceptionをインポートします。 import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host;プライベートインターポート; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); }} WriteHandler:
package com.anxpp.io.calculator.aio.client; java.io.ioexceptionをインポートします。 import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //Complete writing of all data if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //Read data ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("Data send failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; java.io.ioexceptionをインポートします。 import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("Client received result:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("Data read failed..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3。テスト
テスト:
package com.anxpp.io.calculator.aio; import java.util.Scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }}我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}上記はこの記事のすべての内容です。みんなの学習に役立つことを願っています。誰もがwulin.comをもっとサポートすることを願っています。