序文:
最近、HadoopのRPC(リモートプロシージャコールプロトコル)を分析するときは、基礎となるネットワークテクノロジーを理解せずにネットワークを介してリモートコンピュータープログラムからサービスを要求するプロトコルです。参照できます:http://baike.baidu.com/view/32726.htm)メカニズム、HadoopのRPCメカニズムの実装は主に2つのテクノロジーを使用していることがわかりました。 HadoopのRPCソースコードを正しく分析するには、Java NIOの原則と特定の実装を最初に研究する必要があると思います。
このブログでは、主に2つの方向からJava Nioを分析します
目次:
1つ。 Java nioとI/Oのブロックの違い
1。I/O通信モデルのブロック
2。JavaNioの原則とコミュニケーションモデル2。JavaNioサーバーとクライアントコードの実装
特定の分析:
1つ。 Java nioとI/Oのブロックの違い
1。I/O通信モデルのブロック
I/Oのブロックについて特定の理解がある場合、InputStream.Read()メソッドを呼び出すときにI/Oのブロックがブロックされていることがわかります。データが到着する(またはタイムアウト)が戻る前に待機します。同様に、Serversocket.Accept()メソッドを呼び出すと、戻る前にクライアント接続があるまでブロックします。各クライアントが接続すると、サーバーはスレッドを起動してクライアントの要求を処理します。 I/Oのブロックの通信モデル図は次のとおりです。
慎重に分析すると、I/Oをブロックすることにはいくつかの欠点があることがわかります。ブロッキングI/O通信モデルに基づいて、2つの欠点を要約しました。
1.多くのクライアントがいる場合、多数の処理スレッドが作成されます。そして、各スレッドはスタックスペースといくつかのCPU時間を取り上げます
2。ブロックは頻繁にコンテキストの切り替えにつながる可能性があり、ほとんどのコンテキストの切り替えは無意味である可能性があります。
この場合、非ブロッキングI/Oにはアプリケーションの見通しがあります。
2。JavaNioの原則とコミュニケーションモデル
Java NioはJDK1.4で開始され、「新しいI/O」または非ブロッキングI/Oの両方と言えます。 Java nioの仕組みは次のとおりです。
1.専用のスレッドはすべてのIOイベントを処理し、配布を担当します。
2。イベント駆動型メカニズム:イベントを同時に監視するのではなく、イベントが到着したときにトリガーします。
3。スレッド通信:スレッドは、待機、通知、その他の手段を通信します。すべてのコンテキストスイッチが理にかなっていることを確認してください。不要なスレッドスイッチングを減らします。
いくつかの情報を読んだ後、私が理解しているJava Nioの作業概略図を投稿しました。
(注:各スレッドの処理フローは、おそらくデータの読み取り、デコード、コンピューティング処理、エンコード、および応答の送信です。)
Java NIOサーバーは、すべてのIOイベントを処理するために特別なスレッドを起動するだけでいいものがあります。この通信モデルはどのように実装されていますか?ハハ、一緒にその謎を探りましょう。 Java Nioは、一元配置ストリームではなく、データ送信に双方向チャネルを使用し、関心のあるイベントをチャネルに登録できます。合計4つのイベントがあります:
| イベント名 | 対応する値 |
| サーバーはクライアント接続イベントを受信します | selectionkey.op_accept(16) |
| クライアント接続サーバーイベント | selectionkey.op_connect(8) |
| イベントを読んでください | selectionKey.op_read(1) |
| イベントを書く | selectionkey.op_write(4) |
サーバーとクライアントはそれぞれ、1つ以上のチャネルでイベントを検出できるセレクターを呼び出すチャネルを管理するオブジェクトを維持します。サーバーを例として見てみましょう。読み取りイベントがサーバーのセレクターに登録されている場合、クライアントはある時点でいくつかのデータをサーバーに送信します。 I/Oをブロックすると、データをブロックするためにread()メソッドを呼び出し、NIOサーバーはreadイベントをセレクターに追加します。サーバーの処理スレッドは、セレクターにアクセスするためにポーリングします。関心のあるイベントがセレクターにアクセスするときに到着することがわかった場合、これらのイベントを処理します。関心のあるイベントが届かない場合、興味のあるイベントが到着するまで処理スレッドがブロックされます。以下は、私が理解しているJava Nioのコミュニケーションモデルの概略図です。
二。 Java NIOサーバーとクライアントコードの実装
Java Nioをよりよく理解するために、以下はサーバーとクライアントの簡単なコード実装です。
サーバ:
パッケージcn.nio; java.io.ioexceptionをインポートします。 java.net.inetsocketAddressをインポートします。 java.nio.bytebufferをインポートします。 java.nio.channels.selectionKeyをインポートします。 java.nio.channels.selectorをインポートします。 java.nio.channels.serversocketchannelをインポートします。 java.nio.channels.socketchannelをインポートします。 java.util.iteratorをインポートします。 /*** NIO SERVER* @Author Small Path*/public class nioServer {//チャンネルマネージャープライベートセレクターセレクター。 / ** * Serversocket Channelを取得し、チャンネルでいくつかの初期化作業を行います * @Param Port Bound Port * @Throws IOException */ public void initserver(int port)throws ioexception {// Serversocket Channel ServersocketchannelChannel = serversocketchannel.open(); //チャンネルを非ブロッキングServerChannel.configureBlocking(false)に設定します。 //このチャネルに対応するサーバーソケットをポートポートServerChannel.Socket()。バインド(new inetsocketAddress(port)); //チャンネルマネージャーを取得するthis.selector = selector.open(); //チャンネルマネージャーをチャンネルにバインドし、selectionkey.op_acceptイベントを登録します。イベントを登録した後、//イベントが届くと、selector.select()が返されます。イベントがselector.select()に到達しない場合、ブロックします。 serverChannel.register(selector、selectionkey.op_accept); } /***ポーリングを使用して、セレクターに処理する必要があるイベントがあるかどうかを聞きます。その場合、それは処理されます * @throws ioexception */ @suppresswarnings( "unchecked")public void risten()throws ioexception {system.out.println( "server-side start furtty!"); //(true){//登録されたイベントが到着すると、メソッドが戻ります。それ以外の場合、メソッドはselector.select()をブロックし続けます。 //選択したアイテムのiteratorをセレクターで取得し、選択したアイテムは登録されているイベントIterator Item = this.selector.selectedKeys()。iterator(); while(ite.hasnext()){selectionkey key =(selectionkey)ite.next(); //選択したキーを削除して、item.remove()の繰り返し処理を防ぐために; //クライアントは、(key.isaceptable()){serversocketchannel server =(serversocketchannel)key .channel();の場合、接続イベントを要求します。 //チャンネルを取得して、クライアントSocketchannelチャンネル= server.accept();に接続します。 //非blocking channel.configureblocking(false)に設定します。 //ここでクライアントに情報を送信できますChannel.write(bytebuffer.wrap(new String( "メッセージをクライアントに送信")。getBytes())); //クライアントとの接続が成功した後、クライアントの情報を受信するには、チャネルの読み取り権限を設定する必要があります。 Channel.register(this.selector、selectionkey.op_read); //読み取り可能なイベントが取得されました} else if(key.isreadable()){read(key); }}}}} / ** *クライアントから送信されたメッセージを読み取るイベントの処理 * @param key * @throws ioexception * / public void read(selectionkey key)throws ioexception {//サーバーはメッセージを読むことができます:イベントが発生するソケットチャネルを取得します。 Socketchannel Channel =(Socketchannel)key.channel(); //読み取りバッファーを作成するbytebuffer buffer = bytebuffer.allocate(10); channel.read(buffer); byte [] data = buffer.array();文字列msg = new String(data).trim(); System.out.println( "サーバーはメッセージを受信しました:"+msg); bytebuffer outbuffer = bytebuffer.wrap(msg.getBytes()); Channel.write(outbuffer); //メッセージをクライアントに返送}/ *** Start Server Test* @Throws IOException*/ public Static void Main(String [] args)throws ioException {nioServer server = new nioServer(); server.initserver(8000); server.listen(); }}クライアント:
パッケージcn.nio; java.io.ioexceptionをインポートします。 java.net.inetsocketAddressをインポートします。 java.nio.bytebufferをインポートします。 java.nio.channels.selectionKeyをインポートします。 java.nio.channels.selectorをインポートします。 java.nio.channels.socketchannelをインポートします。 java.util.iteratorをインポートします。 /*** nio Client* @Author Small Path*/public class nioclient {// Channel Managerプライベートセレクターセレクター; / ** *ソケットチャネルを取得してチャンネルの初期化を実行 * @param IP * @paramポートに接続されたサーバーのIPは、 * @throws ioexception */ public void initclient(string ip、int port)に接続されたサーバーのポート番号をioexception {//ソケットチャンネルソケットソックエッチネルチャネル= socketchannel.open.open()を取得します。 //チャンネルを非ブロッキングchannel.configureblocking(false)に設定します。 //チャンネルマネージャーを取得するthis.selector = selector.open(); //クライアントはサーバーに接続します。実際、メソッド実行は接続を実装しません。 // channel.finishconnect()を使用する必要があります。 connection channel.connect(new inetsocketAddress(ip、port))を完了するlisten()メソッドで; //チャンネルマネージャーをチャンネルにバインドし、selectionkey.op_connectイベントをチャンネルに登録します。 Channel.register(selector、selectionkey.op_connect); } /***ポーリングを使用して、セレクターに処理する必要があるイベントがあるかどうかを聞きます。その場合、それは処理されます * @throws ioexception */ @suppresswarnings( "unchecked")public void risten()throws ioexception {//ポーリングしてセレクターにアクセスして(true){selector.select(); //セレクターIterator item = this.selector.selectedKeys()。iterator()で選択したアイテムのイテレーターを取得します。 while(ite.hasnext()){selectionkey key =(selectionkey)item.next(); //選択したキーを削除して、item.remove()の繰り返し処理を防ぐために; //接続イベントは、(key.isconnectable()){socketchannel channel =(socketchannel)key.channel();の場合に発生します。 //接続が接続されている場合、接続を完了します(channel.isconnectionpending()){channel.finishconnect(); } //非blocking channel.configureblocking(false)に設定します。 // Server Channel.write(bytebuffer.wrap(new String( "メッセージをサーバーに送信")。getBytes())); //サーバーとの接続が成功した後、サーバー情報を受信するには、チャネルを許可を読み取るために設定する必要があります。 Channel.register(this.selector、selectionkey.op_read); //読み取り可能なイベントが取得されました} else if(key.isreadable()){read(key); }}}}} /***サーバーによって送信されたメッセージを読み取るイベントの処理* @param key* @throws ioexception* /public void read(selectionkey key)throws ioexception {//サーバーの読みメソッドと同じ} /*** @throws ioexception* /public void main(] sthrows ioexction = nioc client.initclient( "localhost"、8000); client.listen(); }}まとめ:
最後に、動的プロキシとJava NIO分析が完了しました。ハハ、以下は、HadoopのRPCメカニズムのソースコードを分析するためです。ブログアドレスは、http://weixiaolu.iteye.com/blog/1504898です。ただし、Java Nioの理解に異議がある場合は、一緒に話し合うことを歓迎します。
再版する必要がある場合は、出典を示してください:http://weixiaolu.iteye.com/blog/1479656