1. Pendahuluan
Artikel ini memperkenalkan metode dasar pengembangan program menggunakan aliran Node.js.
<code> "Kita harus memiliki beberapa cara untuk menghubungkan program-program seperti Garden Hose-sekrup segmen yang tidak lain yang diperlukan untuk memijat data yang sama lain. Ini adalah cara IO juga." Doug McIlroy. 11 Oktober 1964 </code>
Yang pertama bersentuhan dengan Stream adalah latihan puluhan tahun yang dimulai dengan UNIX awal, yang membuktikan bahwa ide -ide aliran dapat dengan mudah mengembangkan beberapa sistem besar. Di UNIX, aliran diimplementasikan melalui |; Dalam node, sebagai modul aliran bawaan, banyak modul inti dan modul tiga partai digunakan. Seperti Unix, operasi utama aliran node adalah .pipe (), dan pengguna dapat menggunakan mekanisme penanggulangan untuk mengontrol keseimbangan antara baca dan tulis.
Stream dapat memberi pengembang antarmuka terpadu yang dapat menggunakan kembali dan mengontrol keseimbangan baca dan tulis antara aliran melalui antarmuka aliran abstrak.
2. Mengapa menggunakan Stream
I/O di Node tidak sinkron, jadi membaca dan menulis ke disk dan jaringan memerlukan membaca dan menulis ke data melalui fungsi callback. Berikut ini adalah kode sederhana untuk server unduhan file:
<code> var http = membutuhkan ('http'); var fs = membutuhkan ('fs'); var server = http.createServer (function (req, res) {fs.readfile (__ dirname + '/data.txt', function (err, data) {res.end (data);};Kode -kode ini dapat mengimplementasikan fungsi yang diperlukan, tetapi layanan perlu men -cache seluruh data file ke memori sebelum mengirim data file. Jika file "data.txt" besar dan konkurensi besar, banyak memori akan sia -sia. Karena pengguna perlu menunggu sampai seluruh file di -cache ke memori sebelum data file dapat diterima, ini mengarah pada pengalaman pengguna yang sangat buruk. Untungnya, kedua parameter (req, res) adalah aliran, sehingga kita dapat menggunakan fs.createreadstream () bukan fs.readfile ():
<code> var http = membutuhkan ('http'); var fs = membutuhkan ('fs'); var server = http.createServer (function (req, res) {var stream = fs.creatreeadStream (__ dirname + '/data.txt'); stream.pipe (res) (} server + '/data.tx (stream.pipe (res);};Metode .pipe () mendengarkan peristiwa 'data' dan 'akhir' dari fs.createreadstream (), sehingga file "data.txt" tidak perlu men -cache seluruh file. Setelah koneksi klien selesai, blok data dapat dikirim ke klien segera. Manfaat lain dari menggunakan .pipe () adalah bahwa ia dapat menyelesaikan masalah ketidakseimbangan membaca dan menulis yang disebabkan oleh latensi klien yang sangat besar. Jika Anda ingin mengompres file dan mengirimkannya, Anda dapat menggunakan modul tiga partai untuk mengimplementasikannya:
<code> var http = membutuhkan ('http'); var fs = membutuhkan ('fs'); var penindas = membutuhkan ('penindas'); var server = http.createServer (fungsi (req, res) {var stream = fs.createreadstream (__ dirname + '/data.txt'); stream.pipe (penindas (req)). pipa (res);}); server.listen (8000); </code>Dengan cara ini file akan mengompres browser yang mendukung GZIP dan mengempis. Modul penindas menangani semua pengkodean konten.
Stream membuat program berkembang menjadi sederhana.
3. Konsep Dasar
Ada lima aliran dasar: dapat dibaca, dapat ditulis, transformasi, dupleks, dan "klasik".
3-1, pipa
Semua jenis aliran.
<code> src.pipe (dst) </code>
Metode .pipe (DST) mengembalikan aliran DST, sehingga beberapa .pipe () dapat digunakan secara berturut -turut, sebagai berikut:
<code> a.pipe (b) .pipe (c) .pipe (d) </code>
Fungsinya sama dengan kode berikut:
<code> a.pipe (b); b.pipe (c); c.pipe (d); </code>
3-2. Aliran yang dapat dibaca
Dengan memanggil metode .pipe () dari aliran yang dapat dibaca, Anda dapat menulis data aliran yang dapat dibaca ke aliran yang dapat ditulis, mengubah, atau dupleks.
<code> readableStream.pipe (dst) </code>
1> Buat aliran yang dapat dibaca
Di sini kami membuat aliran yang dapat dibaca!
<code> var readable = membutuhkan ('stream'). dapat dibaca; var rs = baru dapat dibaca; rs.push ('bip'); rs.push ('boop/n'); rs.push (null); rs.pipe (proses.stdout); $ node read0.jsbeep boop </code>rs.push (null) memberi tahu penerima data bahwa data telah dikirim.
Perhatikan bahwa kami tidak memanggil rs.pipe (Process.stdout); Sebelum mendorong semua data ke dalam aliran yang dapat dibaca, tetapi semua data ke dalam aliran yang dapat dibaca masih sepenuhnya, karena aliran yang dapat dibaca mencaci semua data yang ditekan sebelum penerima membaca data. Tetapi dalam banyak kasus, cara yang lebih baik adalah hanya menekan data ke aliran yang dapat dibaca ketika data menerima data yang diminta alih -alih cache seluruh data. Mari kita tulis ulang fungsi ._read () di bawah ini:
<code> var readable = membutuhkan ('stream'). dapat dibaca; var rs = dapat dibaca (); var c = 97; rs._read = function () {rs.push (string.fromCharcode (c ++)); if (c> 'z'.charcodeat (0)) rs.push (null);}; rs. nol (0)) rs.push (null);}; read1.jsabcdefghijklmnopqrstuvwxyz </code>Kode di atas menerapkan penulisan ulang metode _read () untuk mendorong data ke aliran yang dapat dibaca hanya jika penerima data meminta data. Metode _read () juga dapat menerima parameter ukuran yang menunjukkan ukuran data yang diminta dari data, tetapi aliran yang dapat dibaca dapat mengabaikan parameter ini sesuai kebutuhan.
Perhatikan bahwa kita juga dapat menggunakan util.inherit () untuk mewarisi aliran yang dapat dibaca. Untuk mengilustrasikan bahwa metode _read () dipanggil hanya ketika penerima data meminta data, kami membuat penundaan saat mendorong data ke aliran yang dapat dibaca, sebagai berikut:
<code> var readable = membutuhkan ('stream'). dapat dibaca; var rs = dapat dibaca (); var c = 97 - 1; rs._read = function () {if (c> = 'z'.charcodeat (0)) return rs.push (null); setTimeout (function () {rs.push (string.FromCode (++ C), {rs.push (string.FrOMCODE (++ CUSHCODE (++ CUSHCODE (++ CUSHCUDE (++ CUSHCUDE (++ CUSHCUDE (++ CUSHCUDE (++ 100);}; rs.pipe (process.stdout); process.on ('exit', function () {console.error ('/n_read () disebut' + (c - 97) + 'kali');}); process.stdout.on ('error', process.exit); </kode>Saat menjalankan program dengan perintah berikut, kami menemukan bahwa metode _read () hanya dipanggil 5 kali:
<code> $ node read2.js | head -c5abcde_read () dipanggil 5 kali </code>
Alasan menggunakan timer adalah bahwa sistem membutuhkan waktu untuk mengirim sinyal untuk menginformasikan program untuk menutup pipa. Gunakan proses.stdout.on ('error', fn) untuk menangani sistem yang mengirim sinyal sigpipe karena perintah header menutup pipa, karena ini akan menyebabkan proses.stdout untuk memicu acara epipe. Jika Anda ingin membuat aliran yang dapat dibaca yang dapat ditekan ke segala bentuk data, cukup atur Parameter ObjectMode ke True saat membuat aliran, misalnya: dapat dibaca ({ObjectMode: true}).
2> Data aliran yang dapat dibaca
Dalam kebanyakan kasus, kami cukup menggunakan metode pipa untuk mengarahkan data aliran yang dapat dibaca ke bentuk aliran lain, tetapi dalam beberapa kasus mungkin lebih berguna untuk membaca data langsung dari aliran yang dapat dibaca. sebagai berikut:
<code> process.stdin.on ('dapat dibaca', fungsi () {var buf = process.stdin.read (); console.dir (buf);}); $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | Node Consumption0.js <buffer 0a = "" 61 = "" 62 = "" 63 = ""> <buffer 0a = "" 64 = "" 65 = "" 66 = ""> <buffer 0a = "" 67 = "" 68 = "" 69 = ""> null </buffer> </buffer> </Buffer> Buffer> "" null </buffer> </buffer> </Buffer> Buffer> </Buffer> </"> null </buffer> </buffer> </Buffer> BUFFER> </Buffer> </Buffer> Buffer> </Buffer>Ketika ada data yang harus dibaca dalam aliran yang dapat dibaca, aliran akan memicu acara 'dapat dibaca', sehingga metode .read () dapat dipanggil untuk membaca data yang relevan. Ketika tidak ada data untuk dibaca dalam aliran yang dapat dibaca, .Read () akan mengembalikan null, sehingga panggilan .read () dapat berakhir dan menunggu acara 'dapat dibaca' berikutnya dipicu. Berikut adalah contoh penggunaan. Bread (n) untuk membaca 3 byte setiap kali dari input standar:
<code> process.stdin.on ('readable', function () {var buf = process.stdin.read (3); console.dir (buf);}); </code>Menjalankan program sebagai berikut menunjukkan bahwa hasil output tidak lengkap!
<code> $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | Node Consumption1.js <buffer 61 = "" 62 = "" 63 = ""> <buffer 0a = "" 64 = "" 65 = ""> <buffer 0a = "" 66 = "" 67 = ""> </buffer> </buffer> </buffer> </kode> "
Ini harus dilakukan agar data tambahan ditinggalkan dalam buffer internal aliran, dan kami perlu memberi tahu aliran yang ingin kami baca lebih banyak data. Baca (0) dapat mencapai ini.
<code> process.stdin.on ('dapat dibaca', function () {var buf = process.stdin.read (3); console.dir (buf); process.stdin.read (0);}); </code>Hasil menjalankan ini adalah sebagai berikut:
<code> $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | Node Consumption2.js <buffer 0a = "" 64 = "" 65 = ""> <buffer 0a = "" 68 = "" 69 = ""> </buffer> </buffer> </code>
Kita dapat menggunakan .unshift () untuk mengubah ukuran data kembali ke kepala antrian data streaming, sehingga kita dapat terus membaca data yang dipertaruhkan. Seperti pada kode berikut, konten input standar akan output berdasarkan baris:
<code> var offset = 0; process.stdin.on ('readable', function () {var buf = process.stdin.read (); if (! buf) return; for (; offset <buf.length; offset ++) {if ever (buf] === 0x0a) {konsol. buf.slice (offset + 1); offset = 0; process.stdin.unshift (buf); return;}} process.stdin.unshift (buf);}); $ tail -n +50000/usr/share/Dict/American -English | head -n10 | node lines.js 'heartes''Hearties''Heartily''Heartiness''Heartiness''Heartiness''Heartiness/' S''Heartland''Heartland/'S''Heartlands''Heartless'''Heartlessly' </code>Tentu saja, ada banyak modul yang dapat mengimplementasikan fungsi ini, seperti Split.
3-3. Aliran yang dapat ditulis
Aliran yang dapat ditulis hanya dapat digunakan sebagai parameter tujuan fungsi .pipe (). Kode berikut:
<code> src.pipe (writableStream); </code>
1> Buat aliran yang dapat ditulis
Tulis ulang metode ._write (chunk, enc, berikutnya) untuk menerima data dari aliran yang dapat dibaca.
<code> var writable = membutuhkan ('stream'). writable; var ws = writable (); ws._write = function (chunk, enc, next) {console.dir (chunk); next ();}; process.stdin.pipe (ws); $ (echo beep; sleep 1; echo boop) | node write0.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> </buffer> </buffer> </code>Potongan parameter pertama adalah data yang ditulis oleh inputer data. Ujung parameter kedua adalah format pengkodean data. Parameter ketiga berikutnya (ERR) memberi tahu penulis data melalui fungsi panggilan balik bahwa lebih banyak waktu dapat ditulis. Jika aliran yang dapat dibaca menulis string, string akan dikonversi ke buffer secara default. Jika parameter writable ({decodestrings: false}) diatur saat membuat aliran, maka konversi tidak akan dilakukan. Jika data ditulis oleh aliran yang dapat dibaca, maka Anda perlu membuat aliran yang dapat ditulis dengan cara ini
<code> writable ({ObjectMode: true}) </code>2> Tulis data ke aliran yang dapat ditulis
Hubungi metode .write (data) dari aliran writable untuk menyelesaikan penulisan data.
<code> process.stdout.write ('bip boop/n'); </code>Memanggil metode .EnnD () memberi tahu aliran yang dapat ditulis bahwa data telah ditulis untuk diselesaikan.
<code> var fs = membutuhkan ('fs'); var ws = fs.createWriteStream ('message.txt'); ws.write ('bip'); setTimeout (function () {ws.end ('boop/n');}, 1000); $ node writes1.js $ cat message.txtbeep boop </Jika Anda perlu mengatur ukuran buffer aliran yang dapat ditulis, maka saat membuat aliran, Anda perlu mengatur opts.highwatermark, sehingga jika data dalam buffer melebihi opts.highwatermark, metode .write (data) akan mengembalikan false. Ketika buffer dapat ditulis, aliran yang dapat ditulis akan memicu peristiwa 'tiriskan'.
3-4. Aliran klasik
Classic Streams adalah antarmuka yang lebih lama, yang pertama kali muncul dalam versi Node 0,4, tetapi masih sangat baik untuk memahami prinsip operasinya.
Di mana. Ketika aliran terdaftar dengan acara "Data" kembali ke fungsi, aliran akan bekerja dalam mode versi lama, yaitu, API lama akan digunakan.
1> aliran klasik yang dapat dibaca
Acara Streams yang dapat dibaca klasik adalah pemicu acara. Jika aliran klasik yang dapat dibaca memiliki data untuk dibaca, itu memicu acara "data". Ketika data dibaca, acara "Akhir" akan dipicu. Metode .pipe () menentukan apakah aliran memiliki data untuk dibaca dengan memeriksa nilai aliran. Berikut adalah contoh mencetak huruf AJ menggunakan aliran klasik yang dapat dibaca:
<code> var stream = membutuhkan ('stream'); var stream = stream baru; stream.readable = true; var c = 64; var iv = setInterval (function () {if (++ c> = 75) {clearInterval (iv); stream.emit ('end');} else stream.emit ('data', string.fromChorc (c) (c);}, creat. 100); stream.pipe (process.stdout); $ node classic0.jsabcdefghij </code>Jika Anda ingin membaca data dari aliran klasik yang dapat dibaca, daftarkan fungsi panggilan balik dari dua peristiwa "data" dan "end" peristiwa, kodenya adalah sebagai berikut:
<code> process.stdin.on ('data', function (buf) {console.log (buf);}); process.stdin.on ('end', function () {console.log ('__ end __');}); $ (echo beep; sleep 1; echo boop) | node classic1.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> __ end __ </buffer> </buffer> </code>Perlu dicatat bahwa jika Anda menggunakan metode ini untuk membaca data, Anda akan kehilangan manfaat menggunakan antarmuka baru. Misalnya, ketika Anda menulis data ke aliran dengan latensi yang sangat tinggi, Anda perlu memperhatikan keseimbangan antara membaca dan menulis data, jika tidak, itu akan menyebabkan sejumlah besar data di -cache dalam memori, menghasilkan buang -buang banyak memori. Secara umum, sangat disarankan untuk menggunakan metode .pipe () dari aliran, sehingga Anda tidak perlu mendengarkan sendiri acara "data" dan "akhir", dan Anda tidak perlu khawatir tentang masalah membaca dan menulis yang tidak seimbang. Tentu saja, Anda juga dapat menggunakan melalui alih -alih mendengarkan "data" dan "akhir" acara sendiri, seperti kode berikut:
<code> var through = membutuhkan ('through'); process.stdin.pipe (through (write, end)); function write (buf) {console.log (buf);} function end () {console.log ('__ end __');} $ (echo beep; sleep 1; echo boop) | node through.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> __ end __ </buffer> </buffer> </code>Atau Anda juga dapat menggunakan concat-stream untuk menyimpan konten seluruh aliran:
<code> var concat = membutuhkan ('concat-stream'); process.stdin.pipe (concat (function (body) {console.log (json.parse (body));})); $ echo '{"bip": "boop"}' | node concat.js {bip: 'boop'} </code>Tentu saja, jika Anda harus mendengarkan sendiri peristiwa "data" dan "end", maka Anda dapat menggunakan metode .Pause () untuk menjeda aliran klasik yang dapat dibaca dan terus memicu peristiwa "data" ketika aliran data penulisan tidak dapat ditulis. Tunggu sampai aliran data penulisan dapat ditulis sebelum menggunakan metode .resume () memberi tahu aliran untuk terus memicu acara "data" untuk terus membaca.
data.
2> aliran klasik
Aliran yang dapat ditulis klasik sangat sederhana. Hanya ada tiga metode: .write (buf), .end (buf) dan .destroy (). Parameter BUF dari metode .EnnD (BUF) adalah opsional. Jika parameter ini dipilih, itu setara dengan stream.write (buf); operasi stream.end (). Perlu dicatat bahwa ketika buffer aliran penuh, yaitu, aliran tidak dapat ditulis. Metode tulis (BUF) akan mengembalikan false. Jika alirannya bisa ditulis lagi, aliran akan memicu acara pembuangan.
4. Transformasi
Transform adalah aliran yang menyaring output data baca.
5. Dupleks
Duplex Stream adalah aliran dua arah yang dapat dibaca atau ditulis. Misalnya, di bawah ini adalah aliran dupleks:
<code> a.pipe (b) .pipe (a) </code>
Konten di atas adalah Manual Pengguna Aliran Data NodeJS Stream yang diperkenalkan kepada Anda oleh editor. Saya harap ini akan membantu Anda!