PipedOutputStream et PipedInputStream
En Java, PipedOutputStream et PipedInputStream sont des flux de sortie de pipeline et des flux d'entrée de pipeline, respectivement.
Leur fonction est de permettre à Multithreads de communiquer entre les fils à travers des pipelines. Lors de l'utilisation de la communication du pipeline, PipedOutputStream et PipedInputStream doivent être utilisés en conjonction les uns avec les autres.
Lorsque vous utilisez la communication du pipeline, le processus général est: nous écrivons des données sur PipEdOutputStream dans le thread A, et ces données seront automatiquement envoyées au PipEdInputStream correspondant au PipEdOutputStream, puis stockées dans le tampon du PipedInputStream; À l'heure actuelle, Thread B lit les données dans PipedInputStream. Cela peut réaliser la communication entre le thread A et le fil B.
Ci-dessous, nous examinons l'exemple de communication via des pipelines en multithreads. Les exemples incluent 3 classes: récepteur.java, pipedstreamtest.java et expéditeur.java.
Le code de récepteur.java est le suivant:
Importer java.io.ioException; import java.io.pipedInputStream; @SuppressWarnings ("all") / ** * Thread du récepteur * / classe publique Le récepteur étend Thread {// Pipeline Enting Stream Object. // Il est lié à l'objet "PIPEDOutputStream", // Cela vous permet de recevoir les données du "PIPEDOutputStream", puis de laisser l'utilisateur le lire. Private PipeDInputStream dans = new PipedInputStream (); // Obtenez le "Stream de tuyau de tuyau" Public PupEdInputStream getInputStream () {return in; } @Override public void run () {readMessageOnce (); // readMessageContinued (); } // Lire les données une fois à partir de "Stream d'entrée de tuyau" public void readMessageOnce () {// Bien que la taille de BUF soit de 2048 octets, il ne lira que au plus 1024 octets de "Stream d'entrée de tuyau". // Parce que la taille du tampon du "flux d'entrée de tuyau" n'est que de 1024 octets par défaut. octet [] buf = nouveau octet [2048]; try {int len = in.read (buf); System.out.println (nouvelle chaîne (buf, 0, len)); joindre(); } catch (ioException e) {e.printStackTrace (); }} // Lorsque vous lisez> 1024 octets à partir de "Stream d'entrée de tuyau", arrêtez la lecture du public réadmessageContinued () {int total = 0; while (true) {byte [] buf = nouveau octet [1024]; try {int len = in.read (buf); Total + = len; System.out.println (nouvelle chaîne (buf, 0, len)); // Si le nombre total d'octets de lecture est> 1024, quittez la boucle. if (total> 1024) pause; } catch (ioException e) {e.printStackTrace (); }} essayez {in.close (); } catch (ioException e) {e.printStackTrace (); }}} Le code de Sender.java est le suivant:
Importer java.io.ioException; import java.io.pipedOutputStream; @SuppressWarnings ("all") / ** * Thread de l'expéditeur * / classe publique Sender étend Thread {// Pipeline Output Stream Object. // Il est lié à l'objet "PipedInputStream", // cela permet d'envoyer des données aux données du "PipedInputStream", et l'utilisateur peut ensuite lire les données du "PipedInputStream". Private PipedOutputStream out = new PipedOutputStream (); // Obtenez le "Stream de sortie du tuyau" Public PupEdOutputStream getOutputStream () {return out; } @Override public void run () {WriteShOrtMessage (); // writeLongMessage (); } // Écrivez un court message dans le "Stream de sortie du tuyau": "Ceci est un court message" Private void WriteShortMessage () {String strinfo = "Ceci est un court message"; essayez {out.write (strinfo.getBytes ()); out.close (); } catch (ioException e) {e.printStackTrace (); }} // Écrivez un long message au "Stream de sortie de tuyau" privé void writeLongMessage () {StringBuilder sb = new StringBuilder (); // Écrivez 1020 octets via une boucle pour (int i = 0; i <102; i ++) sb.append ("0123456789"); // Écrivez 26 octets de plus. SB.APPEND ("ABCDEFGHIJKLMNOPQRSTUVWXYZ"); // La longueur totale de Str est 1020 + 26 = 1046 octets String str = sb.toString (); Essayez {// Écrivez 1046 octets dans le "Stream de sortie du tuyau" out.write (str.getBytes ()); out.close (); } catch (ioException e) {e.printStackTrace (); }}} Le code de pipedstreamtest.java est le suivant:
import java.io.pipedInputStream; import java.io.pipedoutputStream; import java.io.ioexception; @suppresswarnings ("all") / ** * Programme interactif pour le flux d'entrée de pipeline et le flux de sortie du pipeline * / public classPedStreamTest {public static Void Main (String [] args) {Sender T1 = New Sender (); Récepteur t2 = nouveau récepteur (); PipedOutputStream out = t1.getOutputStream (); PipedInputStream dans = t2.GetInputStream (); Essayez la connexion {// du tuyau. L'essence des deux phrases suivantes est la même. //out.connect(in); in.connect (out); / ** * Méthode de démarrage de la classe de threads: * Faites en sorte que le thread commence à exécuter; La machine virtuelle Java appelle la méthode d'exécution du thread. * Le résultat est que deux fils fonctionnent simultanément; le thread actuel (renvoyé de l'appel à la méthode de démarrage) et l'autre thread (exécutant sa méthode d'exécution). * Il est illégal de démarrer plusieurs fois un fil. Surtout lorsque le thread a terminé l'exécution, il ne peut pas être redémarré. * / t1.start (); t2.start (); } catch (ioException e) {e.printStackTrace (); }}} Résultats en cours:
Ceci est un court message
illustrer:
(1) in.connect (out); associe le "flux d'entrée du tuyau" et le "flux de sortie du tuyau". Vérifiez le code source de Connect () dans PIPEDOutputStream.java et PipedInputStream.java; Nous savons.connect (in); est équivalent à in.connect (out);
(2)
t1.start (); // Démarrez le thread "Sender" T2.Start (); // Démarrez le thread "récepteur"
Vérifiez d'abord le code source de Sender.java et exécutez la fonction run () après le démarrage du thread; Dans le run () de Sender.java, appelez WriteShOrtMessage ();
La fonction de WriteShortMessage (); est d'écrire des données "Ceci est un court message" au "Stream de sortie du tuyau"; Ces données seront reçues par le "flux d'entrée de tuyau". Voyons comment cela est réalisé.
Examinons d'abord le code source d'écriture (octet b []) et définissons-le dans outputStream.java. PipedOutputStream.java hérite de outputStream.java; Le code source d'écriture (octet b []) dans OutputStream.java est le suivant:
public void write (byte b []) lève ioException {write (b, 0, b.length);} En fait, l'écriture (octet b []) est la fonction d'appel écrite (byte b [], int off, int len) dans pipedoutputStream.java. En regardant le code source d'écriture (byte b [], int off, int len), nous avons constaté qu'il appellera swier.receive (b, off, len); En regardant en outre la définition de la réception (octet b [], int off, int len), nous savons que Sink.receive (b, off, len) est d'enregistrer les données dans le "flux de sortie du tuyau" dans le tampon du "flux d'entrée du tuyau". La taille par défaut du tampon tampon du "flux d'entrée de tuyau" est de 1024 octets.
À ce stade, nous savons que: t1.start () démarre le thread de l'expéditeur, et le thread de l'expéditeur écrira les données "Ceci est un court message" au "Stream de sortie du tuyau"; et le "flux de sortie du tuyau" transférera les données vers le "flux d'entrée du tuyau", c'est-à-dire qu'il sera enregistré dans le tampon du "flux d'entrée du tuyau".
Ensuite, nous examinons "comment les utilisateurs lisent les données du tampon du" Stream d'entrée de tuyau "". Il s'agit en fait de l'action du fil du récepteur.
T2.Start () Démarrera le thread du récepteur, exécutant ainsi la fonction récepteur.java run (). En regardant le code source de Receiver.java, nous savons que Run () appelle ReadMessageOnce ().
ReadMessageOnce () doit appeler dans.read (buf) pour lire les données du "flux d'entrée de tuyau dans" et les enregistrer sur BUF.
Grâce à l'analyse ci-dessus, nous savons déjà que les données du tampon du "flux d'entrée de tuyau dans" sont "Ceci est un message court"; Par conséquent, les données de BUF sont "Ceci est un court message".
Afin d'approfondir la compréhension du pipeline. Nous continuerons les deux petites expériences suivantes.
Expérience 1: Modifier Sender.java
Volonté
public void run () {WriteShOrtMessage (); // writeLongMessage ();} Modifié à
public void run () {// WriteShOrtMessage (); WriteLongMessage ();} Exécutez le programme. Le résultat en cours est:
Ces données sont écrites dans le "flux de sortie du tuyau" via WinelongMessage (), puis transférées dans le "flux d'entrée du tuyau", puis stockées dans le tampon du "flux d'entrée de tuyau"; puis lisez le tampon par l'utilisateur.
Ensuite, observez le code source de WriteLongMessage (). Nous pouvons constater que la longueur de STR est de 1046 octets, puis le résultat de l'exécution n'est que de 1024 octets! Pourquoi cela se produit-il?
La raison est simple: la taille par défaut du tampon du flux d'entrée du pipeline est de 1024 octets. Par conséquent, au plus, 1024 octets peuvent être écrits.
En observant le code source de PipedInputStream.java, nous pouvons comprendre de manière plus approfondie.
private static final int default_pipe_size = 1024; public PipeDInputStream () {initPipe (default_pipe_size);} Le constructeur par défaut appelle initpipe (default_pipe_size), et son code source est le suivant:
private void initPipe (int pipeSize) {if (piceSize <= 0) {lance un nouveau IllégalArgumentException ("Taille du tuyau <= 0"); } tampon = nouveau octet [pipecesize];} À partir de cela, nous pouvons savoir que la taille par défaut du tampon tampon est de 1024 octets.
Expérience 2: Continuez à modifier le récepteur.java sur la base de "l'expérience 1"
Volonté
public void run () {readMessageOnce (); // readMessageContinued ();} Modifié à
public void run () {// readMessageOnce (); readMessageContinueued ();} Exécutez le programme. Le résultat en cours est:
Ce résultat est les données complètes écrites dans le "tampon d'entrée".
PipedWriter et PipeDreader
PipedWriter est un flux de sortie de pipeline de caractères, qui est hérité de l'écrivain.
PipeDreader est un flux d'entrée de pipeline de caractères qui hérite de l'écrivain.
La fonction de PipedWriter et PipeDreader est de communiquer entre les fils à travers des pipelines. Lors de l'utilisation de la communication du pipeline, PipedWriter et PipeDreader doivent être utilisés en conjonction les uns avec les autres.
Ci-dessous, nous examinons des exemples de communication via PipedWriter et PipeDreader dans Multithreading. Les exemples incluent 3 classes: récepteur.java, expéditeur.java et pipepest.java
Le code de récepteur.java est le suivant:
Importer java.io.ioException; import java.io.pipeDreader; @SuppressWarnings ("all") / ** * Thread du récepteur * / classe publique Le récepteur étend Thread {// Pipeline Enting Stream Object. // Il est lié à l'objet "PipedWriter", // Cela vous permet de recevoir les données du "PipedWriter", puis de laisser l'utilisateur le lire. Private PipeDaDer dans = new PipeDaDer (); // Obtenez "Pipe Enting Stream Object" Public PipeDaDer getReader () {return in; } @Override public void run () {readMessageOnce (); // readMessageContinued (); } // Lisez les données une fois à partir de "Pipe Input Stream" public void readMessageOnce () {// Bien que la taille de BUF soit de 2048 caractères, il ne lira que 1024 caractères de "Pipe d'entrée de flux". // Parce que, la taille du tampon du "flux d'entrée de tuyau" n'est que de 1024 caractères par défaut. char [] buf = nouveau char [2048]; try {int len = in.read (buf); System.out.println (nouvelle chaîne (buf, 0, len)); joindre(); } catch (ioException e) {e.printStackTrace (); }} // Lorsque vous lisez> 1024 caractères de "Pipe Input Stream", arrêtez la lecture du public réadmessageContinueued () {int total = 0; while (true) {char [] buf = new char [1024]; try {int len = in.read (buf); Total + = len; System.out.println (nouvelle chaîne (buf, 0, len)); // Si le nombre total de caractères lus est> 1024, la boucle est sortie. if (total> 1024) pause; } catch (ioException e) {e.printStackTrace (); }} essayez {in.close (); } catch (ioException e) {e.printStackTrace (); }}} Le code de Sender.java est le suivant:
Importer java.io.ioException; import java.io.pipedwriter; @SuppressWarnings ("all") / ** * Thread de l'expéditeur * / classe publique Sender étend Thread {// Pipeline Output Stream Object. // Il est lié à l'objet "PipeDreader", // cela permet d'envoyer des données aux données du "PipeDreader" et l'utilisateur peut ensuite lire les données du "PipeDreader". Private PipedWriter out = new PipedWriter (); // Obtenez le "Stream de sortie du tuyau" Public PipEdWriter GetWriter () {return out; } @Override public void run () {WriteShOrtMessage (); // writeLongMessage (); } // Écrivez un court message dans le "Stream de sortie du tuyau": "Ceci est un court message" Private void WriteShortMessage () {String strinfo = "Ceci est un court message"; essayez {out.write (strinfo.tocharArray ()); out.close (); } catch (ioException e) {e.printStackTrace (); }} // Écrivez un long message au "Stream de sortie de tuyau" privé void writeLongMessage () {StringBuilder sb = new StringBuilder (); // Écrivez 1020 caractères via une boucle pour (int i = 0; i <102; i ++) sb.append ("0123456789"); // Écrivez 26 caractères de plus. SB.APPEND ("ABCDEFGHIJKLMNOPQRSTUVWXYZ"); // La longueur totale de Str est 1020 + 26 = 1046 caractères String str = sb.toString (); Essayez {// Écrivez 1046 caractères dans le "Stream de sortie du tuyau" out.write (str); out.close (); } catch (ioException e) {e.printStackTrace (); }}} Le code de pipepest.java est le suivant:
Importer java.io.pipeDreader; import java.io.pipedwriter; import java.io.ioexception; @SuppressWarnings ("all") / ** * Programme interactif pour le flux d'entrée de pipeline et la sortie du pipeline stream * / public class PipeTest {public static void main (string [] args) {envoyage t1 = new Sender ();); Récepteur t2 = nouveau récepteur (); PipedWriter out = t1.getWriter (); PipeDreader dans = t2.GetReader (); Essayez la connexion {// du tuyau. L'essence des deux phrases suivantes est la même. //out.connect(in); in.connect (out); / ** * Méthode de démarrage de la classe de threads: * Faites en sorte que le thread commence à exécuter; La machine virtuelle Java appelle la méthode d'exécution du thread. * Le résultat est que deux fils fonctionnent simultanément; le thread actuel (renvoyé de l'appel à la méthode de démarrage) et l'autre thread (exécutant sa méthode d'exécution). * Il est illégal de démarrer plusieurs fois un fil. Surtout lorsque le thread a terminé l'exécution, il ne peut pas être redémarré. * / t1.start (); t2.start (); } catch (ioException e) {e.printStackTrace (); }}} Résultats en cours:
Ceci est un court message
Description des résultats:
(1)
in.connect (out);
Sa fonction est d'associer le "flux d'entrée du tuyau" et le "flux de sortie du tuyau". Vérifiez le code source de Connect () dans PipedWriter.Java et PipeDreader.java; Nous savons.connect (in); est équivalent à in.connect (out);
(2)
t1.start (); // Démarrez le thread "Sender" T2.Start (); // Démarrez le thread "récepteur"
Vérifiez d'abord le code source de Sender.java et exécutez la fonction run () après le démarrage du thread; Dans le run () de Sender.java, appelez WriteShOrtMessage ();
La fonction de WriteShortMessage (); est d'écrire des données "Ceci est un court message" au "Stream de sortie du tuyau"; Ces données seront reçues par le "flux d'entrée de tuyau". Voyons comment cela est réalisé.
Examinons d'abord le code source d'écriture (Char Char. Pipedwriter.java hérite de Writer.java; le code source d'écriture (char C []) dans écrivain.java est le suivant:
public void write (char cbuf []) lève ioException {write (cbuf, 0, cbuf.length);}
En fait, l'écriture (char c []) est la fonction d'appel écrite (char c [], int off, int len) dans pipedwriter.java. En regardant le code source d'écriture (char c [], int off, int len), nous avons constaté qu'il appellera swier.receive (cbuf, off, len); En regardant en outre la définition de la réception (char c [], int off, int len), nous savons que Sink.receive (CBUF, OFF, len) est de sauver les données du "flux de sortie du tuyau" dans le tampon du "flux d'entrée du tuyau". La taille par défaut du tampon tampon du "flux d'entrée de tuyau" est de 1024 caractères.
À ce stade, nous savons que: t1.start () démarre le thread de l'expéditeur, et le thread de l'expéditeur écrira les données "Ceci est un court message" au "Stream de sortie du tuyau"; et le "flux de sortie du tuyau" transférera les données vers le "flux d'entrée du tuyau", c'est-à-dire qu'il sera enregistré dans le tampon du "flux d'entrée du tuyau".
Ensuite, nous examinons "comment les utilisateurs lisent les données du tampon du" Stream d'entrée de tuyau "". Il s'agit en fait de l'action du fil du récepteur.
T2.Start () Démarrera le thread du récepteur, exécutant ainsi la fonction récepteur.java run (). En regardant le code source de Receiver.java, nous savons que Run () appelle ReadMessageOnce ().
ReadMessageOnce () doit appeler dans.read (buf) pour lire les données du "flux d'entrée de tuyau dans" et les enregistrer sur BUF.
Grâce à l'analyse ci-dessus, nous savons déjà que les données du tampon du "flux d'entrée de tuyau dans" sont "Ceci est un message court"; Par conséquent, les données de BUF sont "Ceci est un court message".
Afin d'approfondir la compréhension du pipeline. Nous continuerons les deux petites expériences suivantes.
Expérience 1: Modifier Sender.java
Volonté
public void run () {WriteShOrtMessage (); // writeLongMessage ();} Modifié à
public void run () {// WriteShOrtMessage (); WriteLongMessage ();} Exécutez le programme. Les résultats de l'opération sont les suivants:
À partir de cela, nous pouvons voir que le programme fonctionne mal! Lancer une exception java.io.ioException: le tuyau fermé
Pourquoi cela se produit-il?
Je vais analyser le flux du programme.
(1) dans PipetEst, connectez les pipelines d'entrée et de sortie via in.connect (out); Ensuite, commencez deux fils. t1.start () démarre l'expéditeur de thread et t2.start () démarre le récepteur de thread.
(2) Après le démarrage du thread de l'expéditeur, les données sont écrites dans le "pipeline de sortie" via WriteLongMessage (), et Out.Write (str.t.tocharArray ()) écrit un total de 1046 caractères. Selon le code source de PipedWriter, la fonction écrite () de PipedWriter appellera la fonction recevoir () de PipeDreader. En regardant la fonction recevoir () de PipeDreader, nous savons que PipeDreader stockera le tampon de données accepté. Si vous observez attentivement la fonction recevoir (), il y a le code suivant:
while (in == out) {if ((readside! = null) &&! readside.isalive ()) {throw new ioException ("Pipe Broken"); } / * complet: botter les lecteurs en attente * / notifyall (); essayez {attendre (1000); } catch (InterruptedException ex) {lancez new Java.io.InterrupteDioException (); }} Les valeurs initiales de l'intérieur et de l'extérieur sont dans = -1, out = 0, respectivement; combiné avec ce qui précède (in == out). Nous savons que sa signification est que chaque fois qu'un personnage est écrit dans le pipeline, la condition dans == out est remplie. Ensuite, Notifyall () est appelé pour réveiller le "fil qui lit le pipeline".
Autrement dit, chaque fois qu'un personnage est écrit dans le pipeline, il bloquera et attendra que d'autres fils se lisent.
Cependant, la taille par défaut du tampon de PipeDreder est de 1024! Cependant, il y a 1046 données à écrire pour le moment! Par conséquent, au plus 1024 caractères peuvent être écrits à la fois.
(03) Une fois le thread du récepteur démarré, ReadMessageOnce () sera appelé pour lire le flux d'entrée du pipeline. La lecture de 1024 caractères sera effectuée et clôture () sera appelée à fermer, le tuyau.
D'après l'analyse de (02) et (03), on peut voir que l'expéditeur doit écrire 1046 caractères dans le pipeline. Parmi eux, les 1024 premiers caractères (capacité tampon est 1024) peuvent être écrits normalement, et un est lu pour chaque écriture. Lorsque 1025 caractères sont écrits, écrire () dans pipedwriter.java est toujours appelé en séquence; Ensuite, recevoir () dans pipeDreader.java est appelé; Dans PipeDreader.java, la fonction recevoir (int c) sera finalement appelée. À l'heure actuelle, le flux d'entrée du pipeline a été fermé, c'est-à-dire que FermByReader est vrai, donc lancer une nouvelle ioException ("Pipe fermé") est lancé.
Nous continuons à modifier "tester un" pour résoudre le problème.
Expérience 2: Continuez à modifier le récepteur.java sur la base de "l'expérience 1".
public void run () {readMessageOnce (); // readMessageContinued ();} Modifié à
public void run () {// readMessageOnce (); readMessageContinueued ();} Pour le moment, le programme peut fonctionner normalement. Le résultat en cours est: