1. Introdução
Este artigo apresenta os métodos básicos de desenvolvimento de programas usando fluxos Node.js.
<Code> "Devemos ter algumas maneiras de conectar programas como a mangueira de jardim-escapu-se o segmento quando se tornar necessário massagear os dados de maneira mais importante. Esse também é o caminho de IO." Doug McIlroy. 11 de outubro de 1964 </code>
O primeiro a entrar em contato com o fluxo foi as décadas de prática que começaram com o UNIX antigo, o que provou que as idéias do fluxo podem simplesmente desenvolver alguns sistemas enormes. No UNIX, o fluxo é implementado através de |; No nó, como um módulo de fluxo embutido, são usados muitos módulos principais e módulos de três partes. Como o Unix, a operação principal do fluxo de nós é .pipe () e os usuários podem usar o mecanismo de contrapressão para controlar o equilíbrio entre leitura e gravação.
O fluxo pode fornecer aos desenvolvedores uma interface unificada que pode reutilizar e controlar o equilíbrio de leitura e gravação entre fluxos através de interfaces de fluxo abstrato.
2. Por que usar o fluxo
A E/S no nó é assíncrona; portanto, ler e escrever para disco e rede requer leitura e gravação para dados por meio de funções de retorno de chamada. A seguir, é apresentado um código simples para um servidor de download de arquivos:
<Code> var http = requer ('http'); var fs = requer ('fs'); var server = http.createServer (function (req, res) {fs.readfile (__ DirName + '/data.txt', function, data) {ress. (dados);});Esses códigos podem implementar as funções necessárias, mas o serviço precisa armazenar em cache todos os dados do arquivo para a memória antes de enviar os dados do arquivo. Se o arquivo "data.txt" for grande e a simultaneidade for grande, muita memória será desperdiçada. Como o usuário precisa esperar até que todo o arquivo seja armazenado em cache na memória antes que os dados do arquivo possam ser aceitos, isso leva a uma experiência muito ruim do usuário. Felizmente, ambos os parâmetros (req, res) são fluxo, para que possamos usar o fs.createrEadstream () em vez de fs.readfile ():
<code> var http = requer ('http'); var fs = requer ('fs'); var server = http.createServer (function (req, res) {var stream = fs.createradStream (__ dirname + '/data.txt');O método .pipe () ouve os eventos 'dados' e 'end' do fs.createradstream (), para que o arquivo "data.txt" não precise cache o arquivo inteiro. Após a conclusão da conexão do cliente, um bloco de dados pode ser enviado ao cliente imediatamente. Outro benefício do uso de .pipe () é que ele pode resolver o problema de ler e escrever desequilíbrio causado por uma latência muito grande do cliente. Se você deseja comprimir o arquivo e enviá-lo, pode usar um módulo de três partes para implementá-lo:
<code> var http = requer ('http'); var fs = requer ('fs'); var opressor = requer ('opressor'); var server = http.createServer (function (req, rese) {var stream = fs.createradstream (__ dirname + '/data.txt'); stream.pipe (opressor (req)). Pipe (res);}); server.listen (8000); </code>Dessa forma, o arquivo comprime o navegador que suporta GZIP e deflate. O módulo opressor lida com todo o codificação de conteúdo.
O fluxo simplifica o desenvolvimento de programas.
3. Conceitos básicos
Existem cinco fluxos básicos: legível, gravável, transformada, duplex e "clássico".
3-1, tubo
Todos os tipos de fluxo de uso .pipe () para criar um par de entrada e saída, receber um fluxo legível SRC e produzir seus dados para o DST de fluxo gravável, como segue:
<code> src.pipe (dst) </code>
O método .pipe (dst) retorna o fluxo de DST, para que múltiplos .pipe () possam ser usados sucessivamente, como segue:
<code> a.pipe (b) .pipe (c) .pipe (d) </code>
A função é a mesma do código a seguir:
<code> a.pipe (b); b.pipe (c); c.pipe (d); </code>
3-2. Fluxos legíveis
Ao chamar o método .pipe () de fluxos legíveis, você pode escrever os dados de fluxos legíveis em um fluxo gravável, transformado ou duplex.
<code> readableStream.pipe (dst) </code>
1> Crie um fluxo legível
Aqui criamos um fluxo legível!
<code> var readerable = requim ('stream').Rs.push (nulo) notifica o destinatário de dados que os dados foram enviados.
Observe que não chamamos Rs.pipe (process.stdout); Antes de enviar todos os dados para o fluxo legível, mas todos os dados no fluxo legível ainda são emitidos completamente, porque o fluxo legível cache todos os dados pressionados antes que o receptor lê os dados. Mas, em muitos casos, uma maneira melhor é pressionar apenas os dados no fluxo legível quando os dados estão recebendo os dados solicitados em vez de cache todos os dados. Vamos reescrever a função ._read () abaixo:
<code> var readerable = requim ('stream'). legível; var rs = readertable (); var c = 97; rs._read = function () {rs.push (string.fromCharcode (c ++)); if (c> 'z'.charcodeat (0)) rs.push (null);}; rs.pipe (rssiT.d.sout (0) rs.push (null); read1.jsabcdefghijklmnopqrstuvwxyz </code>O código acima implementa a reescrita do método _read () para pressionar os dados para o fluxo legível somente se o destinatário de dados solicitar os dados. O método _read () também pode receber um parâmetro de tamanho que indica o tamanho dos dados solicitados dos dados, mas o fluxo legível pode ignorar esse parâmetro conforme necessário.
Observe que também podemos usar o util.Irits () para herdar fluxos legíveis. Para ilustrar que o método _read () é chamado apenas quando o destinatário de dados solicita dados, diminuímos um atraso ao enviar dados para o fluxo legível, como segue:
<code> var readerable = requim ('stream'). legível; var rs = readertable (); var c = 97 - 1; rs._read = function () {if (c> = 'z'.Charcodeat (0)) retorna rs.push (null); setTimeout (function () {rs.push (string.from (str.push (null); 100);}; rs.pipe (process.stdout); process.on ('exit', function () {console.error ('/n_read () chamado' + (c - 97) + 'times');}); process.stdout.on ('error', process.exit); </code>Ao executar o programa com o comando a seguir, descobrimos que o método _read () era chamado apenas 5 vezes:
<Code> $ Node Read2.js | cabeça -c5abcde_read () chamada 5 vezes </code>
O motivo do uso de um cronômetro é que o sistema leva tempo para enviar sinais para informar o programa para fechar o pipeline. Use Process.stdout.on ('Error', FN) para lidar com o sistema enviando um sinal Sigpipe porque o comando do cabeçalho fecha o pipeline, porque isso causará processo.stdout para acionar o evento EPIPE. Se você deseja criar um fluxo legível que possa ser pressionado em qualquer forma de dados, basta definir o parâmetro ObjectMode como true ao criar o fluxo, por exemplo: legível ({objectMode: true}).
2> dados de fluxo legíveis
Na maioria dos casos, simplesmente usamos o método do tubo para redirecionar os dados do fluxo legível para outra forma de fluxo, mas em alguns casos pode ser mais útil ler dados diretamente do fluxo legível. do seguinte modo:
<code> process.stdin.on ('legível', function () {var buf = process.stdin.read (); console.dir (buf);}); $ (echo abc; sono 1; echo def; sono 1; echo ghi) | Consumo de nós 0.js <buffer 0a = "" 61 = "" 62 = "" 63 = ""> <buffer 0a = "" 64 = "" 65 = "" 66 = ""> <buffer 0a = "" 67 = "" 68 = "" 69 = ""> null </buffer> </buffer> <///"/"/"/"/"/"/"/"/"/"/"" "">Quando houver dados a serem lidos no fluxo legível, o fluxo acionará o evento 'legível', para que o método .read () possa ser chamado para ler os dados relevantes. Quando não há dados a serem lidos no fluxo legível, .reread () retornará nulo, para que a chamada de .read () possa ser encerrada e aguarde o próximo evento 'legível' a ser acionado. Aqui está um exemplo de uso .read (n) para ler 3 bytes cada vez a partir da entrada padrão:
<code> process.stdin.on ('readável', function () {var buf = process.stdin.read (3); console.dir (buf);}); </code>Executar o programa da seguinte forma mostra que os resultados da saída não estão completos!
<code> $ (echo abc; sono 1; echo def; sono 1; echo ghi) | Consumo de nó1.js <buffer 61 = "" 62 = "" 63 = ""> <buffer 0a = "" 64 = "" 65 = ""> <buffer 0a = "" 66 = "" 67 = ""> </buffer> </fuffer> </buffer> </code>
Isso deve ser feito para que os dados adicionais sejam deixados no buffer interno do fluxo e precisamos notificar o fluxo que queremos ler mais dados. Leia (0) pode conseguir isso.
<Code> process.stdin.on ('readável', function () {var buf = process.stdin.read (3); console.dir (buf); process.stdin.read (0);}); </code>Os resultados desta execução são os seguintes:
<code> $ (echo abc; sono 1; echo def; sono 1; echo ghi) | consumo de nós2.js <buffer 0a = "" 64 = "" 65 = ""> <buffer 0a = "" 68 = "" 69 = ""> </fuffer> </fuffer> </code>
Podemos usar .UNSHIFT () para redimensionar os dados de volta à cabeça da fila de dados de streaming, para que possamos continuar lendo os dados estabelecidos. Como no código a seguir, o conteúdo de entrada padrão será emitido por linha:
<Code> var offset = 0; process.stdin.on ('readerável', function () {var buf = process.stdin.read (); if (! buf) return; para (;; offset <buf.length; offtring ++) {if (buf [offset] === 0x0a) (console.dur (buf.sll) buf.slice (deslocamento + 1); offset = 0; process.stdin.unshift (buf); return;}} process.stdin.unshift (buf);}); Cabeça -n10 | Node lines.js 'Hearties'''Hearties''Hearly'''Heartiness''Heartiness''Heartiness''Heartiness/' S''Heartland''Heartland/'S''Heartlands''Hearless''Heartly' </code>Obviamente, existem muitos módulos que podem implementar essa função, como a divisão.
3-3. fluxos graváveis
Os fluxos graváveis só podem ser usados como o parâmetro de destino da função .pipe (). O seguinte código:
<code> src.pipe (writableStream); </code>
1> Crie um fluxo gravável
Reescreva o método ._write (chunk, enc, next) para aceitar dados de um fluxo legível.
<code> var webentável = requer ('stream'). writable; var ws = writable (); ws._write = function (chunk, enc, next). Node write0.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> </buffer> </fuffer> </code>O primeiro parâmetro Chunk são os dados gravados pelo entrada de dados. A segunda extremidade do parâmetro é o formato de codificação dos dados. O terceiro parâmetro a seguir (err) notifica o redator de dados através da função de retorno de chamada que mais tempo pode ser gravado. Se o fluxo legível gravar uma string, a string será convertida em um buffer por padrão. Se o parâmetro Winditable ({Decodestrings: false}) estiver definido ao criar o fluxo, a conversão não será executada. Se os dados forem escritos pelo fluxo legível, você precisará criar um fluxo gravável dessa maneira
<Code> Writitable ({objectMode: true}) </code>2> Escreva dados para o fluxo gravável
Ligue para o método .Write (Data) de fluxo gravável para concluir a redação de dados.
<code> process.stdout.write ('beep boop/n'); </code>Chamar o método .END () notifica o fluxo gravável que os dados foram gravados para concluir.
<Code> var fs = requim ('fs'); var ws = fs.createwritSTream ('message.txt'); ws.write ('beep'); setTimeout (function () {ws.end ('boop/n');}, 1000); $ writing1.js $ Cat Message.txtBepSe você precisar definir o tamanho do buffer do fluxo gravável, ao criar o fluxo, precisará definir opts.highwatermark, para que, se os dados no buffer exceder o opts.highwatermark, o método .Write (Data) retornará falsa. Quando o buffer é gravável, o fluxo gravável aciona o evento 'drenagem'.
3-4. fluxos clássicos
O Classic Streams é uma interface mais antiga, que apareceu pela primeira vez na versão Node 0.4, mas ainda é muito bom entender seu princípio operacional.
onde. Quando um fluxo é registrado no evento "dados" de volta à função, o fluxo funcionará no modo de versão antiga, ou seja, a API antiga será usada.
1> fluxos legíveis clássicos
O evento clássico de fluxos legíveis é um gatilho do evento. Se os fluxos legíveis clássicos tiverem dados para ler, ele aciona o evento "dados". Quando os dados são lidos, o evento "final" será acionado. O método .pipe () determina se o fluxo possui dados a serem lidos, verificando o valor do fluxo. Aqui está um exemplo de impressão de letras AJ usando fluxos clássicos legíveis:
<Code> var stream = requer ('stream'); var stream = new stream; stream.reablelable = true; var c = 64; var iv = setInterval (function () {if (++ c> = 75) {clearInterval (iv); stream.emit ('end');}}} ('data', string.from.emit (c); c); c); c); c); c); c); Classic0.jsabcdefghij </code>Se você deseja ler dados do fluxo legível clássico, registre as funções de retorno de chamada dos dois eventos "Dados" e "End" Events, o código é o seguinte:
<code> process.stdin.on ('dados', function (buf) {console.log (buf);}); process.stdin.on ('end', function () {console.log ('__ end __');}); $ (echo biep; sono 1; echo boop) |); node clássico1.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> __ end __ </buffer> </buffer> </code>Deve -se notar que, se você usar esse método para ler os dados, perderá os benefícios do uso da nova interface. Por exemplo, quando você escreve dados em um fluxo com uma latência muito alta, é necessário prestar atenção ao saldo entre os dados de leitura e gravação, caso contrário, isso fará com que uma grande quantidade de dados seja armazenada em cache na memória, resultando em um desperdício de muita memória. Geralmente, é altamente recomendável usar o método .pipe () do fluxo, para que você não precise ouvir os eventos "dados" e "encerrar" você mesmo, e não precisa se preocupar com o problema de leitura e escrita desequilibradas. Obviamente, você também pode usar em vez de ouvir os eventos "dados" e "final", como o código a seguir:
<code> var a = requer ('através'); process.stdin.pipe (através (escreva, end)); função write (buf) {console.log (buf);} função end () {console.log ('__ end __');} $ (echo beep; sono 1; ecoop) | nó através de.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> __ end __ </buffer> </buffer> </code>Ou você também pode usar o fluxo concat para armazenar em cache o conteúdo de todo o fluxo:
<code> var concat = requer ('concat-stream'); process.stdin.pipe (concat (function (body) {console.log (json.parse (body));})); $ echo '{"beep": "boop"}' | node concat.js {beep: 'boop'} </code>Obviamente, se você precisar ouvir os eventos "Dados" e "End" você mesmo, poderá usar o método .Pause () para pausar fluxos legíveis clássicos e continuar acionando o evento "Data" quando o fluxo de dados de gravação não é gravável. Aguarde até que os dados de gravação do fluxo sejam escritos antes de usar o método .resume () notifica o fluxo para continuar acionando o evento "dados" para continuar a ler.
dados.
2> fluxos de gravidade clássicos
Fluxos de gravidade clássicos são muito simples. Existem apenas três métodos: .Write (BUF), .nd (BUF) e .Destroy (). O parâmetro BUF do método .END (BUF) é opcional. Se você selecionar este parâmetro, é equivalente a Stream.Write (BUF); stream.end (). Deve -se notar que, quando o buffer do fluxo estiver cheio, ou seja, o fluxo não pode ser escrito. O método Write (BUF) retornará false. Se o fluxo for escrito novamente, o fluxo acionará o evento de drenagem.
4. Transform
Transform é um fluxo que filtra a saída dos dados de leitura.
5. duplex
O fluxo duplex é um fluxo de mão dupla que pode ser legível ou escrita. Por exemplo, um abaixo é um fluxo duplex:
<code> a.pipe (b) .pipe (a) </code>
O conteúdo acima é o manual do usuário do fluxo de dados do NodeJS Stream apresentado pelo editor. Espero que seja útil para você!