PipedOutputStream e PipedInputStream
Em Java, PipedOutputStream e PipedInputStream são fluxos de saída do pipeline e fluxos de entrada de tubulação, respectivamente.
Sua função é permitir que os multithreads se comuniquem entre threads através de pipelines. Ao usar a comunicação do pipeline, o PipedOutputStream e o PipedInputStream devem ser usados em conjunto.
Ao usar a comunicação do pipeline, o processo geral é: Escrevemos dados no PipedOutputStream no encadeamento A e esses dados serão enviados automaticamente para o PipedInputStream correspondente ao PipedOutputStream e, em seguida, armazenados no buffer do PipedEdInputStream; Neste momento, o Thread B lê os dados no PipedInputStream. Isso pode perceber a comunicação entre o thread A e o Thread B.
Abaixo, analisamos o exemplo de comunicação através de pipelines em multithreads. Os exemplos incluem 3 classes: Receiver.java, PipedStreamtest.java e sender.java.
O Código de Receiver.java é o seguinte:
importar java.io.ioException; importar java.io.pipedInputStream; @Suppresswarnings ("all") / *** Tópico do receptor* / public class Receiver estende o thread {// Objeto de fluxo de entrada da tubulação. // Ele está ligado ao objeto "PipedOutputStream", // permite que você receba os dados do "PipedOutputStream" e deixe o usuário ler. private PipedInputStream em = new PipedInputStream (); // Obtenha o objeto "fluxo de entrada do tubo" public PipedInputStream getInputStream () {return in; } @Override public void run () {readMessageOnce (); // readMessageContinued (); } // Leia os dados uma vez a partir de "Pipe Input Stream" public void readMessageOnce () {//, embora o tamanho do BUF seja 2048 bytes, ele só lerá no máximo 1024 bytes de "fluxo de entrada de tubos". // Porque, o tamanho do buffer do "fluxo de entrada do tubo" é de apenas 1024 bytes por padrão. byte [] buf = novo byte [2048]; tente {int len = in.read (buf); System.out.println (new String (BUF, 0, Len)); in.Close (); } catch (ioexception e) {e.printStackTrace (); }} // Ao ler> 1024 bytes do "Pipe Inpit Stream", pare de ler o público vazio readMessageContinued () {int total = 0; while (true) {byte [] buf = novo byte [1024]; tente {int len = in.read (buf); total += len; System.out.println (new String (BUF, 0, Len)); // Se o número total de bytes lido for> 1024, saia do loop. if (total> 1024) quebra; } catch (ioexception e) {e.printStackTrace (); }} tente {in.close (); } catch (ioexception e) {e.printStackTrace (); }}} O código do sender.java é o seguinte:
importar java.io.ioException; importar java.io.pipedOutputStream; @Suppresswarnings ("all")/ *** threads de remetente*/ public class Remetes estende o thread {// Objeto de fluxo de saída do pipeline. // Ele está ligado ao objeto "PipedInputStream", // Isso permite que os dados sejam enviados para os dados do "PipedInputStream", e o usuário pode ler os dados do "PipedInputStream". private pipedOutputStream Out = new PipedOutputStream (); // Obtenha o objeto "Stream de saída do tubo" public PiDedOutputStream getOutputStream () {return Out; } @Override public void run () {writeshortMessage (); // writeLongMessage (); } // Escreva uma mensagem curta para o "fluxo de saída do tubo": "Esta é uma mensagem curta" private void writeShortMessage () {string strinfo = "Esta é uma mensagem curta"; tente {out.write (strinfo.getBytes ()); out.Close (); } catch (ioexception e) {e.printStackTrace (); }} // Escreva uma mensagem longa para o "fluxo de saída do tubo" private void writeLongMessage () {stringbuilder sb = new stringbuilder (); // escreva 1020 bytes através de um loop for (int i = 0; i <102; i ++) sb.append ("0123456789"); // Escreva 26 bytes mais. sb.append ("abcdefghijklmnopqrstuvwxyz"); // O comprimento total de STR é 1020+26 = 1046 Bytes String str = sb.toString (); tente {// escreva 1046 bytes no "fluxo de saída do tubo" out.write (str.getBytes ()); out.Close (); } catch (ioexception e) {e.printStackTrace (); }}} O código de PipedStreamtest.java é o seguinte:
importar java.io.pipeDInputStream; importar java.io.pipedOutputStream; importar java.io.ioexception; @suppresswarnings ("all") / *** Programa interativo para fluxo de entrada de pipeline e fluxo de saída do tubo); Receptor T2 = novo receptor (); PipedOutputStream Out = t1.getOutputStream (); PipedInputStream in = t2.getInputStream (); tente {// conexão de tubo. A essência das duas frases a seguir é a mesma. //out.connect(in); in.Connect (Out); /** * Método de início da classe Thread: * Faça o thread comece a executar; A máquina virtual Java chama o método de execução do Thread. * O resultado é que dois threads são executados simultaneamente; O thread atual (retornado da chamada para o método inicial) e o outro thread (executando seu método de execução). * É ilegal iniciar um tópico várias vezes. Especialmente quando o tópico terminou de executar, ele não pode ser reiniciado. */ t1.start (); t2.start (); } catch (ioexception e) {e.printStackTrace (); }}} Resultados em execução:
Esta é uma mensagem curta
ilustrar:
(1) in.Connect (Out); Associados "fluxo de entrada de tubo" e "fluxo de saída de tubo". Verifique o código -fonte de Connect () em PipedOutputStream.java e PipedInputStream.java; conhecemos out.Connect (in); é equivalente a in.connect (out);
(2)
t1.start (); // Inicie o thread "remetente" t.start (); // Inicie o tópico "receptor"
Primeiro verifique o código -fonte do remetente.java e execute a função run () após o início do thread; Na corrida () do sender.java, ligue para WritEShortMessage ();
A função de writeshortMessage (); é escrever dados "Esta é uma mensagem curta" para o "fluxo de saída do tubo"; Esses dados serão recebidos pelo "fluxo de entrada do tubo". Vamos ver como isso é alcançado.
Vamos primeiro olhar para o código -fonte de gravação (byte b []) e defini -lo no outputStream.java. PipedOutputStream.java herda do outputStream.java; O código -fonte de gravação (byte b []) em outputStream.java é o seguinte:
public void write (byte b []) lança ioexception {write (b, 0, b.length);} De fato, Write (byte b []) é a função Write de chamada (byte b [], int off, int len) em PipedOutputStream.java. Olhando para o código -fonte de gravação (byte b [], int off, int len), descobrimos que ele chamará Sink.receive (B, Off, Len); Observando ainda mais a definição de recebimento (byte B [], int Off, Int Len), sabemos que Sink.Receive (B, Off, Len) é salvar os dados no "fluxo de saída do tubo" no buffer do "fluxo de entrada do tubo". O tamanho padrão do buffer do buffer do "fluxo de entrada do tubo" é de 1024 bytes.
Neste ponto, sabemos que: t1.start () inicia o segmento do remetente e o thread do remetente escreverá os dados "Esta é uma mensagem curta" para o "fluxo de saída do tubo"; e o "fluxo de saída do tubo" transferirá os dados para o "fluxo de entrada do tubo", ou seja, ele será salvo no buffer do "fluxo de entrada do tubo".
Em seguida, analisamos "como os usuários leem os dados do buffer do 'fluxo de entrada do tubo'". Esta é realmente a ação do thread do receptor.
t2.start () iniciará o thread do receptor, executando assim a função Receiver.java run (). Olhando para o código -fonte do receptor.java, sabemos que Run () chama ReadMessageOnce ().
ReadMessageOnce () deve ligar para in.read (BUF) para ler dados do "fluxo de entrada do tubo em" e salvá -los no BUF.
Através da análise acima, já sabemos que os dados no buffer do "fluxo de entrada do tubo em" é "Esta é uma mensagem curta"; Portanto, os dados do BUF são "esta é uma mensagem curta".
Para aprofundar o entendimento do pipeline. Continuaremos os dois pequenos experimentos a seguir.
Experiência 1: modifique o remetente.java
Vai
public void run () {writeshortMessage (); // writeLongMessage ();} Modificado para
public void run () {// writeshortMessage (); writeLongMessage ();} Execute o programa. O resultado em execução é:
Esses dados são gravados no "fluxo de saída do tubo" através do writeLongMessage () e, em seguida, transferidos para o "fluxo de entrada do tubo" e depois armazenados no buffer do "fluxo de entrada do tubo"; e depois leia do buffer pelo usuário.
Em seguida, observe o código -fonte do WRITELONGMESSAGE (). Podemos descobrir que o comprimento do STR é de 1046 bytes e, em seguida, o resultado da corrida é de apenas 1024 bytes! Por que isso está acontecendo?
O motivo é simples: o tamanho padrão do buffer do fluxo de entrada do pipeline é de 1024 bytes. Portanto, no máximo, 1024 bytes podem ser escritos.
Ao observar o código -fonte do PipedInputStream.java, podemos entender mais minuciosamente.
private estático final int default_pipe_size = 1024; public pipedInputStream () {initpipe (default_pipe_size);} O construtor padrão chama initpipe (default_pipe_size), e seu código -fonte é o seguinte:
private void initpipe (int pipeSize) {if (tubpesize <= 0) {lança novo ilegalArgumentException ("tamanho do tubo <= 0"); } buffer = novo byte [PipeSize];} A partir disso, podemos saber que o tamanho padrão do buffer é de 1024 bytes.
Experiência 2: Continue a modificar o receptor.java com base no "Experimento 1"
Vai
public void run () {readMessageOnce (); // readmessageContinued ();} Modificado para
public void run () {// readMessageOnce (); ReadMessageContinued ();} Execute o programa. O resultado em execução é:
Este resultado são os dados completos gravados no "buffer de entrada".
PipedWriter e PipedReader
PipedWriter é um fluxo de saída de tubulação de caracteres, herdado do escritor.
O PipedReader é um fluxo de entrada de tubulação de caracteres que herda do Writer.
A função do PipedWriter e PipedReader é comunicar entre threads através de pipelines. Ao usar a comunicação do pipeline, o PipedWriter e o PipedReader devem ser usados em conjunto.
Abaixo, analisamos exemplos de comunicação através do PipedWriter e PipedReader no multithreading. Os exemplos incluem 3 classes: Receiver.java, sender.java e pipetest.java
O Código de Receiver.java é o seguinte:
importar java.io.ioException; importar java.io.pipedReader; @Suppresswarnings ("all") / *** Tópico do receptor* / public class Receiver estende o thread {// Objeto de fluxo de entrada da tubulação. // Ele está vinculado ao objeto "PipedWriter", // Isso permite que você receba os dados do "PipedWriter" e deixe o usuário ler. Private PipedReader in = new PipedReader (); // Obtenha "Objeto de fluxo de entrada do tubo" public PipedReader getReader () {return in; } @Override public void run () {readMessageOnce (); // readMessageContinued (); } // Leia os dados uma vez a partir de "Pipe Input Stream" public void readMessageOnce () {//, embora o tamanho do BUF seja de 2048 caracteres, ele lerá apenas no máximo 1024 caracteres de "Pipe Inpit Stream". // Porque, o tamanho do buffer do "fluxo de entrada do tubo" é de apenas 1024 caracteres por padrão. char [] buf = novo char [2048]; tente {int len = in.read (buf); System.out.println (new String (BUF, 0, Len)); in.Close (); } catch (ioexception e) {e.printStackTrace (); }} // Ao ler> 1024 caracteres do "Pipe Inpit Stream", pare de ler o public void readMessageContinued () {int total = 0; while (true) {char [] buf = novo char [1024]; tente {int len = in.read (buf); total += len; System.out.println (new String (BUF, 0, Len)); // Se o número total de caracteres lido é> 1024, o loop será excitado. if (total> 1024) quebra; } catch (ioexception e) {e.printStackTrace (); }} tente {in.close (); } catch (ioexception e) {e.printStackTrace (); }}} O código do sender.java é o seguinte:
importar java.io.ioException; importar java.io.pipedWriter; @Suppresswarnings ("all")/ *** threads de remetente*/ public class Remetes estende o thread {// Objeto de fluxo de saída do pipeline. // Ele está vinculado ao objeto "PipedReader", // Isso permite que os dados sejam enviados para os dados do "PipedReader" e o usuário pode ler os dados do "PipedReader". Private PipedWriter Out = new PipedWriter (); // Obtenha o objeto "Stream de saída do tubo" public PipedWriter getWriter () {return Out; } @Override public void run () {writeshortMessage (); // writeLongMessage (); } // Escreva uma mensagem curta para o "fluxo de saída do tubo": "Esta é uma mensagem curta" private void writeShortMessage () {string strinfo = "Esta é uma mensagem curta"; tente {out.write (strinfo.toCharArray ()); out.Close (); } catch (ioexception e) {e.printStackTrace (); }} // Escreva uma mensagem longa para o "fluxo de saída do tubo" private void writeLongMessage () {stringbuilder sb = new stringbuilder (); // Escreva 1020 caracteres através de um loop para (int i = 0; i <102; i ++) sb.append ("0123456789"); // Escreva 26 caracteres mais. sb.append ("abcdefghijklmnopqrstuvwxyz"); // O comprimento total de STR é 1020+26 = 1046 caracteres String str = sb.toString (); tente {// escreva 1046 caracteres no "fluxo de saída do tubo" out.write (str); out.Close (); } catch (ioexception e) {e.printStackTrace (); }}} O código do pipetest.java é o seguinte:
importar java.io.pipedReader; importar java.io.pipedWriter; importar java.io.ioexception; @suppresswarnings ("all") / *** Programa interativo para fluxo de entrada de pipeline e fluxo de saída de tubulação* / public classe PipeTest {public static void main (string [] args) {session t1 t1 t) {send) Receptor T2 = novo receptor (); Pipedwriter out = t1.getWriter (); PipedReader in = t2.getReader (); tente {// conexão de tubo. A essência das duas frases a seguir é a mesma. //out.connect(in); in.Connect (Out); /** * Método de início da classe Thread: * Faça o thread comece a executar; A máquina virtual Java chama o método de execução do Thread. * O resultado é que dois threads são executados simultaneamente; O thread atual (retornado da chamada para o método inicial) e o outro thread (executando seu método de execução). * É ilegal iniciar um tópico várias vezes. Especialmente quando o tópico terminou de executar, ele não pode ser reiniciado. */ t1.start (); t2.start (); } catch (ioexception e) {e.printStackTrace (); }}} Resultados em execução:
Esta é uma mensagem curta
Resultados Descrição:
(1)
in.Connect (Out);
Sua função é associar o "fluxo de entrada do tubo" e o "fluxo de saída do tubo". Verifique o código -fonte de Connect () em PipedWriter.java e PipedReader.java; conhecemos out.Connect (in); é equivalente a in.connect (out);
(2)
t1.start (); // Inicie o thread "remetente" t.start (); // Inicie o tópico "receptor"
Primeiro verifique o código -fonte do remetente.java e execute a função run () após o início do thread; Na corrida () do sender.java, ligue para WritEShortMessage ();
A função de writeshortMessage (); é escrever dados "Esta é uma mensagem curta" para o "fluxo de saída do tubo"; Esses dados serão recebidos pelo "fluxo de entrada do tubo". Vamos ver como isso é alcançado.
Vamos primeiro olhar para o código -fonte de gravação (char.
public void write (char cbuf []) lança ioexception {write (cbuf, 0, cbuf.length);}
De fato, Write (char c []) é a função de gravação de chamadas (char c [], int off, int len) em PipedWriter.java. Olhando para o código -fonte de gravação (char c [], int off, int len), descobrimos que ele chamará Sink.receive (CBUF, Off, Len); Observando ainda mais a definição de recebimento (char c [], int off, int len), sabemos que o Sink.Receive (CBUF, OFF, LEN) é salvar os dados no "fluxo de saída do tubo" no buffer do "fluxo de entrada do tubo". O tamanho padrão do buffer de "fluxo de entrada do tubo" é de 1024 caracteres.
Neste ponto, sabemos que: t1.start () inicia o segmento do remetente e o thread do remetente escreverá os dados "Esta é uma mensagem curta" para o "fluxo de saída do tubo"; e o "fluxo de saída do tubo" transferirá os dados para o "fluxo de entrada do tubo", ou seja, ele será salvo no buffer do "fluxo de entrada do tubo".
Em seguida, analisamos "como os usuários leem os dados do buffer do 'fluxo de entrada do tubo'". Esta é realmente a ação do thread do receptor.
t2.start () iniciará o thread do receptor, executando assim a função Receiver.java run (). Olhando para o código -fonte do receptor.java, sabemos que Run () chama ReadMessageOnce ().
ReadMessageOnce () deve ligar para in.read (BUF) para ler dados do "fluxo de entrada do tubo em" e salvá -los no BUF.
Através da análise acima, já sabemos que os dados no buffer do "fluxo de entrada do tubo em" é "Esta é uma mensagem curta"; Portanto, os dados do BUF são "esta é uma mensagem curta".
Para aprofundar o entendimento do pipeline. Continuaremos os dois pequenos experimentos a seguir.
Experiência 1: modifique o remetente.java
Vai
public void run () {writeshortMessage (); // writeLongMessage ();} Modificado para
public void run () {// writeshortMessage (); writeLongMessage ();} Execute o programa. Os resultados da operação são os seguintes:
A partir disso, podemos ver que o programa funciona incorretamente! Exceção de arremesso java.io.ioException: tubo fechado
Por que isso está acontecendo?
Vou analisar o fluxo do programa.
(1) No pipetest, conecte os pipelines de entrada e saída através do in.Connect (OUT); Em seguida, inicie dois threads. t1.start () inicia o remetente do encadeamento e t2.start () inicia o receptor do thread.
(2) Depois que o thread do remetente for iniciado, os dados são gravados no "Pipeline de saída" através do WriteLongMessage () e OUT.Write (str.ToCharArray ()) escreve um total de 1046 caracteres. De acordo com o código -fonte do PipedWriter, a função Write () do PipedWriter chamará a função RECEBE () do PipedReader. Olhando para a função Receber () do PipedReader, sabemos que o PipedReader armazenará o buffer de dados aceito. Se você observar a função Receber (), existe o seguinte código:
while (in == out) {if ((readside! = null) &&! readside.isalive ()) {lança a nova ioexception ("Pipe Broken"); } / * completo: chute todos os leitores em espera * / notifyAll (); tente {wait (1000); } catch (interruptedException ex) {tiro novo java.io.interruptedioException (); }} Os valores iniciais de IN e Out estão em = -1, out = 0, respectivamente; combinado com o acima, enquanto (in == out). Sabemos que seu significado é que toda vez que um personagem é escrito no pipeline, a condição em == out é atendida. Em seguida, notifyAll () é chamado para acordar o "thread que lê o pipeline".
Ou seja, toda vez que um personagem é escrito no pipeline, ele bloqueará e aguarda outros threads ler.
No entanto, o tamanho padrão do buffer do PipedReader é 1024! No entanto, existem 1046 dados a serem escritos no momento! Portanto, no máximo 1024 caracteres podem ser escritos por vez.
(03) Depois que o thread do receptor for iniciado, o ReadMessageOnce () será chamado para ler o fluxo de entrada do pipeline. A leitura de 1024 caracteres será feita e fechará () será chamado para fechar, o tubo.
A partir da análise de (02) e (03), pode -se ver que o remetente precisa escrever 1046 caracteres no pipeline. Entre eles, os primeiros 1024 caracteres (a capacidade do buffer é 1024) podem ser escritos normalmente e um é lido para cada gravação. Quando 1025 caracteres são escritos, Write () em PipedWriter.java ainda é chamado em sequência; então, receba () no PipedReader.java é chamado; No PipedReader.java, a função Receber (int c) será chamada. Neste momento, o fluxo de entrada do pipeline foi fechado, ou seja, o FechbyReader é verdadeiro, então jogue a nova ioexception ("Pipe fechado") é lançado.
Continuamos a modificar "teste um" para resolver o problema.
Experiência 2: Continue a modificar o receptor.java com base no "Experimento 1".
public void run () {readMessageOnce (); // readmessageContinued ();} Modificado para
public void run () {// readMessageOnce (); ReadMessageContinued ();} Neste momento, o programa pode ser executado normalmente. O resultado em execução é: