1。はじめに
この記事では、node.jsストリームを使用してプログラムを開発する基本的な方法を紹介します。
<Code>「ガーデンホースなどのプログラムを接続する方法がある必要があります。これは、他の方法でデータをマッサージする必要がある場合に、他のセグメントを構成する必要があります。これがIOの方法でもあります。」Doug McIlroy。 1964年10月11日</code>
ストリームと最初に接触したのは、初期のUNIXで始まった数十年の実践でした。 UNIXでは、ストリームは|;を介して実装されます。ノードでは、組み込みのストリームモジュールとして、多くのコアモジュールと3つのパーティモジュールが使用されています。 UNIXと同様に、ノードストリームの主な動作は.pipe()であり、ユーザーは逆圧力メカニズムを使用して読み取りと書き込みのバランスを制御できます。
Streamは、開発者に、抽象ストリームインターフェイスを介してストリーム間の読み取りおよび書き込みバランスを再利用および制御できる統一インターフェイスを提供できます。
2.なぜストリームを使用するのか
ノードのI/Oは非同期であるため、ディスクとネットワークの読み取りと書き込みには、コールバック関数を介してデータを読み書きする必要があります。以下は、ファイルダウンロードサーバーの簡単なコードです。
<code> var http = require( 'http'); var fs = require( 'fs'); var server = http.createserver(function(req、res){fs.readfile(__ dirname + '/data.txt'、function(err、data){res.end);})これらのコードは必要な機能を実装できますが、サービスはファイルデータを送信する前にファイルデータ全体をメモリにキャッシュする必要があります。 「data.txt」ファイルが大きく、並行性が大きい場合、多くのメモリが無駄になります。ファイルデータを受け入れる前にファイル全体がメモリにキャッシュされるまで待機する必要があるため、これは非常に悪いユーザーエクスペリエンスにつながります。幸いなことに、両方のパラメーター(req、res)はストリームであるため、fs.readfile()の代わりにfs.createreadstream()を使用できます。
<code> var http = require( 'http'); var fs = require( 'fs'); var server = http.createserver(function(req、res){var stream = fs.createreadstream(__ dirname + '/data.txt'); stream.pipe(res);.pipe()メソッドは、fs.createreadStream()の「データ」および「終了」イベントを聴いているため、「data.txt」ファイルがファイル全体をキャッシュする必要はありません。クライアント接続が完了すると、データブロックをすぐにクライアントに送信できます。 .pipe()を使用するもう1つの利点は、非常に大きなクライアントの遅延によって引き起こされる不均衡の読み取りと書き込みの問題を解決できることです。ファイルを圧縮して送信する場合は、3つのパーティモジュールを使用して実装できます。
<code> var http = require( 'http'); var fs = require( 'fs'); var pruspressor = require( 'prostressor'); var server = http.createserver(function(req、res){var stream = fs.createredstream(__ dirname + + '/data.txt'); stream.pipe(pruspressor(req))。pipe(res);}); server.listen(8000); </code>このようにして、ファイルはGZIPとデフレートをサポートするブラウザを圧縮します。抑圧者モジュールは、すべてのコンテンツエンコードを処理します。
Streamは、開発のプログラムを簡単にします。
3。基本概念
読み取り可能、書き込み、変換、デュプレックス、「クラシック」の5つの基本的なストリームがあります。
3-1、パイプ
すべてのタイプのストリーム使用.pipe()入力と出力ペアを作成し、読み取り可能なストリームSRCを受信し、次のように、そのデータを書き込み可能なストリームDSTに出力します。
<code> src.pipe(dst)</code>
.pipe(dst)メソッドはDSTストリームを返し、次のように複数の.pipe()を連続して使用できるようにします。
<Code> A.Pipe(b).Pipe(c).Pipe(d)</code>
関数は次のコードと同じです。
<Code> A.Pipe(b); B.Pipe(c); c.pipe(d); </code>
3-2。読み取り可能なストリーム
読み取り可能なストリームの.pipe()方法を呼び出すことにより、読み取り可能なストリームのデータを書き込み可能な、変換、または二重ストリームに書き込むことができます。
<code> readablestream.pipe(dst)</code>
1>読み取り可能なストリームを作成します
ここで読みやすいストリームを作成します!
<code> var readable = require( 'stream')。読み取り可能; var rs = new readable; rs.push( 'beep'); rs.push( 'boop/n'); rs.push(null); rs.pipe(process.stdout); $ node read0.jsbeep </code> code>
Rs.Push(null)は、データが送信されたことをデータ受信者に通知します。
rs.pipe(process.stdout)に電話しなかったことに注意してください。すべてのデータを読み取り可能なストリームにプッシュする前に、読み取り可能なストリームが受信機がデータを読み取る前にすべてのプレスデータをキャッシュするため、すべてのデータを読み取り可能なストリームに完全に出力します。しかし、多くの場合、より良い方法は、データがデータ全体をキャッシュする代わりに、データが要求されたデータを受信しているときにのみデータを読み取り可能なストリームに押し込むことです。以下の._read()関数を書き直しましょう。
<code> var readable = require( 'stream')。readable; var rs = readable(); var c = 97; rs._read = function(){rs.push(string.fromCharcode(c ++)); if(c>'.Charcodeat(0))rs.Push(NULL); read1.jsabcdefghijklmnopqrstuvwxyz </code>上記のコードは、_Read()メソッドの書き換えを実装して、データを受信者がデータを要求した場合にのみ、データを読み取り可能なストリームにプッシュします。 _read()メソッドは、データの要求されたデータサイズを示すサイズパラメーターを受信することもできますが、読み取り可能なストリームは必要に応じてこのパラメーターを無視できます。
util.inherits()を使用して読み取り可能なストリームを継承できることに注意してください。 _read()メソッドは、データ受信者がデータを要求する場合にのみ呼び出されることを示すために、次のようにデータを読み取り可能なストリームにプッシュするときに遅延を実行します。
<code> var readable = require( 'stream')。readable; var rs = readable(); var c = 97-1; rs._read = function(){if(c> = 'z'charcodeat(0))return rs.push(null); 100);}; rs.Pipe(process.stdout); process.on( 'exit'、function(){console.error( '/n_read()call' +(c -97) + 'times');}); cross.stdout.on( 'error'、process.exit); </code> </code> </code>次のコマンドでプログラムを実行すると、_read()メソッドは5回のみ呼び出されることがわかりました。
<code> $ node read2.js | Head -C5ABCDE_READ()は5回</code>に呼び出されます
タイマーを使用する理由は、システムがパイプラインを閉じるためにプログラムに通知するために信号を送信するのに時間がかかるからです。 Process.stdout.on( 'エラー'、FN)を使用して、ヘッダーコマンドがパイプラインを閉じるため、Sigpipe信号を送信するシステムを処理します。任意の形式のデータに押し込むことができる読み取り可能なストリームを作成する場合は、ストリームを作成するときにパラメーターオブジェクトモードをtrueに設定するだけです。
2>読み取り可能なストリームデータ
ほとんどの場合、PIPEメソッドを使用して、読み取り可能なストリームのデータを別の形式のストリームにリダイレクトするだけですが、場合によっては、読み取り可能なストリームから直接データを読み取る方が便利かもしれません。次のように:
<code> process.stdin.on( 'readable'、function(){var buf = process.stdin.read(); console.dir(buf);}); $(echo abc; sleep 1; echo def; sleep 1; echo ghi)| Node Consumption0.js <Buffer 0a = "" 61 = "" 62 = "" 63 = ""> <バッファー0a = "" 64 = "" 65 = "" 66 = ""> <バッファー0A = "" 67 = "" 68 = "69 =" ""> null </null </buffer </buffer </code>読み取り可能なストリームで読み取るデータがある場合、ストリームは「読み取り可能」イベントをトリガーし、.Read()メソッドを呼び出して関連するデータを読み取ることができます。読み取り可能なストリームに読み取るデータがない場合、.read()はnullを返し、.read()の呼び出しを終了し、次の「読み取り可能な」イベントがトリガーされるのを待つことができます。以下は、標準入力から毎回3バイトを読み取るために.read(n)を使用する例です。
<code> process.stdin.on( 'readable'、function(){var buf = process.stdin.read(3); console.dir(buf);}); </code>次のようにプログラムを実行すると、出力の結果が完了していないことが示されています。
<code> $(echo abc; sleep 1; echo def; sleep 1; echo ghi)|ノード消費1.js <バッファ61 = "" 62 = "" 63 = ""> <バッファー0a = "" 64 = "" 65 = ""> <バッファー0a = "" 66 = "" 67 = ""> </buffer> </buffer> </buffer> </code>
これは、追加のデータをストリームの内部バッファーに残しておく必要があります。また、より多くのデータを読みたいストリームに通知する必要があります。読み取り(0)はこれを達成できます。
<code> process.stdin.on( 'readable'、function(){var buf = process.stdin.read(3); console.dir(buf); process.stdin.read(0);}); </code>この実行の結果は次のとおりです。
<code> $(echo abc; sleep 1; echo def; sleep 1; echo ghi)| node Consumption2.js <バッファ0A = "" 64 = "" 65 = ""> <バッファー0A = "" 68 = "" 69 = ""> </buffer> </buffer> </code>
.unshift()を使用して、データをストリーミングデータキューのヘッドに変更して、杭打ちデータを読み続けることができます。次のコードと同様に、標準の入力コンテンツは行ごとに出力されます。
<code> var offset = 0; process.stdin.on( 'readable'、function(){var buf = process.stdin.read(); if(!buf)return; for(; offset <buf.length; offset ++){if(buf [offset] ==== 0x0a){console.dir.(0、buf.slice(0、buf.slice(0、bufs) buf.slice(offset + 1); offset = 0; process.stdin.unshift(buf); return;}} process.stdin.unshift(buf);}); $ tail -n +50000/usr/share/share/dict/amean -english |ヘッド-N10 | node lines.js 'hearties''hearties''heartily'heartinessもちろん、Splitなどのこの関数を実装できる多くのモジュールがあります。
3-3。書き込み可能なストリーム
書き込み可能なストリームは、.pipe()関数の宛先パラメーターとしてのみ使用できます。次のコード:
<code> src.pipe(writableStream); </code>
1>書き込み可能なストリームを作成します
読み取り可能なストリームからデータを受け入れるために、._write(chunk、enc、next)メソッドを書き直します。
<code> var writable = require( 'stream')。writable; var ws = writable(); ws._write = function(chunk、enc、next){console.dir(chunk); next();}; process.stdin.pipe(ws); $(echo beep; sleep 1; echo boop)| node write0.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <バッファー0a = "" 62 = "" 6f = "" 70 = ""> </buffer> </buffer> </code>最初のパラメーターチャンクは、データ入力者によって記述されたデータです。 2番目のパラメーターの終了は、データのエンコード形式です。次の3番目のパラメーター(ERR)は、より多くの時間を記述できるというコールバック関数を介してデータライターに通知します。読み取り可能なストリームが文字列を書き込む場合、文字列はデフォルトでバッファーに変換されます。ストリームを作成するときに、書き込み可能な({decodeStrings:false})パラメーターが設定されている場合、変換は実行されません。データが読み取り可能なストリームによって記述されている場合、この方法で書き込み可能なストリームを作成する必要があります
<code> writable({objectMode:true})</code>2>書き込み可能なストリームにデータを書き込みます
.write(data)writableストリームの方法を呼び出して、データライティングを完了します。
<code> process.stdout.write( 'beep boop/n'); </code>
.end()メソッドを呼び出すと、データが記述されていることを書き込み可能なストリームに通知します。
<code> var fs = require( 'fs'); var ws = fs.createwritestream( 'message.txt'); ws.write( 'beep'); setimeout(function(){ws.end( 'boop/n');}、$ node writing1.js $ cat message.txtbeep </code書き込み可能なストリームのバッファーのサイズを設定する必要がある場合は、ストリームを作成するときは、opts.highwatermarkを設定する必要があります。バッファが書き込み可能な場合、書き込み可能なストリームは「ドレイン」イベントをトリガーします。
3-4。クラシックストリーム
Classic Streamsは古いインターフェイスで、最初にNode 0.4バージョンに登場しましたが、その動作原則を理解することは非常に良いことです。
どこ。ストリームが関数に戻る「データ」イベントに登録されると、ストリームは古いバージョンモードで動作します。つまり、古いAPIが使用されます。
1>古典的な読み取り可能なストリーム
Classic Readable Streamsイベントは、イベントトリガーです。 Classic Readable Streamsに読み取るデータがある場合、「データ」イベントをトリガーします。データを読み取ると、「終了」イベントがトリガーされます。 .pipe()メソッドは、Stream.readableの値をチェックすることにより、ストリームに読み取るデータがあるかどうかを決定します。これは、古典的な読み取り可能なストリームを使用してAJレターを印刷する例です。
<code> var stream = require( 'stream'); var stream = new stream.read.readable = true; var c = 64; var iv = setinterval(function(){if(++ c> = 75){clearinterval(iv); stream.emit( 'end');} elsent.emit( 'data'、String.fromCharcode(c); fromCharcode(c); node Classic0.jsabcdefghij </code>Classic Readableストリームのデータを読みたい場合は、2つのイベント「データ」と「終了」イベントのコールバック関数を登録してください。コードは次のとおりです。
<code> process.stdin.on( 'data'、function(buf){console.log(buf);}); process.stdin.on( 'end'、function(){console.log( '__ end __');}); $(echo beep; sleep 1; echo boop)| node Classic1.js <バッファ0A = "" 62 = "" 65 = "" 70 = ""> <バッファー0A = "" 62 = "" 6f = ""この方法を使用してデータを読み取ると、新しいインターフェイスを使用することの利点が失われることに注意してください。たとえば、非常に高いレイテンシのあるストリームにデータを書き込む場合、読み取りデータと書き込みデータのバランスに注意を払う必要があります。そうしないと、メモリに大量のデータがキャッシュされ、多くのメモリが無駄になります。一般的に、ストリームの.pipe()メソッドを使用することを強くお勧めします。そうすることで、「データ」や「終了」イベントを自分で聞く必要がなくなり、バランスの取れた読み取りと執筆の問題を心配する必要はありません。もちろん、次のコードなど、「データ」や「終了」イベントを自分で聞く代わりに使用することもできます。
<code> var sens = require( 'ser'); process.stdin.pipe(write、end)); function write(buf){console.log(buf);} function end(){console.log( '__ end __');} $(echo sleep 1; echo boop)| node sull.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <バッファー0a = "" 62 = "" 6f = "" 70 = ""> __ end __ </buffer> </buffer> </code>または、concat-streamを使用して、ストリーム全体のコンテンツをキャッシュすることもできます。
<code> var concat = require( 'concat-stream'); process.stdin.pipe(concat(body){console.log(json.parse(body));})); $ echo '{"beep": "boop"}' | node concat.js {beep: 'boop'} </code>もちろん、「データ」と「終了」イベントを自分で聴く必要がある場合は、.pause()メソッドを使用して、典型的な読み取り可能なストリームを一時停止し、[データのストリームが書き込みもできない場合に「データ」イベントをトリガーし続けることができます。 .resume()メソッドを使用する前に、ストリームの書き込みデータが書き込み可能になるまで待ってください。ストリームに通知して、「データ」イベントを引き続きトリガーして読み続けます。
データ。
2>古典的な書き込みストリーム
古典的な書き込みストリームは非常にシンプルです。 .write(buf)、.end(buf)、および.destroy()の3つの方法しかありません。 .END(BUF)メソッドのBUFパラメーターはオプションです。このパラメーターを選択すると、stream.write(buf)に相当します。 stream.end()。ストリームのバッファーがいっぱいになった場合、つまりストリームを書き込むことができないことに注意する必要があります。書き込み(buf)メソッドはfalseを返します。ストリームが再び書き込みできる場合、ストリームはドレインイベントをトリガーします。
4。変換
変換は、読み取りデータの出力をフィルタリングするストリームです。
5。デュプレックス
デュプレックスストリームは、読みやすいまたは書き込みできる双方向のストリームです。たとえば、以下は二重ストリームです。
<Code> A.Pipe(b).Pipe(a)</code>
上記のコンテンツは、編集者が紹介したNodeJS Stream Data Flowユーザーマニュアルです。それがあなたに役立つことを願っています!