PipedOutputStreamおよびPipedInputStream
Javaでは、PipedOutputStreamとPipedInputStreamは、それぞれパイプライン出力ストリームとパイプライン入力ストリームです。
それらの機能は、マルチスレッドがパイプラインを介してスレッド間で通信できるようにすることです。パイプライン通信を使用する場合、PipedOutputStreamとPipedInputStreamを互いに組み合わせて使用する必要があります。
パイプライン通信を使用する場合、一般的なプロセスは次のとおりです。スレッドAのPipedOutputStreamにデータを書き込み、これらのデータはPipedOutputStreamに対応するPipedinputStreamに自動的に送信され、PipedinputStreamのバッファーに保存されます。この時点で、スレッドBはPipedinputStreamのデータを読み取ります。これにより、スレッドAとスレッドBの間の通信が実現できます。
以下に、マルチスレッドのパイプラインを介した通信の例を見ていきます。例には、Receiver.java、pipedstreamtest.java、sender.javaの3つのクラスが含まれます。
Receiver.javaのコードは次のとおりです。
java.io.ioexceptionをインポートします。 java.io.pipedinputStreamをインポートします。 @suppresswarnings( "all") / ***レシーバースレッド* /パブリッククラスレシーバーはスレッド{// pipeline入力ストリームオブジェクトを拡張します。 //「PipedOutputStream」オブジェクトにバインドされています。 private pipedinputStream in = new PipedInputStream(); //「パイプ入力ストリーム」オブジェクトを取得しますpipedinputStream getInputStream(){return in; } @Override public void run(){readmessageonce(); // readmessagecontinued(); } //「パイプ入力ストリーム」からデータを1回読み取りますpublic void readmessageonce(){// bufのサイズは2048バイトですが、「パイプ入力ストリーム」から最大1024バイトでのみ読み取ります。 //「パイプ入力ストリーム」のバッファサイズは、デフォルトでは1024バイトのみです。 byte [] buf = new byte [2048]; {int len = in.read(buf); System.out.println(new String(buf、0、len)); in.close(); } catch(ioexception e){e.printstacktrace(); }} //「パイプ入力ストリーム」から> 1024バイトを読むとき、public void readmessagecontinued(){int total = 0; while(true){byte [] buf = new byte [1024]; {int len = in.read(buf);合計 += len; System.out.println(new String(buf、0、len)); //読み取られたバイトの総数が> 1024の場合、ループを終了します。 if(合計> 1024)破壊; } catch(ioexception e){e.printstacktrace(); }} try {in.close(); } catch(ioexception e){e.printstacktrace(); }}} sender.javaのコードは次のとおりです。
java.io.ioexceptionをインポートします。 java.io.pipedOutputStreamをインポートします。 @suppresswarnings( "all")/ ***送信者スレッド*/ public class sender extends thread {// pipeline出力ストリームオブジェクト。 //「pipedinputStream」オブジェクトにバインドされています。//これにより、データが「pipedinputStream」のデータに送信され、ユーザーは「pipedinputStream」のデータを読み取ることができます。 private pipedOutputStream out = new PipedOutputStream(); //「パイプ出力ストリーム」オブジェクトを取得しますpipedOutputStream getOutputStream(){return out; } @Override public void run(){writeshortmessage(); // writelongmessage(); } //「パイプ出力ストリーム」に短いメッセージを書きます:「これは短いメッセージです "private void writeshortmessage(){string strinfo ="これは短いメッセージです "; try {out.write(strinfo.getBytes()); out.close(); } catch(ioexception e){e.printstacktrace(); }} //「パイプ出力ストリーム」に長いメッセージを書き込み、private void writelongmessage(){stringbuilder sb = new StringBuilder(); //(int i = 0; i <102; i ++)sb.append( "0123456789"); // 26バイトをもっと書きます。 sb.append( "abcdefghijklmnopqrstuvwxyz"); // strの全長は1020+26 = 1046バイトstring str = sb.toString()です。 try {// 1046バイトを「パイプ出力ストリーム」out.write(str.getBytes())に書き込みます。 out.close(); } catch(ioexception e){e.printstacktrace(); }}} pipedstreamtest.javaのコードは次のとおりです。
java.io.pipedinputStream; Import java.io.pipedOutputStream; Import java.io.ioexception; @suppresswarnings( "all") / ***パイプライン入力ストリームおよびパイプライン出力ストリームのインタラクティブプログラム受信機T2 = new Receiver(); pipedoutputStream out = t1.getOutputStream(); pipedinputStream in = t2.getInputStream(); {//パイプ接続を試してください。次の2つの文の本質は同じです。 //out.connect(in); in.connect(out); /** *スレッドクラスの開始方法: *スレッドの実行を開始します。 Java仮想マシンは、スレッドの実行方法を呼び出します。 *その結果、2つのスレッドが同時に実行されます。現在のスレッド(呼び出しからstartメソッドに返されます)と他のスレッド(実行方法を実行)。 *スレッドを複数回起動することは違法です。特に、スレッドの実行が終了した場合、再起動することはできません。 */ t1.start(); t2.start(); } catch(ioexception e){e.printstacktrace(); }}}実行結果:
これは短いメッセージです
説明:
(1)in.connect(out); 「パイプ入力ストリーム」と「パイプ出力ストリーム」を関連付けます。 pipedoutputStream.javaおよびpipedinputStream.javaのソースコード()のソースコード()を確認します。私たちは知っています.connect(in); in.connect(out)に相当します。
(2)
t1.start(); //「送信者」スレッドt2.start()を起動します。 //「受信機」スレッドを起動します
最初にsender.javaのソースコードを確認し、スレッドの開始後にrun()関数を実行します。 sender.javaのrun()で、writeshortmessage()を呼び出します。
writeshortmessage()の機能; 「これは短いメッセージです」と「パイプ出力ストリーム」を記述することです。このデータは、「パイプ入力ストリーム」によって受信されます。これがどのように達成されるか見てみましょう。
まず、Source of Write(BYTE B [])を見て、outputStream.javaで定義しましょう。 pipedOutputStream.javaは、outputStream.javaから継承します。 outputstream.javaの書き込みコード(byte b [])は次のとおりです。
public void write(byte b [])throws ioexception {write(b、0、b.length);}実際、write(byte b [])は、pipedoutputStream.javaのコールwrite(byte b []、int off、int len)関数です。 Source of Write(BYTE B []、INT OFF、INT LEN)を見ると、Sink.Receive(b、off、len)を呼び出すことがわかりました。さらに、受信(BYTE B []、INT OFF、INT LEN)の定義を見ると、Sink.Receive(B、Off、Len)は、「パイプ出力ストリーム」のデータを「パイプ入力ストリーム」のバッファーに保存することです。 「パイプ入力ストリーム」のバッファバッファーのデフォルトサイズは1024バイトです。
この時点で、次のことがわかります。T1.Start()は送信者スレッドを起動し、送信者スレッドは「これはパイプ出力ストリーム」に「これは短いメッセージです」と記述します。 「パイプ出力ストリーム」は、データを「パイプ入力ストリーム」に転送します。つまり、「パイプ入力ストリーム」のバッファーに保存されます。
次に、「ユーザーが「パイプ入力ストリーム」のバッファからデータを読む方法」を見ます。これは、実際にはレシーバースレッドのアクションです。
t2.start()はレシーバースレッドを開始し、それによりreceiver.java run()関数を実行します。 Receiver.javaのソースコードを見ると、run()がreadmessageonce()を呼び出すことがわかります。
readMessageOnce()は、in.read(buf)を呼び出して、「パイプ入力ストリームのパイプ」からデータを読み取り、BUFに保存します。
上記の分析を通じて、「パイプ入力ストリーム」のバッファー内のデータは「これは短いメッセージです」であることをすでに知っています。したがって、BUFのデータは「これは短いメッセージです」です。
パイプラインの理解を深めるため。次の2つの小さな実験を続けます。
実験1:sender.javaを変更します
意思
public void run(){writeshortmessage(); // writelongmessage();}に変更された
public void run(){// writeshortmessage(); writelongmessage();}プログラムを実行します。実行中の結果は次のとおりです。
これらのデータは、writelongmessage()を介して「パイプ出力ストリーム」に書き込まれ、「パイプ入力ストリーム」に転送され、「パイプ入力ストリーム」のバッファーに保存されます。そして、ユーザーによるバッファーから読み出します。
次に、writelongmessage()のソースコードを観察します。 STRの長さは1046バイトであり、実行の結果は1024バイトのみであることがわかります!なぜこれが起こっているのですか?
理由は単純です。パイプライン入力ストリームのバッファーのデフォルトサイズは1024バイトです。したがって、せいぜい1024バイトを書き込むことができます。
pipedinputStream.javaのソースコードを観察することにより、より徹底的に理解することができます。
Private static final int default_pipe_size = 1024; public pipedinputstream(){initpipe(default_pipe_size);}デフォルトのコンストラクターはinitpipe(default_pipe_size)を呼び出し、そのソースコードは次のとおりです。
private void initpipe(int pipesize){if(pipesize <= 0){throw new IllegalargumentException( "Pipe Size <= 0"); } buffer = new byte [pipesize];}このことから、バッファバッファーのデフォルトサイズが1024バイトであることがわかります。
実験2:「実験1」に基づいてreceiver.javaを変更し続けます
意思
public void run(){readmessageonce(); // readmessagecontinued();}に変更された
public void run(){// readmessageonce(); readmessagecontinued();}プログラムを実行します。実行中の結果は次のとおりです。
この結果は、「入力バッファー」に記述された完全なデータです。
PipedWriterとPipedReader
PipedWriterは、ライターから継承されているキャラクターパイプライン出力ストリームです。
PipedReaderは、ライターから継承する文字パイプライン入力ストリームです。
PipedWriterとPipedReaderの機能は、パイプラインを介してスレッド間で通信することです。パイプライン通信を使用する場合、PipedWriterとPipedReaderを互いに組み合わせて使用する必要があります。
以下に、マルチスレッドのPipedWriterとPipedReaderを介したコミュニケーションの例を見ていきます。例には、Receiver.java、sender.java、pipetest.javaの3つのクラスが含まれます
Receiver.javaのコードは次のとおりです。
java.io.ioexceptionをインポートします。 java.io.pipedreaderをインポートします。 @suppresswarnings( "all") / ***レシーバースレッド* /パブリッククラスレシーバーはスレッド{// pipeline入力ストリームオブジェクトを拡張します。 //「PipedWriter」オブジェクトにバインドされています。//これにより、「PipedWriter」のデータを受信してからユーザーに読み取ることができます。 Private PipedReader in = new PipedReader(); //「パイプ入力ストリームオブジェクト」を取得しますpipedReader getReader(){return in; } @Override public void run(){readmessageonce(); // readmessagecontinued(); } //「パイプ入力ストリーム」からデータを1回読み取りますpublic void readmessageonce(){// bufのサイズは2048文字ですが、「パイプ入力ストリーム」の最大1024文字でのみ読み取ります。 //「パイプ入力ストリーム」のバッファサイズは、デフォルトでは1024文字のみです。 char [] buf = new char [2048]; {int len = in.read(buf); System.out.println(new String(buf、0、len)); in.close(); } catch(ioexception e){e.printstacktrace(); }} //「パイプ入力ストリーム」から> 1024文字を読むとき、public void readmessagecontinued(){int total = 0; while(true){char [] buf = new char [1024]; {int len = in.read(buf);合計 += len; System.out.println(new String(buf、0、len)); //読み取り文字の総数が> 1024の場合、ループは終了します。 if(合計> 1024)破壊; } catch(ioexception e){e.printstacktrace(); }} try {in.close(); } catch(ioexception e){e.printstacktrace(); }}} sender.javaのコードは次のとおりです。
java.io.ioexceptionをインポートします。 java.io.pipedwriterをインポートします。 @suppresswarnings( "all")/ ***送信者スレッド*/ public class sender extends thread {// pipeline出力ストリームオブジェクト。 //「PipedReader」オブジェクトにバインドされています。//これにより、データを「PipedReader」のデータに送信でき、ユーザーは「PipedReader」のデータを読み取ることができます。 Private PipedWriter out = new PipedWriter(); //「パイプ出力ストリーム」オブジェクトを取得しますパブリックパイプライターgetWriter(){return out; } @Override public void run(){writeshortmessage(); // writelongmessage(); } //「パイプ出力ストリーム」に短いメッセージを書きます:「これは短いメッセージです "private void writeshortmessage(){string strinfo ="これは短いメッセージです "; try {out.write(strinfo.tochararray()); out.close(); } catch(ioexception e){e.printstacktrace(); }} //「パイプ出力ストリーム」に長いメッセージを書き込み、private void writelongmessage(){stringbuilder sb = new StringBuilder(); //(int i = 0; i <102; i ++)sb.append( "0123456789"); // 26文字をもっと書きます。 sb.append( "abcdefghijklmnopqrstuvwxyz"); // strの全長は1020+26 = 1046文字string str = sb.toString()です。 try {// 1046文字を「パイプ出力ストリーム」out.write(str)に書き込みます。 out.close(); } catch(ioexception e){e.printstacktrace(); }}} pipetest.javaのコードは次のとおりです。
java.io.io.pipedreader; Import java.io.pipedwriter; Import java.io.ioexception; @suppresswarnings( "all") / ***パイプライン出力ストリームおよびパイプライン出力ストリームのインタラクティブプログラム受信機T2 = new Receiver(); PipedWriter out = t1.getWriter(); PipedReader in = t2.getReader(); {//パイプ接続を試してください。次の2つの文の本質は同じです。 //out.connect(in); in.connect(out); /** *スレッドクラスの開始方法: *スレッドの実行を開始します。 Java仮想マシンは、スレッドの実行方法を呼び出します。 *その結果、2つのスレッドが同時に実行されます。現在のスレッド(呼び出しからstartメソッドに返されます)と他のスレッド(実行方法を実行)。 *スレッドを複数回起動することは違法です。特に、スレッドの実行が終了した場合、再起動することはできません。 */ t1.start(); t2.start(); } catch(ioexception e){e.printstacktrace(); }}}実行結果:
これは短いメッセージです
結果説明:
(1)
in.connect(out);
その機能は、「パイプ入力ストリーム」と「パイプ出力ストリーム」を関連付けることです。 pipedwriter.javaおよびpipedreader.javaのソースコード()のソースコード()を確認してください。私たちは知っています.connect(in); in.connect(out)に相当します。
(2)
t1.start(); //「送信者」スレッドt2.start()を起動します。 //「受信機」スレッドを起動します
最初にsender.javaのソースコードを確認し、スレッドの開始後にrun()関数を実行します。 sender.javaのrun()で、writeshortmessage()を呼び出します。
writeshortmessage()の機能; 「これは短いメッセージです」と「パイプ出力ストリーム」を記述することです。このデータは、「パイプ入力ストリーム」によって受信されます。これがどのように達成されるか見てみましょう。
まず、Source of Writeコード(Char。PipedWriter.javaがwriter.javaから継承されます; writer.javaのソースコード(char c [])は次のとおりです。
public void write(char cbuf [])throws ioexception {write(cbuf、0、cbuf.length);}
実際、write(char c [])は、pipedwriter.javaのコールwrite(char c []、int off、int len)関数です。 Source of Write(Char C []、int off、int len)を見ると、sink.receive(cbuf、off、len)を呼び出すことがわかりました。さらに、受信(Char C []、Int Off、Int Len)の定義を見ると、sink.Receive(CBUF、OFF、LEN)は、「パイプ出力ストリーム」のデータを「パイプ入力ストリーム」のバッファーに保存することを知っています。 「パイプ入力ストリーム」のバッファバッファーのデフォルトサイズは1024文字です。
この時点で、次のことがわかります。T1.Start()は送信者スレッドを起動し、送信者スレッドは「これはパイプ出力ストリーム」に「これは短いメッセージです」と記述します。 「パイプ出力ストリーム」は、データを「パイプ入力ストリーム」に転送します。つまり、「パイプ入力ストリーム」のバッファーに保存されます。
次に、「ユーザーが「パイプ入力ストリーム」のバッファからデータを読む方法」を見ます。これは、実際にはレシーバースレッドのアクションです。
t2.start()はレシーバースレッドを開始し、それによりreceiver.java run()関数を実行します。 Receiver.javaのソースコードを見ると、run()がreadmessageonce()を呼び出すことがわかります。
readMessageOnce()は、in.read(buf)を呼び出して、「パイプ入力ストリームのパイプ」からデータを読み取り、BUFに保存します。
上記の分析を通じて、「パイプ入力ストリーム」のバッファー内のデータは「これは短いメッセージです」であることをすでに知っています。したがって、BUFのデータは「これは短いメッセージです」です。
パイプラインの理解を深めるため。次の2つの小さな実験を続けます。
実験1:sender.javaを変更します
意思
public void run(){writeshortmessage(); // writelongmessage();}に変更された
public void run(){// writeshortmessage(); writelongmessage();}プログラムを実行します。操作結果は次のとおりです。
このことから、プログラムが間違って実行されることがわかります!例外Java.io.ioexception:パイプが閉じています
なぜこれが起こっているのですか?
プログラムフローを分析します。
(1)pipetestで、入力パイプラインと出力パイプラインをin.connect(out)に接続します。次に、2つのスレッドを開始します。 t1.start()はスレッド送信者を起動し、t2.start()はスレッドレシーバーを起動します。
(2)送信者スレッドが開始された後、データはwriteLongmessage()およびout.write(str.tochararray())を介して「出力パイプライン」に書き込まれます。 PipedWriterのソースコードによると、PipedWriterのWrite()関数は、PipedReaderの受信()関数を呼び出します。 PipedReaderの受信()関数を見ると、PipedReaderが受け入れられたデータバッファーを保存することがわかっています。受信()関数を慎重に観察した場合、次のコードがあります。
while(in == out){if((readside!= null)&&!readside.isalive()){throw new ioexception( "pipe broken"); } / * full:待機中の読者をキック * / notifyall(); {wait(1000); } catch(arturtedexception ex){new java.io.interrupdedioexception(); }}内外の初期値は、それぞれ= -1、out = 0です。上記と組み合わせて(in == out)。その意味は、キャラクターがパイプラインに書き込まれるたびに、== outの条件が満たされるということです。次に、notifyall()が呼び出され、「パイプラインを読み取るスレッド」を目覚めさせます。
つまり、キャラクターがパイプラインに書き込まれるたびに、他のスレッドが読み取られるのをブロックして待ちます。
ただし、PipedReaderのバッファーのデフォルトサイズは1024です!ただし、現時点で記述すべき1046のデータがあります!したがって、最大で1024文字を一度に書くことができます。
(03)受信者スレッドが開始された後、Pipeline入力ストリームを読み取るためにReadMessageOnce()が呼び出されます。読み取り1024文字が完了し、閉じる()が閉じるために呼び出されます。
(02)と(03)の分析から、送信者は1046文字をパイプラインに書き込む必要があることがわかります。その中で、最初の1024文字(バッファ容量は1024)は正常に書くことができ、1つは書き込みごとに読み取られます。 1025文字が書かれている場合、pipedwriter.javaのwrite()は順番に呼び出されます。次に、pipedreader.javaのreceive()が呼び出されます。 PipedReader.javaでは、最終的に受信(int C)関数が呼び出されます。この時点で、パイプライン入力ストリームは閉じられています。つまり、クローズドバイリーダーが真であるため、新しいioException(「パイプ閉じた」)をスローします。
「テスト1」を変更して問題を解決し続けます。
実験2:「実験1」に基づいて、receiver.javaを変更し続けます。
public void run(){readmessageonce(); // readmessagecontinued();}に変更された
public void run(){// readmessageonce(); readmessagecontinued();}この時点で、プログラムは正常に実行できます。実行中の結果は次のとおりです。