1. บทนำ
บทความนี้แนะนำวิธีการพื้นฐานในการพัฒนาโปรแกรมโดยใช้สตรีม node.js
<code> "เราควรมีวิธีการเชื่อมต่อโปรแกรมเช่น Garden Hose-ส่วนที่ไม่เป็นจริงเมื่อมันจำเป็นที่จะต้องนวดข้อมูลที่ไม่เป็นจริงนี่คือวิธีของ IO ด้วยเช่นกัน" Doug McIlroy 11 ตุลาคม 1964 </code>
คนแรกที่เข้ามาติดต่อกับสตรีมคือการฝึกฝนมานานหลายทศวรรษซึ่งเริ่มต้นด้วย UNIX ยุคแรกซึ่งพิสูจน์แล้วว่าความคิดสตรีมสามารถพัฒนาระบบขนาดใหญ่ได้ ใน UNIX สตรีมจะถูกนำไปใช้ผ่าน |; ในโหนดเป็นโมดูลสตรีมในตัวจะใช้โมดูลหลักจำนวนมากและโมดูลสามพรรค เช่นเดียวกับ UNIX การทำงานหลักของสตรีมโหนดคือ. pipe () และผู้ใช้สามารถใช้กลไกการตอบโต้เพื่อควบคุมความสมดุลระหว่างการอ่านและการเขียน
สตรีมสามารถให้อินเทอร์เฟซแบบครบวงจรแก่นักพัฒนาที่สามารถนำกลับมาใช้ใหม่และควบคุมความสมดุลของการอ่านและการเขียนระหว่างสตรีมผ่านอินเทอร์เฟซสตรีมนามธรรม
2. ทำไมต้องใช้สตรีม
I/O ในโหนดเป็นแบบอะซิงโครนัสดังนั้นการอ่านและการเขียนลงในดิสก์และเครือข่ายจำเป็นต้องอ่านและเขียนลงในข้อมูลผ่านฟังก์ชั่นการโทรกลับ ต่อไปนี้เป็นรหัสง่าย ๆ สำหรับเซิร์ฟเวอร์ดาวน์โหลดไฟล์:
<code> var http = reghed ('http'); var fs = require ('fs'); var server = http.createserver (ฟังก์ชั่น (req, res) {fs.readfile (__ dirname + '/data.txt', ฟังก์ชัน (err, data)รหัสเหล่านี้สามารถใช้ฟังก์ชั่นที่ต้องการได้ แต่บริการจำเป็นต้องแคชข้อมูลไฟล์ทั้งหมดไปยังหน่วยความจำก่อนที่จะส่งข้อมูลไฟล์ หากไฟล์ "data.txt" มีขนาดใหญ่และการเกิดขึ้นพร้อมกันมีขนาดใหญ่หน่วยความจำจำนวนมากจะสูญเปล่า เนื่องจากผู้ใช้จำเป็นต้องรอจนกว่าไฟล์ทั้งหมดจะถูกแคชไปที่หน่วยความจำก่อนที่ข้อมูลไฟล์จะได้รับการยอมรับสิ่งนี้นำไปสู่ประสบการณ์ผู้ใช้ที่แย่มาก โชคดีที่พารามิเตอร์ทั้งสอง (req, res) เป็นสตรีมดังนั้นเราสามารถใช้ fs.createReadStream () แทน fs.readfile ():
<code> var http = require ('http'); var fs = ต้องการ ('fs'); var server = http.createserver (ฟังก์ชั่น (req, res) {var stream = fs.createReadstream (__ dirname + '/data.txt'); stream.pipe (res);};เมธอด. pipe () ฟังเหตุการณ์ 'ข้อมูล' และ 'สิ้นสุด' ของ fs.createReadStream () ดังนั้นไฟล์ "data.txt" ไม่จำเป็นต้องแคชไฟล์ทั้งหมด หลังจากการเชื่อมต่อไคลเอนต์เสร็จสิ้นบล็อกข้อมูลสามารถส่งไปยังไคลเอนต์ได้ทันที ประโยชน์อีกประการหนึ่งของการใช้. pipe () คือมันสามารถแก้ปัญหาการอ่านและการเขียนความไม่สมดุลที่เกิดจากเวลาแฝงของลูกค้าที่มีขนาดใหญ่มาก หากคุณต้องการบีบอัดไฟล์และส่งคุณสามารถใช้โมดูลสามพรรคเพื่อนำไปใช้งาน:
<code> var http = reghed ('http'); var fs = reghed ('fs'); var reppressor = reghed ('repressor'); var server = http.createserver (ฟังก์ชั่น (req, res) {var stream = fs.createReadStream '/data.txt'); stream.pipe (ผู้กดขี่ (req)). pipe (res);}); server.listen (8000); </code>วิธีนี้ไฟล์จะบีบอัดเบราว์เซอร์ที่รองรับ GZIP และ deflate โมดูลผู้กดขี่จัดการการเข้ารหัสเนื้อหาทั้งหมด
สตรีมทำให้โปรแกรมการพัฒนาง่ายขึ้น
3. แนวคิดพื้นฐาน
มีห้าสตรีมพื้นฐาน: อ่านได้เขียนได้การแปลงดูเพล็กซ์และ“ คลาสสิก”
3-1 ท่อ
การใช้สตรีมทุกประเภทใช้. pipe () เพื่อสร้างอินพุตและคู่เอาต์พุตรับสตรีมที่อ่านได้ SRC และส่งออกข้อมูลไปยังสตรีมที่เขียนได้ดังต่อไปนี้:
<code> src.pipe (DST) </code>
เมธอด. pipe (dst) ส่งคืนสตรีม DST เพื่อให้สามารถใช้งานได้หลาย pipe () อย่างต่อเนื่องดังนี้:
<code> a.pipe (b) .pipe (c) .pipe (d) </code>
ฟังก์ชั่นเหมือนกับรหัสต่อไปนี้:
<code> A.pipe (b); b.pipe (c); c.pipe (d); </code>
3-2. สตรีมที่อ่านได้
โดยการเรียกใช้วิธี. pipe () ของสตรีมที่อ่านได้คุณสามารถเขียนข้อมูลของสตรีมที่อ่านได้ไปยังสตรีมที่เขียนได้การแปลงหรือดูเพล็กซ์
<code> readableStream.pipe (DST) </code>
1> สร้างสตรีมที่อ่านได้
ที่นี่เราสร้างสตรีมที่อ่านได้!
<code> var readable = ต้องการ ('สตรีม'). อ่านได้; var rs = ใหม่อ่านได้; rs.push ('beep'); rs.push ('boop/n'); rs.push (null); rs.pipe (process.stdout); $ node read0.jsbeep boop </code> code>Rs.Push (NULL) แจ้งให้ผู้รับข้อมูลทราบว่าข้อมูลได้ถูกส่งไปแล้ว
โปรดทราบว่าเราไม่ได้เรียก rs.pipe (process.stdout); ก่อนที่จะผลักข้อมูลทั้งหมดลงในสตรีมที่อ่านได้ แต่ข้อมูลทั้งหมดลงในสตรีมที่อ่านได้นั้นยังคงเป็นเอาต์พุตอย่างสมบูรณ์เนื่องจากสตรีมที่อ่านได้นั้นแคชข้อมูลที่กดทั้งหมดก่อนที่ผู้รับจะอ่านข้อมูล แต่ในหลายกรณีวิธีที่ดีกว่าคือการกดข้อมูลลงในสตรีมที่อ่านได้เมื่อข้อมูลได้รับข้อมูลที่ร้องขอแทนแคชข้อมูลทั้งหมด มาเขียนฟังก์ชั่น ._read () ด้านล่าง:
<code> var readable = ต้องการ ('stream'). อ่านได้; var rs = readable (); var c = 97; rs._read = function () {rs.push (String.fromCharcode (C ++)); ถ้า (c> 'z'.charcodeat (0)) rs.push (null); read1.jsabcdefghijklmnopqrstuvwxyz </code>รหัสข้างต้นใช้การเขียนใหม่ของเมธอด _read () เพื่อผลักดันข้อมูลไปยังสตรีมที่อ่านได้เฉพาะในกรณีที่ผู้รับข้อมูลร้องขอข้อมูล วิธี _read () ยังสามารถรับพารามิเตอร์ขนาดที่ระบุขนาดข้อมูลที่ร้องขอของข้อมูล แต่สตรีมที่อ่านได้สามารถเพิกเฉยต่อพารามิเตอร์นี้ได้ตามต้องการ
โปรดทราบว่าเรายังสามารถใช้ util.inherits () เพื่อสืบทอดสตรีมที่อ่านได้ เพื่อแสดงให้เห็นว่าเมธอด _read () จะเรียกเฉพาะเมื่อผู้รับข้อมูลร้องขอข้อมูลเราทำการล่าช้าเมื่อผลักข้อมูลไปยังสตรีมที่อ่านได้ดังนี้:
<code> var readable = ต้องการ ('stream'). อ่านได้; var rs = readable (); var c = 97 - 1; rs._read = function () {ถ้า (c> = 'z'.charcodeat (0)) ส่งคืน rs.push (null); settimeout 100;เมื่อเรียกใช้โปรแกรมด้วยคำสั่งต่อไปนี้เราพบว่าวิธี _read () เรียกว่าเพียง 5 ครั้งเท่านั้น:
<code> $ node read2.js | หัว -C5ABCDE_READ () เรียกว่า 5 ครั้ง </code>
เหตุผลในการใช้ตัวจับเวลาคือระบบต้องใช้เวลาในการส่งสัญญาณเพื่อแจ้งให้โปรแกรมปิดท่อ ใช้ process.stdout.on ('ข้อผิดพลาด', fn) เพื่อจัดการระบบที่ส่งสัญญาณ sigpipe เนื่องจากคำสั่งส่วนหัวปิดไปป์ไลน์เนื่องจากสิ่งนี้จะทำให้เกิดกระบวนการเพื่อเรียกเหตุการณ์ EPIPE หากคุณต้องการสร้างสตรีมที่สามารถอ่านได้ซึ่งสามารถกดลงในข้อมูลรูปแบบใดก็ได้เพียงตั้งค่าพารามิเตอร์ ObjectMode เป็น TRUE เมื่อสร้างสตรีมตัวอย่างเช่น: อ่านได้ ({ObjectMode: TRUE})
2> ข้อมูลสตรีมที่อ่านได้
ในกรณีส่วนใหญ่เราเพียงแค่ใช้วิธีการท่อเพื่อเปลี่ยนข้อมูลข้อมูลของสตรีมที่อ่านได้ไปยังสตรีมรูปแบบอื่น แต่ในบางกรณีอาจมีประโยชน์มากกว่าในการอ่านข้อมูลโดยตรงจากสตรีมที่อ่านได้ ดังนี้:
<code> process.stdin.on ('อ่านได้', function () {var buf = process.stdin.read (); console.dir (buf);}); $ (echo abc; sleep 1; echo def; นอน 1; echo ghi) | การบริโภคโหนด 0.js <บัฟเฟอร์ 0a = "" 61 = "" 62 = "" 63 = ""> <บัฟเฟอร์ 0a = "" 64 = "" 65 = "" 66 = ""> <buffer 0a = "" 67 = "" 68 = "69 ="เมื่อมีข้อมูลที่จะอ่านในสตรีมที่อ่านได้สตรีมจะทริกเกอร์เหตุการณ์ 'อ่านได้' เพื่อให้สามารถเรียกใช้วิธี. read () เพื่ออ่านข้อมูลที่เกี่ยวข้อง เมื่อไม่มีข้อมูลที่จะอ่านในสตรีมที่อ่านได้. read () จะส่งคืนค่า null เพื่อให้การเรียกของ. read () สามารถสิ้นสุดลงและรอเหตุการณ์ 'อ่านได้' ครั้งต่อไปที่จะถูกเรียกใช้ นี่คือตัวอย่างของการใช้. read (n) เพื่ออ่าน 3 ไบต์ในแต่ละครั้งจากอินพุตมาตรฐาน:
<code> process.stdin.on ('อ่านได้', function () {var buf = process.stdin.read (3); console.dir (buf);}); </code>การรันโปรแกรมดังต่อไปนี้แสดงให้เห็นว่าผลลัพธ์ผลลัพธ์ยังไม่สมบูรณ์!
<code> $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | การบริโภคโหนด 1.js <บัฟเฟอร์ 61 = "" 62 = "" 63 = ""> <บัฟเฟอร์ 0a = "" 64 = "" 65 = ""> <บัฟเฟอร์ 0a = "" 66 = "" 67 = ""> </buffer> </buffer>
สิ่งนี้ควรทำเพื่อให้ข้อมูลเพิ่มเติมถูกทิ้งไว้ในบัฟเฟอร์ภายในของสตรีมและเราจำเป็นต้องแจ้งสตรีมที่เราต้องการอ่านข้อมูลเพิ่มเติม อ่าน (0) สามารถบรรลุเป้าหมายนี้ได้
<code> process.stdin.on ('อ่านได้', function () {var buf = process.stdin.read (3); console.dir (buf); process.stdin.read (0);}); </code>ผลลัพธ์ของการดำเนินการนี้มีดังนี้:
<code> $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | การบริโภคโหนด 2.js <บัฟเฟอร์ 0a = "" 64 = "" 65 = ""> <บัฟเฟอร์ 0a = "" 68 = "" 69 = ""> </buffer> </buffer> </code>
เราสามารถใช้. unshift () เพื่อปรับขนาดข้อมูลกลับไปที่หัวของคิวข้อมูลสตรีมมิ่งเพื่อให้เราสามารถอ่านข้อมูลที่ถูกเก็บได้ต่อไป เช่นเดียวกับในรหัสต่อไปนี้เนื้อหาอินพุตมาตรฐานจะเป็นเอาต์พุตทีละบรรทัด:
<code> var Offset = 0; process.stdin.on ('อ่านได้', ฟังก์ชั่น () {var buf = process.stdin.read (); ถ้า (! buf) return; สำหรับ (; Offset <buf.length; Offset ++) {if (buf [Offset] === 0x0a) buf.slice (ออฟเซ็ต + 1); ออฟเซ็ต = 0; process.stdin.unshift (buf); return;}} process.stdin.unshift (buf);}); $ tail -n +50000/usr/share/dict/American -english | หัว -N10 | Node Lines.js 'Hearties''Hearties'''heartily'''heartiness'''heartiness''heartiness''Heartiness/' S'''''heartland'''แน่นอนว่ามีโมดูลมากมายที่สามารถใช้ฟังก์ชั่นนี้เช่นแยก
3-3. สตรีมที่เขียนได้
สตรีมที่เขียนได้สามารถใช้เป็นพารามิเตอร์ปลายทางของฟังก์ชัน. pipe () เท่านั้น รหัสต่อไปนี้:
<code> src.pipe (writableStream); </code>
1> สร้างสตรีมที่เขียนได้
เขียนใหม่วิธี ._write (chunk, enc, ถัดไป) เพื่อรับข้อมูลจากสตรีมที่อ่านได้
<code> var writable = ต้องการ ('สตรีม') เขียนได้; var ws = writable (); ws._write = function (chunk, enc, ถัดไป) {console.dir (chunk); next ();}; process.stdin.pipe (ws); $ (echo beep; โหนด write0.js <บัฟเฟอร์ 0a = "" 62 = "" 65 = "" 70 = ""> <บัฟเฟอร์ 0a = "" 62 = "" 6f = "" 70 = ""> </buffer>ชิ้นพารามิเตอร์แรกคือข้อมูลที่เขียนโดยตัวป้อนข้อมูล ปลายพารามิเตอร์ที่สองคือรูปแบบการเข้ารหัสของข้อมูล พารามิเตอร์ที่สามถัดไป (ERR) จะแจ้งให้ผู้เขียนข้อมูลผ่านฟังก์ชั่นการโทรกลับที่สามารถเขียนเวลาได้มากขึ้น หากสตรีมที่อ่านได้เขียนสตริงสตริงจะถูกแปลงเป็นบัฟเฟอร์โดยค่าเริ่มต้น หากพารามิเตอร์ที่เขียนได้ ({decodestrings: false}) ถูกตั้งค่าเมื่อสร้างสตรีมการแปลงจะไม่ถูกดำเนินการ หากข้อมูลถูกเขียนโดยสตรีมที่อ่านได้คุณจะต้องสร้างสตรีมที่เขียนได้ด้วยวิธีนี้
<code> เขียนได้ ({ObjectMode: TRUE}) </code>2> เขียนข้อมูลไปยังสตรีมที่เขียนได้
เรียกใช้วิธีการ. Write (data) ของสตรีมที่เขียนได้เพื่อเขียนข้อมูลให้เสร็จสมบูรณ์
<code> process.stdout.write ('beep boop/n'); </code>การเรียกใช้วิธี. end () จะแจ้งให้สตรีมเขียนได้ว่าข้อมูลได้รับการเขียนเพื่อให้เสร็จสมบูรณ์
<code> var fs = ต้องการ ('fs'); var ws = fs.createwritestream ('message.txt'); ws.write ('beep'); settimeout (ฟังก์ชั่น () {ws.end ('boop/n');}, 1000); $ node writing1.js $ cat.txtหากคุณต้องการตั้งค่าขนาดของบัฟเฟอร์ของสตรีมที่เขียนได้จากนั้นเมื่อสร้างสตรีมคุณจะต้องตั้งค่าการเลือกใช้ highwatermark ดังนั้นหากข้อมูลในบัฟเฟอร์เกิน opts.highwatermark วิธี. watered (data) จะกลับเท็จ เมื่อบัฟเฟอร์สามารถเขียนได้กระแสที่เขียนได้จะทำให้เกิดเหตุการณ์ 'ระบายน้ำ'
3-4. สตรีมคลาสสิก
สตรีมคลาสสิกเป็นอินเทอร์เฟซเก่าซึ่งปรากฏตัวครั้งแรกในโหนด 0.4 เวอร์ชัน แต่ก็ยังดีมากที่จะเข้าใจหลักการปฏิบัติการ
ที่ไหน. เมื่อสตรีมลงทะเบียนกับเหตุการณ์ "ข้อมูล" กลับไปที่ฟังก์ชั่นสตรีมจะทำงานในโหมดเวอร์ชันเก่านั่นคือ API เก่าจะถูกใช้
1> สตรีมที่อ่านได้แบบคลาสสิก
เหตุการณ์สตรีมที่อ่านได้แบบคลาสสิกเป็นทริกเกอร์เหตุการณ์ หากสตรีมที่อ่านได้แบบคลาสสิกมีข้อมูลที่จะอ่านมันจะกระตุ้นเหตุการณ์ "ข้อมูล" เมื่อข้อมูลถูกอ่านเหตุการณ์ "สิ้นสุด" จะถูกเรียกใช้ เมธอด. pipe () กำหนดว่าสตรีมมีข้อมูลที่จะอ่านโดยตรวจสอบค่าของสตรีมอ่านได้หรือไม่ นี่คือตัวอย่างของการพิมพ์ตัวอักษร AJ โดยใช้สตรีมที่อ่านได้แบบคลาสสิก:
<code> var stream = reghed ('stream'); var stream = stream ใหม่; stream.readable = true; var c = 64; var iv = setInterval (ฟังก์ชัน () {ถ้า (++ c> = 75) {clearInterval (iv); stream.emit ('end');} stream.emit Node Classic0.jsabcdefghij </code>หากคุณต้องการอ่านข้อมูลจากสตรีมที่อ่านได้คลาสสิกลงทะเบียนฟังก์ชั่นการโทรกลับของเหตุการณ์สองเหตุการณ์ "ข้อมูล" และ "สิ้นสุด" รหัสมีดังนี้:
<code> process.stdin.on ('data', function (buf) {console.log (buf);}); process.stdin.on ('end', ฟังก์ชั่น () {console.log ('__ end __');}); $ (echo beep; Node Classic1.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> __ end __ </buffer> </buffer> </code>ควรสังเกตว่าหากคุณใช้วิธีนี้ในการอ่านข้อมูลคุณจะสูญเสียประโยชน์จากการใช้อินเทอร์เฟซใหม่ ตัวอย่างเช่นเมื่อคุณเขียนข้อมูลไปยังสตรีมที่มีเวลาแฝงที่สูงมากคุณต้องให้ความสนใจกับความสมดุลระหว่างการอ่านและการเขียนข้อมูลมิฉะนั้นจะทำให้ข้อมูลจำนวนมากถูกแคชในหน่วยความจำทำให้เกิดการเสียหน่วยความจำจำนวนมาก โดยทั่วไปขอแนะนำอย่างยิ่งให้ใช้วิธี. pipe () ของสตรีมเพื่อที่คุณจะได้ไม่ต้องฟังเหตุการณ์ "ข้อมูล" และ "สิ้นสุด" ด้วยตัวคุณเองและคุณไม่ต้องกังวลเกี่ยวกับปัญหาการอ่านและการเขียนที่ไม่สมดุล แน่นอนคุณสามารถใช้ผ่านแทนที่จะฟังเหตุการณ์ "ข้อมูล" และ "จบ" ด้วยตัวคุณเองเช่นรหัสต่อไปนี้:
<code> var ถึง = ต้องการ ('ถึง'); process.stdin.pipe (ผ่าน (เขียน, สิ้นสุด)); ฟังก์ชั่นเขียน (buf) {console.log (buf);} ฟังก์ชันสิ้นสุด () {console.log ('__ end __');} $ (echo beep; Node Thern.js <buffer 0a = "" 62 = "" 65 = "" 70 = ""> <buffer 0a = "" 62 = "" 6f = "" 70 = ""> __ end __ </buffer> </buffer> </code>หรือคุณสามารถใช้ concat-stream เพื่อแคชเนื้อหาของสตรีมทั้งหมด:
<code> var concat = ต้องการ ('concat-stream'); process.stdin.pipe (concat (ฟังก์ชั่น (ร่างกาย) {console.log (json.parse (ร่างกาย));})); $ echo '{"beep": "boop"}' | node concat.js {beep: 'boop'} </do code>แน่นอนถ้าคุณต้องฟังเหตุการณ์ "ข้อมูล" และ "สิ้นสุด" ด้วยตัวคุณเองคุณสามารถใช้วิธี. pause () เพื่อหยุดสตรีมที่อ่านได้แบบคลาสสิกและดำเนินการต่อไปเพื่อเรียกเหตุการณ์ "ข้อมูล" เมื่อกระแสข้อมูลการเขียนไม่สามารถเขียนได้ รอจนกว่าข้อมูลการเขียนสตรีมจะสามารถเขียนได้ก่อนที่จะใช้วิธี. resume () จะแจ้งให้สตรีมทราบเพื่อดำเนินการต่อไปเพื่อเรียกเหตุการณ์ "ข้อมูล" เพื่ออ่านต่อไป
ข้อมูล.
2> สตรีมที่เขียนได้แบบคลาสสิก
สตรีมที่เขียนได้แบบคลาสสิกนั้นง่ายมาก มีเพียงสามวิธีเท่านั้น: .write (buf), .end (buf) และ. destroy () พารามิเตอร์ BUF ของวิธี. end (buf) เป็นทางเลือก หากเลือกพารามิเตอร์นี้จะเทียบเท่ากับ Stream.write (buf); stream.end () การดำเนินการ ควรสังเกตว่าเมื่อบัฟเฟอร์ของสตรีมเต็มนั่นคือสตรีมไม่สามารถเขียนได้ วิธีการเขียน (buf) จะส่งคืนเท็จ หากสตรีมสามารถเขียนได้อีกครั้งสตรีมจะทำให้เกิดเหตุการณ์การระบายน้ำ
4. แปลง
การแปลงเป็นสตรีมที่กรองเอาต์พุตของข้อมูลการอ่าน
5. ดูเพล็กซ์
Duplex Stream เป็นสตรีมสองทางที่สามารถอ่านได้หรือเขียนได้ ตัวอย่างเช่นด้านล่างเป็นสตรีมเพล็กซ์:
<code> a.pipe (b) .pipe (a) </code>
เนื้อหาข้างต้นคือคู่มือผู้ใช้ข้อมูลการไหลของข้อมูล NodeJS สตรีมที่แนะนำโดยตัวแก้ไข ฉันหวังว่ามันจะเป็นประโยชน์กับคุณ!