PipedOutputStream and PipedInputStream
In Java, PipedOutputStream and PipedInputStream are the piped output stream and the piped input stream, respectively.
Their purpose is to let threads communicate with each other through a pipe. For pipe communication, PipedOutputStream and PipedInputStream must be used together.
The general process is as follows: thread A writes data to the PipedOutputStream; the data is automatically passed to the PipedInputStream connected to that PipedOutputStream and stored in the buffer of the PipedInputStream; thread B then reads the data from the PipedInputStream. This is how thread A and thread B communicate.
Below is an example of communication between threads through a pipe. It consists of three classes: Receiver.java, PipedStreamTest.java and Sender.java.
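The flow described above can be sketched in a few lines. This is a minimal illustration, not the article's example: the class name PipeRoundTrip and the method roundTrip are made up for this sketch, and it assumes Java 9 or later because it uses InputStream.readAllBytes().

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the article's example.
class PipeRoundTrip {

    // Sends message from a writer thread ("thread A") to the caller ("thread B").
    static String roundTrip(String message) throws IOException, InterruptedException {
        PipedOutputStream out = new PipedOutputStream();
        // Passing out to the constructor connects the two ends of the pipe.
        PipedInputStream in = new PipedInputStream(out);

        Thread threadA = new Thread(() -> {
            try {
                out.write(message.getBytes(StandardCharsets.UTF_8));
                // Closing the output end lets the reader see end of stream.
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        threadA.start();

        // readAllBytes() (Java 9+) keeps reading until the writer closes its end.
        byte[] received = in.readAllBytes();
        in.close();
        threadA.join();
        return new String(received, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("this is a short message"));
    }
}
```

The writer closes its end explicitly; if the writing thread ended without closing, the reader could not tell a finished message from a broken pipe.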
The code of Receiver.java is as follows:
import java.io.IOException;
import java.io.PipedInputStream;

/**
 * Receiver thread
 */
@SuppressWarnings("all")
public class Receiver extends Thread {

    // Piped input stream object.
    // It is bound to a PipedOutputStream object, so that it can receive
    // the data of the PipedOutputStream and let the user read it.
    private PipedInputStream in = new PipedInputStream();

    // Get the piped input stream object
    public PipedInputStream getInputStream() {
        return in;
    }

    @Override
    public void run() {
        readMessageOnce();
        // readMessageContinued();
    }

    // Read data once from the piped input stream
    public void readMessageOnce() {
        // Although buf holds 2048 bytes, a single read returns at most 1024 bytes,
        // because the buffer of the piped input stream is only 1024 bytes by default.
        byte[] buf = new byte[2048];
        try {
            int len = in.read(buf);
            // read() returns -1 if the stream ended before any data arrived
            if (len > 0) {
                System.out.println(new String(buf, 0, len));
            }
            in.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Keep reading from the piped input stream; stop once more than 1024 bytes have been read
    public void readMessageContinued() {
        int total = 0;
        while (true) {
            byte[] buf = new byte[1024];
            try {
                int len = in.read(buf);
                // Stop at end of stream
                if (len == -1)
                    break;
                total += len;
                System.out.println(new String(buf, 0, len));
                // If the total number of bytes read is > 1024, exit the loop.
                if (total > 1024)
                    break;
            } catch (IOException e) {
                e.printStackTrace();
                // Do not keep looping on a broken pipe
                break;
            }
        }

        try {
            in.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The code of Sender.java is as follows:
import java.io.IOException;
import java.io.PipedOutputStream;

/**
 * Sender thread
 */
@SuppressWarnings("all")
public class Sender extends Thread {

    // Piped output stream object.
    // It is bound to a PipedInputStream object, so that data written here
    // is sent to the PipedInputStream, where the user can read it.
    private PipedOutputStream out = new PipedOutputStream();

    // Get the piped output stream object
    public PipedOutputStream getOutputStream() {
        return out;
    }

    @Override
    public void run() {
        writeShortMessage();
        //writeLongMessage();
    }

    // Write a short message to the piped output stream: "this is a short message"
    private void writeShortMessage() {
        String strInfo = "this is a short message";
        try {
            out.write(strInfo.getBytes());
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Write a long message to the piped output stream
    private void writeLongMessage() {
        StringBuilder sb = new StringBuilder();
        // Append 1020 bytes through a for loop
        for (int i = 0; i < 102; i++)
            sb.append("0123456789");
        // Append 26 more bytes.
        sb.append("abcdefghijklmnopqrstuvwxyz");
        // The total length of str is 1020+26=1046 bytes
        String str = sb.toString();
        try {
            // Write 1046 bytes into the piped output stream
            out.write(str.getBytes());
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The code of PipedStreamTest.java is as follows:
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.IOException;

/**
 * Test program for the interaction between the piped input stream and the piped output stream
 */
@SuppressWarnings("all")
public class PipedStreamTest {

    public static void main(String[] args) {
        Sender t1 = new Sender();
        Receiver t2 = new Receiver();

        PipedOutputStream out = t1.getOutputStream();
        PipedInputStream in = t2.getInputStream();

        try {
            // Connect the pipe. The following two statements are equivalent.
            //out.connect(in);
            in.connect(out);

            // start() method of the Thread class:
            // Makes the thread start executing; the Java virtual machine calls the thread's run method.
            // The result is that two threads run concurrently: the current thread (which returns
            // from the call to start) and the other thread (which executes its run method).
            // It is illegal to start a thread more than once. In particular, a thread
            // cannot be restarted once it has finished executing.
            t1.start();
            t2.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Running results:
this is a short message
Explanation:
(1) in.connect(out); associates "pipe input stream" and "pipe output stream". Check the source code of connect() in PipedOutputStream.java and PipedInputStream.java; we know out.connect(in); is equivalent to in.connect(out);
(2)
t1.start(); // Start the "Sender" thread
t2.start(); // Start the "Receiver" thread
First, look at the source code of Sender.java: once the thread is started, its run() method is executed, and run() calls writeShortMessage().
writeShortMessage() writes the data "this is a short message" to the piped output stream; this data is then received by the piped input stream. Let's see how this is achieved.
Let's first look at the source code of write(byte b[]), which is defined in OutputStream.java. PipedOutputStream.java inherits from OutputStream.java; the source code of write(byte b[]) in OutputStream.java is as follows:
public void write(byte b[]) throws IOException {
    write(b, 0, b.length);
}
In effect, write(byte b[]) calls the write(byte b[], int off, int len) method in PipedOutputStream.java. The source code of write(byte b[], int off, int len) shows that it calls sink.receive(b, off, len). The definition of receive(byte b[], int off, int len) in turn shows that sink.receive(b, off, len) stores the data written to the piped output stream in the buffer of the piped input stream. The default size of that buffer is 1024 bytes.
At this point, we know that: t1.start() starts the Sender thread, and the Sender thread will write the data "this is a short message" to the "pipe output stream"; and the "pipe output stream" will transfer the data to the "pipe input stream", that is, it will be saved in the buffer of the "pipe input stream".
Next, we look at "how users read data from the buffer of the 'pipe input stream'". This is actually the action of the Receiver thread.
t2.start() starts the Receiver thread, which executes the run() method of Receiver.java. The source code of Receiver.java shows that run() calls readMessageOnce().
readMessageOnce() calls in.read(buf) to read data from the piped input stream in and store it in buf.
From the analysis above, we already know that the buffer of the piped input stream in holds "this is a short message"; therefore, buf ends up containing "this is a short message".
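To make point (1) concrete, the following sketch connects a pair of streams in three ways: in.connect(out), out.connect(in), and the PipedInputStream(PipedOutputStream) constructor. The class PipeConnectDemo and its method names are hypothetical and only serve this illustration. It also shows that connecting an already connected pair a second time fails with an IOException.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class, not part of the article's example.
class PipeConnectDemo {

    // Sends one byte through an already connected pair and returns what arrives.
    // A single byte fits in the buffer, so one thread can safely do both steps here.
    static int passOneByte(PipedOutputStream out, PipedInputStream in) throws IOException {
        out.write(42);
        int value = in.read();
        out.close();
        in.close();
        return value;
    }

    static int viaInputConnect() throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream();
        in.connect(out);
        return passOneByte(out, in);
    }

    static int viaOutputConnect() throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream();
        out.connect(in);
        return passOneByte(out, in);
    }

    static int viaConstructor() throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        // The constructor performs the connection.
        PipedInputStream in = new PipedInputStream(out);
        return passOneByte(out, in);
    }

    // Returns true if connecting an already connected pair is rejected.
    static boolean secondConnectFails() throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream();
        in.connect(out);
        try {
            out.connect(in);
            return false;
        } catch (IOException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(viaInputConnect());
        System.out.println(viaOutputConnect());
        System.out.println(viaConstructor());
        System.out.println(secondConnectFails());
    }
}
```

Reading and writing from the same thread is done here only because a single byte can never fill the buffer; in general the two ends belong to different threads.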
To deepen our understanding of the pipe, we continue with the following two small experiments.
Experiment 1: Modify Sender.java
Change
public void run() {
    writeShortMessage();
    //writeLongMessage();
}
to
public void run() {
    //writeShortMessage();
    writeLongMessage();
}
Run the program. The running result is:
This data is written to the piped output stream by writeLongMessage(), transferred to the piped input stream, stored in the buffer of the piped input stream, and then read out of the buffer by the user.
Now look at the source code of writeLongMessage(). The length of str is 1046 bytes, yet the output is only 1024 bytes. Why is this happening?
The reason is simple: the default size of the buffer of the piped input stream is 1024 bytes. The buffer can therefore hold at most 1024 bytes at a time, and readMessageOnce() performs only a single read.
By observing the source code of PipedInputStream.java, we can understand more thoroughly.
private static final int DEFAULT_PIPE_SIZE = 1024;

public PipedInputStream() {
    initPipe(DEFAULT_PIPE_SIZE);
}
The default constructor calls initPipe(DEFAULT_PIPE_SIZE), whose source code is as follows:
private void initPipe(int pipeSize) {
    if (pipeSize <= 0) {
        throw new IllegalArgumentException("Pipe Size <= 0");
    }
    buffer = new byte[pipeSize];
}
From this, we can see that the default size of the buffer is 1024 bytes.
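The buffer size does not have to stay at the default. PipedInputStream also offers constructors that take a pipe size (available since Java 6). The sketch below uses a hypothetical class PipeSizeDemo to show that, with a 2048-byte pipe, the whole 1046-byte message from Experiment 1 fits in the buffer at once. The guard against oversized messages is an addition for this sketch, because a single-threaded write larger than the pipe would block forever.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class, not part of the article's example.
class PipeSizeDemo {

    // Writes messageLength bytes into a pipe of pipeSize bytes and
    // returns how many bytes are buffered and ready to be read.
    static int bufferedAfterWrite(int pipeSize, int messageLength) throws IOException {
        if (messageLength > pipeSize) {
            // With no reader thread, such a write would never return.
            throw new IllegalArgumentException("message does not fit in the pipe");
        }
        PipedOutputStream out = new PipedOutputStream();
        // Connect and choose the buffer size in one step.
        PipedInputStream in = new PipedInputStream(out, pipeSize);

        out.write(new byte[messageLength]);
        int available = in.available();

        out.close();
        in.close();
        return available;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(bufferedAfterWrite(2048, 1046));
    }
}
```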
Experiment 2: Continue to modify Receiver.java on the basis of "Experiment 1"
Change
public void run() {
    readMessageOnce();
    //readMessageContinued();
}
to
public void run() {
    //readMessageOnce();
    readMessageContinued();
}
Run the program. The running result is:
This result is the complete data written to the "input buffer".
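readMessageContinued() stops once more than 1024 bytes have been read, which suits this particular demo. A more general pattern is to keep reading until read() returns -1, which happens after the writer closes its end. The following is a minimal sketch of that pattern with the default 1024-byte buffer; the class ReadUntilEndDemo and the method transfer are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class, not part of the article's example.
class ReadUntilEndDemo {

    // Writes length bytes from a sender thread and returns how many bytes the caller received.
    static int transfer(int length) throws IOException, InterruptedException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out); // default 1024-byte buffer

        Thread sender = new Thread(() -> {
            try {
                // Blocks whenever the buffer is full, until the reader drains it.
                out.write(new byte[length]);
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        sender.start();

        ByteArrayOutputStream received = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int len;
        // read() returns -1 only after the writer has closed its end.
        while ((len = in.read(buf)) != -1) {
            received.write(buf, 0, len);
        }
        in.close();
        sender.join();
        return received.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(transfer(1046));
    }
}
```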
PipedWriter and PipedReader
PipedWriter is a piped character output stream, which inherits from Writer.
PipedReader is a piped character input stream, which inherits from Reader.
The purpose of PipedWriter and PipedReader is to let threads communicate through a pipe. For pipe communication, PipedWriter and PipedReader must be used together.
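Because PipedWriter and PipedReader work with characters, they combine naturally with the other character stream classes. As a rough sketch (the class CharPipeDemo and the method sendLines are hypothetical), a sender thread can write lines through a PrintWriter while the receiver reads them back with a BufferedReader:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the article's example.
class CharPipeDemo {

    // Sends the given lines from a sender thread and returns the lines the caller read.
    static List<String> sendLines(List<String> lines) throws IOException, InterruptedException {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader(out); // the constructor connects both ends

        Thread sender = new Thread(() -> {
            // Closing the PrintWriter also closes the PipedWriter,
            // which signals end of stream to the reader.
            try (PrintWriter writer = new PrintWriter(out)) {
                for (String line : lines) {
                    writer.println(line);
                }
            }
        });
        sender.start();

        List<String> received = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(in)) {
            String line;
            // readLine() returns null at end of stream.
            while ((line = reader.readLine()) != null) {
                received.add(line);
            }
        }
        sender.join();
        return received;
    }

    public static void main(String[] args) throws Exception {
        List<String> input = new ArrayList<>();
        input.add("this is a short message");
        input.add("and a second line");
        System.out.println(sendLines(input));
    }
}
```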
Below, we look at examples of communication through PipedWriter and PipedReader in multithreading. The examples include 3 classes: Receiver.java, Sender.java and PipeTest.java
The code of Receiver.java is as follows:
import java.io.IOException;
import java.io.PipedReader;

/**
 * Receiver thread
 */
@SuppressWarnings("all")
public class Receiver extends Thread {

    // Piped input stream object.
    // It is bound to a PipedWriter object, so that it can receive
    // the data of the PipedWriter and let the user read it.
    private PipedReader in = new PipedReader();

    // Get the piped input stream object
    public PipedReader getReader() {
        return in;
    }

    @Override
    public void run() {
        readMessageOnce();
        // readMessageContinued();
    }

    // Read data once from the piped input stream
    public void readMessageOnce() {
        // Although buf holds 2048 characters, a single read returns at most 1024 characters,
        // because the buffer of the piped input stream is only 1024 characters by default.
        char[] buf = new char[2048];
        try {
            int len = in.read(buf);
            // read() returns -1 if the stream ended before any data arrived
            if (len > 0) {
                System.out.println(new String(buf, 0, len));
            }
            in.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Keep reading from the piped input stream; stop once more than 1024 characters have been read
    public void readMessageContinued() {
        int total = 0;
        while (true) {
            char[] buf = new char[1024];
            try {
                int len = in.read(buf);
                // Stop at end of stream
                if (len == -1)
                    break;
                total += len;
                System.out.println(new String(buf, 0, len));
                // If the total number of characters read is > 1024, exit the loop.
                if (total > 1024)
                    break;
            } catch (IOException e) {
                e.printStackTrace();
                // Do not keep looping on a broken pipe
                break;
            }
        }

        try {
            in.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The code of Sender.java is as follows:
import java.io.IOException;
import java.io.PipedWriter;

/**
 * Sender thread
 */
@SuppressWarnings("all")
public class Sender extends Thread {

    // Piped output stream object.
    // It is bound to a PipedReader object, so that data written here
    // is sent to the PipedReader, where the user can read it.
    private PipedWriter out = new PipedWriter();

    // Get the piped output stream object
    public PipedWriter getWriter() {
        return out;
    }

    @Override
    public void run() {
        writeShortMessage();
        //writeLongMessage();
    }

    // Write a short message to the piped output stream: "this is a short message"
    private void writeShortMessage() {
        String strInfo = "this is a short message";
        try {
            out.write(strInfo.toCharArray());
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Write a long message to the piped output stream
    private void writeLongMessage() {
        StringBuilder sb = new StringBuilder();
        // Append 1020 characters through a for loop
        for (int i = 0; i < 102; i++)
            sb.append("0123456789");
        // Append 26 more characters.
        sb.append("abcdefghijklmnopqrstuvwxyz");
        // The total length of str is 1020+26=1046 characters
        String str = sb.toString();
        try {
            // Write 1046 characters into the piped output stream
            out.write(str);
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The code of PipeTest.java is as follows:
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.IOException;

/**
 * Test program for the interaction between the piped input stream and the piped output stream
 */
@SuppressWarnings("all")
public class PipeTest {

    public static void main(String[] args) {
        Sender t1 = new Sender();
        Receiver t2 = new Receiver();

        PipedWriter out = t1.getWriter();
        PipedReader in = t2.getReader();

        try {
            // Connect the pipe. The following two statements are equivalent.
            //out.connect(in);
            in.connect(out);

            // start() method of the Thread class:
            // Makes the thread start executing; the Java virtual machine calls the thread's run method.
            // The result is that two threads run concurrently: the current thread (which returns
            // from the call to start) and the other thread (which executes its run method).
            // It is illegal to start a thread more than once. In particular, a thread
            // cannot be restarted once it has finished executing.
            t1.start();
            t2.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Running results:
this is a short message
Results description:
(1)
in.connect(out);
It associates the piped input stream with the piped output stream. The source code of connect() in PipedWriter.java and PipedReader.java shows that out.connect(in); is equivalent to in.connect(out);
(2)
t1.start(); // Start the "Sender" thread
t2.start(); // Start the "Receiver" thread
First, look at the source code of Sender.java: once the thread is started, its run() method is executed, and run() calls writeShortMessage().
writeShortMessage() writes the data "this is a short message" to the piped output stream; this data is then received by the piped input stream. Let's see how this is achieved.
Let's first look at the source code of write(char c[]), which is defined in Writer.java. PipedWriter.java inherits from Writer.java; the source code of write(char c[]) in Writer.java is as follows:
public void write(char cbuf[]) throws IOException {
    write(cbuf, 0, cbuf.length);
}
In effect, write(char c[]) calls the write(char c[], int off, int len) method in PipedWriter.java. The source code of write(char c[], int off, int len) shows that it calls sink.receive(cbuf, off, len). The definition of receive(char c[], int off, int len) in turn shows that sink.receive(cbuf, off, len) stores the data written to the piped output stream in the buffer of the piped input stream. The default size of that buffer is 1024 characters.
At this point, we know that: t1.start() starts the Sender thread, and the Sender thread will write the data "this is a short message" to the "pipe output stream"; and the "pipe output stream" will transfer the data to the "pipe input stream", that is, it will be saved in the buffer of the "pipe input stream".
Next, we look at "how users read data from the buffer of the 'pipe input stream'". This is actually the action of the Receiver thread.
t2.start() starts the Receiver thread, which executes the run() method of Receiver.java. The source code of Receiver.java shows that run() calls readMessageOnce().
readMessageOnce() calls in.read(buf) to read data from the piped input stream in and store it in buf.
From the analysis above, we already know that the buffer of the piped input stream in holds "this is a short message"; therefore, buf ends up containing "this is a short message".
To deepen our understanding of the pipe, we continue with the following two small experiments.
Experiment 1: Modify Sender.java
Change
public void run() {
    writeShortMessage();
    //writeLongMessage();
}
to
public void run() {
    //writeShortMessage();
    writeLongMessage();
}
Run the program. The running result is as follows:
From this, we can see that the program fails: it throws the exception java.io.IOException: Pipe closed
Why is this happening?
Let's analyze the program flow.
(1) In PipeTest, connect the input and output pipelines through in.connect(out); then, start two threads. t1.start() starts the thread Sender, and t2.start() starts the thread Receiver.
(2) After the Sender thread is started, it writes data to the piped output stream through writeLongMessage(); out.write(str) writes a total of 1046 characters. According to the source code of PipedWriter, the write() method of PipedWriter calls the receive() method of PipedReader. The receive() method of PipedReader stores the received data in the buffer. A careful look at receive() shows the following code:
while (in == out) {
    if ((readSide != null) && !readSide.isAlive()) {
        throw new IOException("Pipe broken");
    }
    /* full: kick any waiting readers */
    notifyAll();
    try {
        wait(1000);
    } catch (InterruptedException ex) {
        throw new java.io.InterruptedIOException();
    }
}
The initial values of in and out are in=-1 and out=0. in is the position where the next character will be stored, and out is the position of the next character to be read. After a character is stored, in advances and wraps around to 0 at the end of the buffer, so the condition in==out is met only when the writer has caught up with the reader, that is, when the buffer is full (as the comment in the code says). In that case, notifyAll() is called to wake up any thread waiting to read the pipe, and the writer waits.
That is, once the buffer is full, the writer blocks and waits for another thread to read from the pipe.
However, the default size of the buffer of PipedReader is 1024 characters, while 1046 characters need to be written. Therefore, at most 1024 characters can be in the buffer at a time.
(3) After the Receiver thread is started, it calls readMessageOnce() to read from the piped input stream. Once the 1024 buffered characters have been read, it calls close() to close the pipe.
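The roles of in and out in step (2) are easier to follow with a small single-threaded model of the index bookkeeping. The class RingIndexModel below is a hypothetical simplification written for this explanation: it mirrors the conventions described above (in=-1 means empty, in==out means full) but leaves out the synchronization and waiting that the real PipedReader performs.

```java
// Hypothetical, simplified model of the buffer indexes; no threads, no waiting.
class RingIndexModel {
    private final char[] buffer;
    private int in = -1; // where the next character is stored; -1 means the buffer is empty
    private int out = 0; // where the next character is read from

    RingIndexModel(int size) {
        buffer = new char[size];
    }

    boolean isEmpty() {
        return in < 0;
    }

    // The writer has caught up with the reader: no free slot is left.
    boolean isFull() {
        return in == out;
    }

    void put(char c) {
        if (isFull()) {
            // The real receive() would wait here instead of failing.
            throw new IllegalStateException("buffer full");
        }
        if (in < 0) {
            in = 0;
            out = 0;
        }
        buffer[in++] = c;
        if (in >= buffer.length) {
            in = 0; // wrap around
        }
    }

    char take() {
        if (isEmpty()) {
            // The real read() would wait here instead of failing.
            throw new IllegalStateException("buffer empty");
        }
        char c = buffer[out++];
        if (out >= buffer.length) {
            out = 0; // wrap around
        }
        if (in == out) {
            in = -1; // everything has been read: mark the buffer as empty
        }
        return c;
    }

    public static void main(String[] args) {
        RingIndexModel model = new RingIndexModel(4);
        for (char c : "abcd".toCharArray()) {
            model.put(c);
        }
        System.out.println(model.isFull());  // full only after the 4th character
        System.out.println(model.take());
        System.out.println(model.isFull());  // one slot is free again
    }
}
```

In this model, in==out becomes true only after as many characters have been stored as the buffer can hold, never after a single character.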
From the analysis of (2) and (3), we can see that Sender needs to write 1046 characters into the pipe. The first 1024 characters (the buffer capacity is 1024) are written normally, after which Sender waits for the buffer to be read. When Sender goes on to write the remaining characters, write() in PipedWriter.java is still called, which calls receive() in PipedReader.java and eventually receive(int c). By this time, the piped input stream has been closed, that is, closedByReader is true, so receive(int c) executes throw new IOException("Pipe closed").
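The failure analyzed above can be reproduced in a compact form. In this sketch, the class PipeClosedDemo and the method writerFailure are hypothetical; the main thread plays the Receiver that reads once and closes, and the sender thread records the message of the IOException it runs into. The run may take about a second, since the writer can sit in wait(1000) before it notices the change. The exact message is an implementation detail of the JDK, so the sketch only relies on some IOException being thrown.

```java
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the article's example.
class PipeClosedDemo {

    // Returns the message of the IOException seen by the writer, or null if none occurred.
    static String writerFailure() throws IOException, InterruptedException {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader(out); // default buffer: 1024 characters
        AtomicReference<String> failure = new AtomicReference<>();

        Thread sender = new Thread(() -> {
            char[] message = new char[1046]; // more than the buffer can hold
            Arrays.fill(message, 'x');
            try {
                out.write(message);
                out.close();
            } catch (IOException e) {
                // Typically "Pipe closed", as in the article's run.
                failure.set(e.getMessage());
            }
        });
        sender.start();

        // Like readMessageOnce(): a single read, then close.
        char[] buf = new char[2048];
        in.read(buf);
        in.close();

        sender.join();
        return failure.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("writer failed with: " + writerFailure());
    }
}
```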
We continue from "Experiment 1" to solve the problem.
Experiment 2: Continue to modify Receiver.java on the basis of "Experiment 1".
Change
public void run() {
    readMessageOnce();
    //readMessageContinued();
}
to
public void run() {
    //readMessageOnce();
    readMessageContinued();
}
At this point, the program runs normally. The running result is: