PipedoutputStream dan PipedInputStream
Di Java, PipedoutputStream dan PipedInputStream adalah aliran output pipa dan aliran input pipa, masing -masing.
Fungsinya adalah untuk memungkinkan multithreads berkomunikasi di antara utas melalui pipa. Saat menggunakan komunikasi pipa, pipedoutputStream dan pipedInputStream harus digunakan bersamaan satu sama lain.
Saat menggunakan komunikasi pipa, proses umum adalah: kami menulis data ke pipedoutputStream di utas A, dan data ini akan secara otomatis dikirim ke PipedInputStream yang sesuai dengan PipedoutputStream, dan kemudian disimpan dalam buffer PipedInputStream; Pada saat ini, Thread B membaca data di PipedInputStream. Ini dapat mewujudkan komunikasi antara utas A dan Thread B.
Di bawah ini, kita melihat contoh komunikasi melalui pipa di multithreads. Contohnya termasuk 3 kelas: receiver.java, pipedstreamtest.java dan sender.java.
Kode penerima.java adalah sebagai berikut:
impor java.io.ioException; impor java.io.pipedinputStream; @SuppressWarnings ("all") / *** Utas penerima* / penerima kelas publik memperluas thread {// objek aliran input pipa. // Ini terikat pada objek "PipedoutputStream", // Ini memungkinkan Anda untuk menerima data "PipedoutputStream" dan kemudian membiarkan pengguna membacanya. Private PipedInputStream in = new PipedInputStream (); // Dapatkan "Pipe Input Stream" Object PipedInputStream getInputStream () {return in; } @Override public void run () {readMessageOnce (); // readmessageContinued (); } // Baca data sekali dari "Pipe Input Stream" public void readMessageOnce () {// Meskipun ukuran buf adalah 2048 byte, itu hanya akan membaca paling banyak 1024 byte dari "Pipe Input Stream". // Karena, ukuran buffer "aliran input pipa" hanya 1024 byte secara default. byte [] buf = byte baru [2048]; coba {int len = in.read (buf); System.out.println (string baru (buf, 0, len)); melampirkan(); } catch (ioException e) {e.printstacktrace (); }} // Saat membaca> 1024 byte dari "Pipe Input Stream", berhenti membaca public void readMessageContinued () {int total = 0; while (true) {byte [] buf = byte baru [1024]; coba {int len = in.read (buf); Total += len; System.out.println (string baru (buf, 0, len)); // Jika jumlah total byte yang dibaca adalah> 1024, keluar dari loop. if (total> 1024) break; } catch (ioException e) {e.printstacktrace (); }} coba {in.close (); } catch (ioException e) {e.printstacktrace (); }}} Kode pengirim.java adalah sebagai berikut:
impor java.io.ioException; impor java.io.pipedoutputStream; @SuppressWarnings ("all")/ *** Thread pengirim*/ Pengirim kelas publik memperluas Thread {// Objek aliran output pipa. // Ini terikat pada objek "PipedInputStream", // Ini memungkinkan data untuk dikirim ke data "PipedInputStream", dan pengguna kemudian dapat membaca data dari "PipedInputStream". private pipedoutputStream out = new PipeTutputStream (); // Dapatkan "Pipe Output Stream" Object PipedoutputStream getOutputStream () {return out; } @Override public void run () {writeShortMessage (); // writelongmessage (); } // Tulis pesan singkat ke "Pipa Output Stream": "Ini adalah pesan singkat" pribadi void writeShortMessage () {string strinfo = "Ini adalah pesan singkat"; coba {out.write (strinfo.getbytes ()); out.close (); } catch (ioException e) {e.printstacktrace (); }} // Tulis pesan panjang ke "Pipe Output Stream" private void writelongMessage () {StringBuilder SB = New StringBuilder (); // Tulis 1020 byte melalui loop untuk (int i = 0; i <102; i ++) SB.Append ("0123456789"); // Tulis 26 byte lebih banyak. SB.Append ("ABCDEFGHIJKLMNOPQRSUVWXYZ"); // Total panjang STR adalah 1020+26 = 1046 byte string str = sb.toString (); coba {// tulis 1046 byte ke dalam "Pipe Output Stream" out.write (str.getbytes ()); out.close (); } catch (ioException e) {e.printstacktrace (); }}} Kode PipedStreamTest.java adalah sebagai berikut:
import java.io.PipedInputStream;import java.io.PipedOutputStream;import java.io.IOException;@SuppressWarnings("all") /** * Interactive program for pipeline input stream and pipeline output stream*/ public class PipedStreamTest { public static void main(String[] args) { Sender t1 = new Sender(); Penerima T2 = penerima baru (); PipedOutputStream out = t1.getOutputStream (); PipedInputStream in = t2.getInputStream (); coba {// koneksi pipa. Inti dari dua kalimat berikut adalah sama. //out.connect(in); in.connect (out); /** * Mulai metode kelas utas: * Buat utas mulai dieksekusi; Java Virtual Machine memanggil metode menjalankan utas. * Hasilnya adalah bahwa dua utas berjalan secara bersamaan; Utas saat ini (dikembalikan dari panggilan ke metode start) dan utas lainnya (mengeksekusi metode jalankannya). * Adalah ilegal untuk memulai utas beberapa kali. Terutama ketika utas telah selesai dieksekusi, itu tidak dapat dimulai ulang. */ t1.start (); t2.start (); } catch (ioException e) {e.printstacktrace (); }}} Hasil Menjalankan:
Ini adalah pesan singkat
menjelaskan:
(1) in.connect (out); Associates "Pipe Input Stream" dan "Pipe Output Stream". Periksa kode sumber Connect () di PipedOutputStream.java dan PipedInputStream.java; kita tahu out.connect (in); setara dengan in.connect (out);
(2)
t1.start (); // Mulai utas "pengirim" t2.start (); // Mulai utas "penerima"
Pertama periksa kode sumber pengirim.java, dan jalankan fungsi run () setelah utas dimulai; di run () dari pengirim.java, hubungi writeshortmessage ();
Fungsi writeshortmessage (); adalah menulis data "Ini adalah pesan singkat" ke "aliran output pipa"; Data ini akan diterima oleh "aliran input pipa". Mari kita lihat bagaimana ini tercapai.
Mari pertama -tama lihat kode sumber penulisan (byte b []) dan tentukan di outputStream.java. PipedoutputStream.java mewarisi dari outputStream.java; Kode sumber penulisan (byte b []) di outputStream.java adalah sebagai berikut:
public void write (byte b []) melempar ioException {write (b, 0, b.length);} Faktanya, tulis (byte b []) adalah fungsi call write (byte b [], int off, int len) di pipedoutputStream.java. Melihat kode sumber penulisan (byte b [], int, int len), kami menemukan bahwa itu akan memanggil wastafel. Lebih lanjut melihat definisi menerima (byte b [], int, int len), kita tahu bahwa wastafel. Ukuran default buffer buffer "aliran input pipa" adalah 1024 byte.
Pada titik ini, kita tahu bahwa: t1.start () memulai utas pengirim, dan utas pengirim akan menulis data "Ini adalah pesan singkat" ke "aliran output pipa"; Dan "aliran output pipa" akan mentransfer data ke "aliran input pipa", yaitu, itu akan disimpan dalam buffer "aliran input pipa".
Selanjutnya, kita melihat "Bagaimana Pengguna Membaca Data Dari Buffer 'Pipa Input Stream'". Ini sebenarnya tindakan utas penerima.
t2.start () akan memulai utas penerima, dengan demikian menjalankan fungsi receiver.java (). Melihat kode sumber penerima.java, kita tahu bahwa run () memanggil readmessageOnce ().
ReadMessageOnce () akan menelepon. Baca (BUF) untuk membaca data dari "Pipa Input Stream di" dan simpan ke BUF.
Melalui analisis di atas, kita sudah tahu bahwa data dalam buffer "aliran input pipa di" adalah "ini adalah pesan singkat"; Oleh karena itu, data BUF adalah "ini adalah pesan singkat".
Untuk memperdalam pemahaman pipa. Kami akan melanjutkan dua percobaan kecil berikut.
Eksperimen 1: Modifikasi pengirim.java
Akan
public void run () {writeShortMessage (); // writelongmessage ();} Dimodifikasi menjadi
public void run () {// writeShortMessage (); writelongmessage ();} Jalankan program. Hasil berjalan adalah:
Data ini ditulis ke "aliran output pipa" melalui writelongmessage (), dan kemudian ditransfer ke "aliran input pipa", dan kemudian disimpan dalam buffer "aliran input pipa"; dan kemudian baca dari buffer oleh pengguna.
Kemudian, amati kode sumber WriteLongMessage (). Kita dapat menemukan bahwa panjang STR adalah 1046 byte, dan kemudian hasil berlari hanya 1024 byte! Mengapa ini terjadi?
Alasannya sederhana: ukuran default buffer aliran input pipa adalah 1024 byte. Oleh karena itu, paling banyak, 1024 byte dapat ditulis.
Dengan mengamati kode sumber PipedInputStream.java, kita dapat memahami lebih teliti.
private static final int default_pipe_size = 1024; publinputStream publik () {initpipe (default_pipe_size);} Panggilan konstruktor default initpipe (default_pipe_size), dan kode sumbernya adalah sebagai berikut:
private void initpipe (int pipesize) {if (pipesize <= 0) {lempar baru ilegalargumentException ("ukuran pipa <= 0"); } buffer = byte baru [pipa];} Dari ini, kita dapat mengetahui bahwa ukuran default buffer buffer adalah 1024 byte.
Eksperimen 2: Lanjutkan untuk memodifikasi receiver.java berdasarkan "Eksperimen 1"
Akan
public void run () {ReadMessageOnce (); // readmessageContinued ();} Dimodifikasi menjadi
public void run () {// readMessageOnce (); readmessageContinued ();} Jalankan program. Hasil berjalan adalah:
Hasil ini adalah data lengkap yang ditulis untuk "input buffer".
Pipedwriter dan Pipedreader
PipedWriter adalah aliran output pipa karakter, yang diwarisi dari penulis.
Pipedreader adalah aliran input pipa karakter yang mewarisi dari penulis.
Fungsi Pipedwriter dan Pipedreader adalah untuk berkomunikasi di antara benang melalui pipa. Saat menggunakan komunikasi pipa, PipedWriter dan Pipedreader harus digunakan bersama satu sama lain.
Di bawah ini, kita melihat contoh komunikasi melalui PipedWriter dan Pipedreader di Multithreading. Contohnya termasuk 3 kelas: receiver.java, sender.java dan pipetest.java
Kode penerima.java adalah sebagai berikut:
impor java.io.ioException; impor java.io.pipedreader; @SuppressWarnings ("all") / *** Utas penerima* / penerima kelas publik memperluas thread {// objek aliran input pipa. // Ini terikat pada objek "PipedWriter", // Ini memungkinkan Anda untuk menerima data "PipedWriter" dan kemudian membiarkan pengguna membacanya. private pipedreader di = new pipedreader (); // Dapatkan "Pipa Input Stream Object" Pipedreader Publik GetReader () {return in; } @Override public void run () {readMessageOnce (); // readmessageContinued (); } // Baca data sekali dari "Pipe Input Stream" public void readMessageOnce () {// Meskipun ukuran buf adalah 2048 karakter, itu hanya akan membaca paling banyak 1024 karakter dari "Pipe Input Stream". // Karena, ukuran buffer "aliran input pipa" hanya 1024 karakter secara default. char [] buf = char baru [2048]; coba {int len = in.read (buf); System.out.println (string baru (buf, 0, len)); melampirkan(); } catch (ioException e) {e.printstacktrace (); }} // Saat membaca> 1024 karakter dari "Pipe Input Stream", berhenti membaca public void readMessageContinued () {int total = 0; while (true) {char [] buf = new char [1024]; coba {int len = in.read (buf); Total += len; System.out.println (string baru (buf, 0, len)); // Jika jumlah total karakter yang dibaca adalah> 1024, loop keluar. if (total> 1024) break; } catch (ioException e) {e.printstacktrace (); }} coba {in.close (); } catch (ioException e) {e.printstacktrace (); }}} Kode pengirim.java adalah sebagai berikut:
impor java.io.ioException; impor java.io.pipedwriter; @SuppressWarnings ("all")/ *** Thread pengirim*/ Pengirim kelas publik memperluas Thread {// Objek aliran output pipa. // Ini terikat pada objek "Pipedreader", // Ini memungkinkan data untuk dikirim ke data "PipedReader" dan pengguna kemudian dapat membaca data dari "Pipedreader". Private PipedWriter out = New PipedWriter (); // Dapatkan "Pipe Output Stream" Object Pipedwriter GetWriter () {return out; } @Override public void run () {writeShortMessage (); // writelongmessage (); } // Tulis pesan singkat ke "Pipa Output Stream": "Ini adalah pesan singkat" pribadi void writeShortMessage () {string strinfo = "Ini adalah pesan singkat"; coba {out.write (strinfo.tochararray ()); out.close (); } catch (ioException e) {e.printstacktrace (); }} // Tulis pesan panjang ke "Pipe Output Stream" private void writelongMessage () {StringBuilder SB = New StringBuilder (); // Tulis 1020 karakter melalui loop untuk (int i = 0; i <102; i ++) SB.Append ("0123456789"); // Tulis 26 karakter lagi. SB.Append ("ABCDEFGHIJKLMNOPQRSUVWXYZ"); // Total panjang STR adalah 1020+26 = 1046 karakter string str = sb.toString (); Coba {// tulis 1046 karakter ke dalam "Pipe Output Stream" out.write (str); out.close (); } catch (ioException e) {e.printstacktrace (); }}} Kode Pipetest.java adalah sebagai berikut:
import java.io.PipedReader;import java.io.PipedWriter;import java.io.IOException;@SuppressWarnings("all") /** * Interactive program for pipeline input stream and pipeline output stream*/ public class PipeTest { public static void main(String[] args) { Sender t1 = new Sender(); Penerima T2 = penerima baru (); PipedWriter out = t1.getWriter (); PipedReader di = t2.getReader (); coba {// koneksi pipa. Inti dari dua kalimat berikut adalah sama. //out.connect(in); in.connect (out); /** * Mulai metode kelas utas: * Buat utas mulai dieksekusi; Java Virtual Machine memanggil metode menjalankan utas. * Hasilnya adalah bahwa dua utas berjalan secara bersamaan; Utas saat ini (dikembalikan dari panggilan ke metode start) dan utas lainnya (mengeksekusi metode jalankannya). * Adalah ilegal untuk memulai utas beberapa kali. Terutama ketika utas telah selesai dieksekusi, itu tidak dapat dimulai ulang. */ t1.start (); t2.start (); } catch (ioException e) {e.printstacktrace (); }}} Hasil Menjalankan:
Ini adalah pesan singkat
Deskripsi Hasil:
(1)
in.connect (out);
Fungsinya adalah untuk mengaitkan "aliran input pipa" dan "aliran output pipa". Periksa kode sumber Connect () di PipedWriter.java dan PipedReader.java; kita tahu out.connect (in); setara dengan in.connect (out);
(2)
t1.start (); // Mulai utas "pengirim" t2.start (); // Mulai utas "penerima"
Pertama periksa kode sumber pengirim.java, dan jalankan fungsi run () setelah utas dimulai; di run () dari pengirim.java, hubungi writeshortmessage ();
Fungsi writeshortmessage (); adalah menulis data "Ini adalah pesan singkat" ke "aliran output pipa"; Data ini akan diterima oleh "aliran input pipa". Mari kita lihat bagaimana ini tercapai.
Mari pertama -tama lihat kode sumber penulisan (char char. Pipedwriter.java mewarisi dari writer.java; kode sumber penulisan (char c []) di writer.java adalah sebagai berikut:
public void write (char cbuf []) melempar ioException {write (cbuf, 0, cbuf.length);}
Faktanya, tulis (char c []) adalah fungsi Write (Char C [], int, int len) di PipedWriter.java. Melihat kode sumber penulisan (char c [], int, int len), kami menemukan bahwa itu akan memanggil wastafel. Lebih lanjut melihat definisi menerima (char c [], int, int len), kita tahu bahwa sink. Ukuran default dari buffer buffer "Pipe Input Stream" adalah 1024 karakter.
Pada titik ini, kita tahu bahwa: t1.start () memulai utas pengirim, dan utas pengirim akan menulis data "Ini adalah pesan singkat" ke "aliran output pipa"; Dan "aliran output pipa" akan mentransfer data ke "aliran input pipa", yaitu, itu akan disimpan dalam buffer "aliran input pipa".
Selanjutnya, kita melihat "Bagaimana Pengguna Membaca Data Dari Buffer 'Pipa Input Stream'". Ini sebenarnya tindakan utas penerima.
t2.start () akan memulai utas penerima, dengan demikian menjalankan fungsi receiver.java (). Melihat kode sumber penerima.java, kita tahu bahwa run () memanggil readmessageOnce ().
ReadMessageOnce () akan menelepon. Baca (BUF) untuk membaca data dari "Pipa Input Stream di" dan simpan ke BUF.
Melalui analisis di atas, kita sudah tahu bahwa data dalam buffer "aliran input pipa di" adalah "ini adalah pesan singkat"; Oleh karena itu, data BUF adalah "ini adalah pesan singkat".
Untuk memperdalam pemahaman pipa. Kami akan melanjutkan dua percobaan kecil berikut.
Eksperimen 1: Modifikasi pengirim.java
Akan
public void run () {writeShortMessage (); // writelongmessage ();} Dimodifikasi menjadi
public void run () {// writeShortMessage (); writelongmessage ();} Jalankan program. Hasil operasi adalah sebagai berikut:
Dari ini, kita dapat melihat bahwa program berjalan secara tidak benar! Lempar pengecualian java.io.ioException: pipa tertutup
Mengapa ini terjadi?
Saya akan menganalisis aliran program.
(1) di Pipetest, sambungkan pipa input dan output melalui in.connect (out); Kemudian, mulailah dua utas. T1.Start () Memulai pengirim utas, dan t2.start () memulai penerima utas.
(2) Setelah utas pengirim dimulai, data ditulis ke "pipa output" melalui writelongMessage (), dan out.write (str.tochararray ()) menulis total 1046 karakter. Menurut kode sumber PipedWriter, fungsi Write () dari PipedWriter akan memanggil fungsi penerimaan () dari PipedReader. Melihat fungsi penerimaan () dari PipedReader, kita tahu bahwa Pipedreader akan menyimpan buffer data yang diterima. Jika Anda mengamati fungsi penerimaan () dengan cermat, ada kode berikut:
while (in == out) {if ((readside! = null) &&! readside.isalive ()) {lempar ioException baru ("pipa rusak"); } / * Lengkap: Tendang pembaca yang menunggu * / notifyall (); coba {tunggu (1000); } catch (interruptedException ex) {lempar java.io.interruptedioexception baru (); }} Nilai awal masuk dan keluar masing-masing adalah = -1, out = 0; dikombinasikan dengan di atas sementara (dalam == out). Kita tahu bahwa artinya adalah bahwa setiap kali karakter ditulis ke dalam pipa, kondisi di == out terpenuhi. Kemudian, notifyall () dipanggil untuk membangunkan "utas yang membaca pipa".
Artinya, setiap kali karakter ditulis ke dalam pipa, itu akan memblokir dan menunggu utas lain untuk dibaca.
Namun, ukuran default buffer Pipedreader adalah 1024! Namun, ada 1046 data yang akan ditulis saat ini! Oleh karena itu, paling banyak 1024 karakter dapat ditulis sekaligus.
(03) Setelah utas penerima dimulai, ReadMessageOnce () akan dipanggil untuk membaca aliran input pipa. Membaca 1024 karakter akan dilakukan, dan tutup () akan dipanggil untuk menutup, pipa.
Dari analisis (02) dan (03), dapat dilihat bahwa pengirim perlu menulis 1046 karakter ke dalam pipa. Di antara mereka, 1024 karakter pertama (kapasitas buffer adalah 1024) dapat ditulis secara normal, dan satu dibaca untuk setiap tulisan. Ketika 1025 karakter ditulis, tulis () di PipedWriter.java masih dipanggil secara berurutan; Kemudian, terima () di pipedreader.java dipanggil; Di PipedReader.java, fungsi menerima (int c) pada akhirnya akan dipanggil. Pada saat ini, aliran input pipa telah ditutup, yaitu, closedbyreader benar, jadi lempar ioException baru ("pipa tertutup") dilemparkan.
Kami terus memodifikasi "Uji Satu" untuk menyelesaikan masalah.
Eksperimen 2: Lanjutkan untuk memodifikasi receiver.java berdasarkan "Eksperimen 1".
public void run () {ReadMessageOnce (); // readmessageContinued ();} Dimodifikasi menjadi
public void run () {// readMessageOnce (); readmessageContinued ();} Pada saat ini, program dapat berjalan secara normal. Hasil berjalan adalah: