1. Introducción
Este artículo presenta los métodos básicos para desarrollar programas utilizando transmisiones Node.js.
<código> "Deberíamos tener algunas formas de conectar programas como Garden Hose: bisear en otro segmento cuando sea necesario masajear los datos de otra manera. Esta es la forma de IO también". Doug McIlroy. 11 de octubre de 1964 </code>
El primero en entrar en contacto con Stream fueron las décadas de práctica que comenzaron con el UNIX temprano, lo que demostró que las ideas de Stream simplemente pueden desarrollar algunos sistemas enormes. En Unix, Stream se implementa a través de | En el nodo, como módulo de flujo incorporado, se utilizan muchos módulos centrales y módulos tres partes. Al igual que UNIX, la operación principal de la transmisión de nodo es .Pipe (), y los usuarios pueden usar el mecanismo de contrapresión para controlar el equilibrio entre lectura y escritura.
Stream puede proporcionar a los desarrolladores una interfaz unificada que pueda reutilizar y controlar el equilibrio de lectura y escritura entre las transmisiones a través de interfaces de transmisión abstracta.
2. ¿Por qué usar Stream
La E/S en el nodo es asíncrona, por lo que leer y escribir en el disco y la red requiere leer y escribir a los datos a través de funciones de devolución de llamada. El siguiente es un código simple para un servidor de descarga de archivos:
<code> var http = require ('http'); var fs = require ('fs'); var server = http.createServer (function (req, res) {fs.ReadFile (__ dirname + '/Data.txt', function (err, data) {res.end (data);});}); server.listen (8000);Estos códigos pueden implementar las funciones requeridas, pero el servicio debe almacenar en caché los datos completos de la memoria antes de enviar los datos del archivo. Si el archivo "data.txt" es grande y la concurrencia es grande, se desperdiciará mucha memoria. Debido a que el usuario debe esperar hasta que todo el archivo esté en caché a la memoria antes de que se puedan aceptar los datos del archivo, esto lleva a una experiencia de usuario muy mala. Afortunadamente, ambos parámetros (REQ, RES) son transmisión, por lo que podemos usar fs.CreateReadStream () en lugar de fs.ReadFile ()::
<code> var http = require ('http'); var fs = require ('fs'); var server = http.createServer (function (req, res) {var stream = fs.CreateReadStream (__ dirname + '/data.txt'); stream.pipe (res);}); servidor.listen (8000); <Code>El método .Pipe () escucha los eventos 'Data' y 'End' de Fs.CreateReadStream (), para que el archivo "data.txt" no necesite almacenar en caché todo el archivo. Una vez completada la conexión del cliente, se puede enviar un bloque de datos al cliente de inmediato. Otro beneficio de usar .Pipe () es que puede resolver el problema del desequilibrio de lectura y escritura causado por una latencia muy grande del cliente. Si desea comprimir el archivo y enviarlo, puede usar un módulo de tres partes para implementarlo:
<Code> var http = require ('http'); var fs = require ('fs'); var opressor = require ('opressor'); var servidor = http.createserver (function (req, res) {var stream = fs.createReamStream (__ dirname + + '/data.txt'); stream.pipe (opressor (req)). pipe (res);}); server.listen (8000); </code>De esta manera, el archivo comprimirá el navegador que admite GZIP y desinflará. El módulo Oppressor maneja toda la codificación de contenido.
Stream hace que el desarrollo de programas sea simple.
3. Conceptos básicos
Hay cinco transmisiones básicas: legibles, escritas, transformadas, dúplex y "clásico".
3-1, tubería
Todos los tipos de flujo usan .pipe () para crear un par de entrada y salida, recibir un flujo legible SRC y emitir sus datos al DST de flujo de escritura, de la siguiente manera:
<code> src.pipe (dst) </code>
El método .Pipe (DST) devuelve el flujo DST, de modo que múltiples .Pipe () se puede usar sucesivamente, de la siguiente manera:
<code> A.Pipe (b) .Pipe (c) .pipe (d) </code>
La función es la misma que el siguiente código:
<code> A.Pipe (b); B.Pipe (C); C.Pipe (D); </code>
3-2. Transmisiones legibles
Al llamar al método .pipe () de secuencias legibles, puede escribir los datos de las transmisiones legibles a una secuencia de escritura, transformación o dúplex.
<code> readAbreStream.pipe (dst) </code>
1> Crear una transmisión legible
¡Aquí creamos una transmisión legible!
<Code> var requerible = request ('stream'). readable; var rs = new Readable; rs.push ('beep'); rs.push ('boop/n'); rs.push (null); rs.pipe (process.stdout); $ node read0.jsbeep boop </code>Rs.Push (NULL) notifica al destinatario de datos que se han enviado los datos.
Tenga en cuenta que no llamamos a Rs.Pipe (Process.StDout); Antes de empujar todos los datos en la transmisión legible, pero todos los datos en la secuencia legible todavía se emiten por completo, porque la secuencia legible almacena todos los datos presionados antes de que el receptor lea los datos. Pero en muchos casos, una mejor manera es presionar solo los datos en la transmisión legible cuando los datos reciben los datos solicitados en lugar de almacenar en caché todos los datos. Reescribamos la función ._read () a continuación:
<código> var legible = request ('stream'). readable; var rs = readable (); var c = 97; rs._read = function () {rs.push (string.fromCharCode (c ++)); if (c> 'z'.charcodeat (0)) rs.push (null);}; rs.pipe (process.stdout); </code> <code> read1.jsabcdefghijklmnopqrstuvwxyz </code>El código anterior implementa la reescritura del método _read () para impulsar los datos a la secuencia legible solo si el destinatario de datos solicita los datos. El método _read () también puede recibir un parámetro de tamaño que indica el tamaño de datos solicitados de los datos, pero la secuencia legible puede ignorar este parámetro según sea necesario.
Tenga en cuenta que también podemos usar Util.inherits () para heredar las transmisiones legibles. Para ilustrar que el método _read () se llama solo cuando el destinatario de datos solicita datos, hacemos un retraso al impulsar los datos en la secuencia legible, de la siguiente manera:
<Code> var requerible = require ('stream'). readable; var rs = readable (); var c = 97 - 1; rs._read = function () {if (c> = 'z'.charcodeat (0)) return rs.push (null); setTimeOut (function () {rs.push (string.fromchomChOn 100);}; rs.pipe (process.stdout); process.on ('exit', function () {console.error ('/n_read () llamado' + (c - 97) + 'Times');}); Process.stdout.on ('Error', Process.exit); </code>Al ejecutar el programa con el siguiente comando, encontramos que el método _read () se llamaba solo 5 veces:
<código> $ node read2.js | head -c5abcde_read () llamado 5 veces </code>
La razón para usar un temporizador es que el sistema lleva tiempo enviar señales para informar al programa para cerrar la tubería. Use Process.stdout.on ('Error', FN) para manejar el sistema que envía una señal Sigpipe porque el comando de encabezado cierra la tubería, porque esto hará que Process.stdout active el evento EPIPE. Si desea crear una secuencia legible que se pueda presionar en cualquier forma de datos, simplemente establezca el modo de objeto de parámetro en True al crear la transmisión, por ejemplo: Readable ({ObjectMode: True}).
2> Datos de secuencia legibles
En la mayoría de los casos, simplemente usamos el método de tubería para redirigir los datos de la transmisión legible a otra forma de flujo, pero en algunos casos puede ser más útil leer datos directamente desde la secuencia legible. como sigue:
<code> process.stdin.on ('readable', function () {var buf = process.stdin.read (); console.dir (buf);}); $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | consumo de nodo0.js <buffer 0a = "" 61 = "" 62 = "" 63 = ""> <buffer 0a = "" 64 = "" 65 = "" 66 = ""> <buffer 0a = "" 67 = "" 68 = "" 69 = "> null </buffer> </buffer> </fuffer> </code>Cuando hay datos que se leen en la transmisión legible, la transmisión activará el evento 'legible', de modo que el método .read () pueda llamarse para leer los datos relevantes. Cuando no hay datos que se lean en la transmisión legible, .read () devolverá NULL, de modo que la llamada de .read () pueda finalizarse y esperar a que se active el próximo evento 'legible'. Aquí hay un ejemplo de usar .read (n) para leer 3 bytes cada vez de la entrada estándar:
<code> process.stdin.on ('readable', function () {var buf = process.stdin.read (3); console.dir (buf);}); </code>¡Ejecutar el programa de la siguiente manera muestra que los resultados de la salida no están completos!
<code> $ (echo ABC; Sleep 1; Echo def; Sleep 1; Echo Ghi) | consumo de nodo1.js <buffer 61 = "" 62 = "" 63 = ""> <buffer 0a = "" 64 = "" 65 = ""> <buffer 0a = "" 66 = "" 67 = ""> </buffer> </buffer> </code>
Esto debe hacerse para que los datos adicionales se dejen en el búfer interno de la transmisión, y necesitamos notificar a la secuencia que queremos leer más datos. Leer (0) puede lograr esto.
<code> process.stdin.on ('readable', function () {var buf = process.stdin.read (3); console.dir (buf); process.stdin.read (0);}); </code>Los resultados de esta ejecución son los siguientes:
<code> $ (echo ABC; Sleep 1; Echo def; Sleep 1; Echo Ghi) | consumo de nodo2.js <buffer 0a = "" 64 = "" 65 = ""> <buffer 0a = "" 68 = "" 69 = ""> </uffer> </uffer> </code>
Podemos usar .unshift () para cambiar el tamaño de los datos al jefe de la cola de datos de transmisión, de modo que podamos continuar leyendo los datos estados. Como en el siguiente código, el contenido de entrada estándar se emitirá por línea:
<code> var offset = 0; process.stdin.on ('readable', function () {var buf = process.stdin.read (); if (! buf) return; for (; offset <buf.length; offset ++) {if (bUf [offset] === 0x0a) {console.dir (buf.slice (0, offesset) .tostring ()))). buf.slice (offset + 1); offset = 0; process.stdin.unshift (buf); return;}} Process.stdin.unshift (buf);}); $ tail -n +50000/usr/share/dict/american -inglish | Cabeza -n10 | Node Lines.js 'Hearties''hearties''Hearty'''Heartiness''Heartiness'''Heartiness''Heartiness/' S''Heartland''Heartland/'S''Heartlands''Heartless''Heartly' </Code>Por supuesto, hay muchos módulos que pueden implementar esta función, como la división.
3-3. corrientes de escritura
Las transmisiones de escritura solo se pueden usar como el parámetro de destino de la función .Pipe (). El siguiente código:
<code> src.pipe (writableStream); </code>
1> Crear una transmisión de escritura
Reescribe el método ._write (Chunk, ENC, Next) para aceptar datos de una secuencia legible.
<Code> var witable = request ('stream'). Writable; var ws = witable (); ws._write = function (chunk, enc, next) {console.dir (chunk); next ();}; process.stdin.pipe (ws); $ (echo beep; dormir 1; echo boop) | nodo write0.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> </uffer> </buffer> </code>La primera fragmentación de parámetros son los datos escritos por el entrante de datos. El segundo final del parámetro es el formato de codificación de los datos. El tercer parámetro Siguiente (ERR) notifica al escritor de datos a través de la función de devolución de llamada que se puede escribir más tiempo. Si la transmisión legible escribe una cadena, la cadena se convertirá en un búfer de forma predeterminada. Si el parámetro WRITITY ({Decodestrings: False}) se establece al crear la transmisión, entonces la conversión no se realizará. Si los datos son escritos por la transmisión legible, entonces debe crear un flujo de escritura de esta manera
<code> witable ({objectMode: true}) </code>2> Escribir datos para la transmisión de escritura
Llame al método .Write (Data) de la secuencia de escritura para completar la redacción de datos.
<code> process.stdout.write ('beep boop/n'); </code>Llamar al método .end () notifica el flujo de escritura que los datos han sido escritos para completar.
<code> var fs = request ('fs'); var ws = fs.CreateWriteStream ('Message.txt'); ws.write ('beep'); setTimeOut (function () {ws.end ('boop/n');}, 1000); $ nodo Writing1.js $ Cat Message.txtbeep Boop </code>Si necesita establecer el tamaño del búfer de la secuencia de escritura, al crear la transmisión, debe configurar Opts.highwatermark, de modo que los datos en el búfer exceden las opts.highwatermark, el método .write (data) devolverá falso. Cuando el amortiguador es redonda, la corriente de escritura activará el evento 'drenaje'.
3-4. transmisiones clásicas
Classic Streams es una interfaz más antigua, que apareció por primera vez en la versión Node 0.4, pero aún es muy bueno comprender su principio operativo.
dónde. Cuando se registra una transmisión con el evento "Datos" a la función, la transmisión funcionará en el modo de versión anterior, es decir, se utilizará la API anterior.
1> transmisiones legibles clásicas
El evento clásico de transmisión legible es un desencadenante de eventos. Si las transmisiones legibles clásicas tienen datos para leer, desencadena el evento "Datos". Cuando se leen los datos, se activará el evento "Fin". El método .Pipe () determina si la secuencia tiene datos para leer al verificar el valor de stream.ecable. Aquí hay un ejemplo de impresión de letras AJ usando transmisiones legibles clásicas:
<código> var stream = request ('stream'); var stream = new stream; stream.readable = true; var c = 64; var IV = setInterval (function () {if (++ c> = 75) {clearInterval (iv); stream.emit ('end');} stream.emit ('data', string.frombarcode (c));}, 100); nodo clásico0.jsabcdefghij </code>Si desea leer datos de la transmisión legible clásica, registre las funciones de devolución de llamada de los dos eventos de "datos" y "fin", el código es el siguiente:
<code> process.stdin.on ('data', function (buf) {console.log (buf);}); process.stdin.on ('end', function () {console.log ('__ end __');}); $ (echo beep; dormir 1; echo boop) | nodo Classic1.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> __ end __ </buffer> </buffer> </code>Cabe señalar que si usa este método para leer datos, perderá los beneficios de usar la nueva interfaz. Por ejemplo, cuando escribe datos en una transmisión con una latencia muy alta, debe prestar atención al saldo entre leer y escribir datos, de lo contrario causará una gran cantidad de datos en la memoria, lo que resulta en un desperdicio de mucha memoria. En general, se recomienda usar el método .Pipe () de la transmisión, para que no tenga que escuchar los eventos de "datos" y "terminar" usted mismo, y no tiene que preocuparse por el problema de la lectura y la escritura desequilibradas. Por supuesto, también puede usar en lugar de escuchar los eventos de "datos" y "finalizar" usted mismo, como el siguiente código:
<Code> var a través de = requirir ('a través de'); proceso.stdin.pipe (a través de (escribir, end)); function write (buf) {console.log (buf);} function end () {console.log ('__ end __');} $ (echo beep; dormir 1; echo boop) | nodo a través de.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> __ end __ </buffer> </ buffer> </code>O también puede usar Concat-stream para almacenar en caché el contenido de toda la transmisión:
<code> var concat = require ('concat-stream'); process.stdin.pipe (concat (function (body) {console.log (json.parse (cuerpo));})); $ echo '{"beep": "boop"}' | node concat.js {beep: 'boop'} </code>Por supuesto, si tiene que escuchar los eventos de "datos" y "finalizar" usted mismo, entonces puede usar el método .Pause () para pausar las transmisiones legibles clásicas y continuar activando el evento "Datos" cuando la transmisión de datos de escritura no es redonda. Espere hasta que la secuencia de datos de escritura sea escrita antes de usar el método .resume () notifica que la transmisión continúa activando el evento "Datos" para continuar leyendo.
datos.
2> transmisiones de escritura clásicas
Las transmisiones de escritura clásicas son muy simples. Solo hay tres métodos: .write (buf), .end (buf) y .destroy (). El parámetro BUF del método .end (BUF) es opcional. Si selecciona este parámetro, es equivalente a stream.write (buf); stream.end (). Cabe señalar que cuando el búfer de la transmisión está lleno, es decir, la transmisión no se puede escribir. El método Write (BUF) devolverá falso. Si la transmisión se escribe nuevamente, la corriente activará el evento de drenaje.
4. Transformar
Transform es una secuencia que filtra la salida de datos de lectura.
5. Duplex
La transmisión dúplex es una transmisión de dos vías que puede ser legible o escrita. Por ejemplo, un siguiente es una secuencia dúplex:
<code> A.Pipe (b) .Pipe (a) </code>
El contenido anterior es el Manual del usuario del flujo de datos de NodeJS Stream que le presenta el editor. ¡Espero que te sea útil!