1. Introduction
This article introduces the basic methods of developing programs using node.js streams.
<code>"We should have some ways of connecting programs like garden hose--screw inanother segment when it becomes necessary to massage data inanother way. This is the way of IO also."Doug McIlroy. October 11, 1964</code>
Streams come to us from the earliest days of Unix, and decades of practice have proven that the stream idea makes it simple to compose huge systems out of small components. In Unix, streams are implemented by the shell with |; in node, streams are provided by the built-in stream module, which is used by many core modules and third-party modules. As in Unix, the primary operation on a node stream is .pipe(), and a backpressure mechanism lets the consumer control the balance between reads and writes.
Streams give developers a unified, abstract interface, so stream components can be reused and the read/write balance between streams managed automatically.
2. Why use streams
I/O in node is asynchronous, so reading from and writing to disk or the network is done through callback functions. The following is a simple file download server:
<code>var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);</code>
This code works, but the server must buffer the entire contents of data.txt in memory before sending anything. If data.txt is large and concurrency is high, a lot of memory is wasted, and the user must wait for the whole file to be buffered before receiving any data, which makes for a poor experience. Fortunately, both arguments (req, res) are streams, so we can use fs.createReadStream() instead of fs.readFile():
<code>var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);</code>
Here .pipe() listens for the 'data' and 'end' events of fs.createReadStream(), so data.txt no longer needs to be buffered whole; a chunk of data can be sent to the client as soon as the connection is established. Another benefit of .pipe() is that it handles the read/write imbalance caused by very high client latency. If you want to compress the file before sending it, you can use a third-party module:
<code>var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);</code>
This way the file is compressed for browsers that support gzip or deflate; the oppressor module handles all the content-encoding negotiation.
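If you would rather avoid a third-party dependency, node's built-in zlib module can do the compression itself. Here is a minimal sketch; unlike oppressor it does not negotiate the encoding, so it assumes every client accepts gzip (in real code you would check the Accept-Encoding header first):
<code>var http = require('http');
var fs = require('fs');
var zlib = require('zlib'); // built-in compression module

var server = http.createServer(function (req, res) {
    // assumption: the client accepts gzip; check req.headers['accept-encoding'] in real code
    res.setHeader('content-encoding', 'gzip');
    fs.createReadStream(__dirname + '/data.txt')
        .pipe(zlib.createGzip()) // compress each chunk as it streams through
        .pipe(res);
});
server.listen(8000);</code>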
Streams make developing programs simple.
3. Basic concepts
There are five basic types of streams: readable, writable, transform, duplex, and "classic".
3-1. pipe
All stream types use .pipe() to create input/output pairs: .pipe() takes a readable stream src and pipes its data into a writable stream dst, as follows:
<code>src.pipe(dst)</code>
The .pipe(dst) method returns the dst stream, so multiple .pipe() calls can be chained, as follows:
<code>a.pipe(b).pipe(c).pipe(d)</code>
This is equivalent to the following code:
<code>a.pipe(b);
b.pipe(c);
c.pipe(d);</code>
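As a concrete example of such a chain, here is a minimal sketch (the file names are made up for illustration) that copies a file through a gzip transform, so a is a file source, b is a transform, and c is a file destination:
<code>var fs = require('fs');
var zlib = require('zlib');

// a (readable file stream) -> b (gzip transform) -> c (writable file stream)
fs.createReadStream('input.txt')
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream('input.txt.gz'));</code>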
3-2. Readable streams
By calling the .pipe() method of a readable stream, you can write its data into a writable, transform, or duplex stream.
<code>readableStream.pipe( dst )</code>
1>Create a readable stream
Let's create a readable stream:
<code>var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);</code><code>$ node read0.js
beep boop</code>
rs.push(null) notifies the consumer that the stream has finished sending data.
Note that we pushed all the data into the readable stream before calling rs.pipe(process.stdout), yet all of it is still output in full; this is because a readable stream buffers pushed data until a consumer reads it. In many cases, though, a better approach is to push data into the readable stream only when the consumer requests it, rather than buffering everything up front. To do this we override the ._read() function:
<code>var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);</code><code>$ node read1.js
abcdefghijklmnopqrstuvwxyz</code>
Here _read() is overridden so that data is pushed into the readable stream only when the consumer requests it. The _read() method also receives a size parameter indicating how much data the consumer wants, but a readable stream may ignore it.
Note that we can also use util.inherits() to subclass Readable (see the sketch at the end of this section). To show that _read() is called only when the consumer requests data, we add a delay before pushing data into the readable stream, as follows:
<code>var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;
rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);
    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);</code>
When we run the program with the following command, we find that _read() was called only 5 times:
<code>$ node read2.js | head -c5
abcde
_read() called 5 times</code>
The timer is used because the operating system needs some time to deliver the signal telling the program to close the pipe. process.stdout.on('error', fn) is needed because when the head command closes the pipe, the operating system sends our process a SIGPIPE signal, which causes process.stdout to emit an EPIPE error. If you want to create a readable stream that can be pushed any kind of value (not just strings and Buffers), set the objectMode parameter to true when creating the stream, for example: Readable({ objectMode: true }).
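As promised, here is a minimal sketch of the alphabet stream rewritten as a util.inherits() subclass (AlphabetStream is a made-up name for illustration):
<code>var Readable = require('stream').Readable;
var util = require('util');

function AlphabetStream () {
    Readable.call(this);
    this.c = 97; // start at 'a'
}
util.inherits(AlphabetStream, Readable);

// push one letter per request, then signal the end with null
AlphabetStream.prototype._read = function () {
    this.push(String.fromCharCode(this.c++));
    if (this.c > 'z'.charCodeAt(0)) this.push(null);
};

new AlphabetStream().pipe(process.stdout);</code>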
2>Read data from a readable stream
In most cases, we simply use the pipe method to redirect a readable stream's data into another stream, but sometimes it is more useful to read data from the stream directly, as follows:
<code>process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    console.dir(buf);
});</code><code>$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consumption0.js
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null</code>
When data is available to read, the readable stream emits the 'readable' event, and the .read() method can then be called to fetch it. When no data remains, .read() returns null; we stop calling .read() and wait for the next 'readable' event. Here is an example that uses .read(n) to read 3 bytes at a time from standard input:
<code>process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});</code>
Running the program shows that the output is incomplete:
<code>$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consumption1.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67></code>
This is because the remaining data is left in the stream's internal buffer, and we need to nudge the stream to tell it we want more data. Calling .read(0) does exactly that:
<code>process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});</code>
Now the output is complete:
<code>$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consumption2.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a></code>
We can use .unshift() to put data back at the head of the stream's read queue, so we can come back and read it later. For example, the following code reads standard input and outputs it line by line:
<code>var offset = 0;

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});</code><code>$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'</code>
Of course, there are many modules that implement this line-splitting for you, such as split; a usage sketch follows.
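Here is a sketch of how split is typically used, assuming its default behavior of splitting on newlines (it is a third-party module, so install it with npm first):
<code>var split = require('split');

process.stdin
    .pipe(split()) // emits one 'data' event per line of input
    .on('data', function (line) {
        console.dir(line);
    });</code>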
3-3. Writable streams
A writable stream can only be used as the destination argument of .pipe(), as in the following code:
<code>src.pipe( writableStream );</code>
1>Create a writable stream
Override the ._write(chunk, enc, next) method to accept data from a readable stream:
<code>var Writable = require('stream').Writable;

var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);</code><code>$ (echo beep; sleep 1; echo boop) | node write0.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a></code>
The first parameter, chunk, is the data written by the producer. The second parameter, enc, is the encoding of the data. The third parameter, next(err), is a callback that tells the producer more data can be written. If a string is written to the stream, it is converted to a Buffer by default; if the Writable({ decodeStrings: false }) parameter is set when creating the stream, the conversion is skipped. If you want the stream to accept arbitrary JavaScript values (objects) instead of strings and Buffers, create the writable stream like this:
<code>Writable({ objectMode: true })</code>
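For example, here is a minimal sketch of an object-mode writable stream; the objects and field names are made up for illustration:
<code>var Writable = require('stream').Writable;

var ws = Writable({ objectMode: true });
ws._write = function (obj, enc, next) {
    // obj arrives as-is instead of being converted to a Buffer
    console.log('got:', obj.n);
    next();
};

ws.write({ n: 1 });
ws.write({ n: 2 });
ws.end();</code>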
2>Write data to a writable stream
Call the writable stream's .write(data) method to write data.
<code>process.stdout.write('beep boop\n');</code>
Call the .end() method to notify the writable stream that you have finished writing.
<code>var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);</code><code>$ node writing1.js
$ cat message.txt
beep boop</code>
If you need to set the size of the writable stream's buffer, set opts.highWaterMark when creating the stream; the .write(data) method will then return false once the buffered data exceeds opts.highWaterMark. When the buffer becomes writable again, the stream emits the 'drain' event.
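Here is a minimal sketch of honoring that backpressure signal by hand; the slow consumer and all the numbers are made up for illustration. This is essentially what .pipe() does for you automatically:
<code>var Writable = require('stream').Writable;

// a slow consumer with a small buffer (highWaterMark is in bytes)
var ws = Writable({ highWaterMark: 1024 });
ws._write = function (chunk, enc, next) {
    setTimeout(next, 10); // simulate a slow destination
};

var i = 10000;
function writeSome () {
    while (i-- > 0) {
        // .write() returns false once the buffered data exceeds highWaterMark
        if (!ws.write('beep boop\n')) {
            ws.once('drain', writeSome); // resume when the buffer empties
            return;
        }
    }
    ws.end();
}
writeSome();</code>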
3-4. Classic streams
Classic streams are the older interface, which first appeared in node 0.4, but it is still worthwhile to understand how they operate. Whenever a 'data' event listener is registered on a stream, the stream switches into classic mode, meaning it behaves according to the old API.
1>Classic readable streams
A classic readable stream is just an event emitter: it emits a 'data' event whenever it has data for its consumer, and an 'end' event when it is finished. The .pipe() method checks the value of stream.readable to determine whether the stream has data to read. Here is an example that prints the letters A through J using a classic readable stream:
<code>var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);</code><code>$ node classic0.js
ABCDEFGHIJ</code>
To read data from a classic readable stream, register callbacks for the 'data' and 'end' events, as follows:
<code>process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});</code><code>$ (echo beep; sleep 1; echo boop) | node classic1.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__</code>
Note that reading data this way loses the benefits of the new interface: when writing to a destination with very high latency, you must manage the read/write balance yourself, otherwise large amounts of data pile up in memory and a lot of memory is wasted. It is generally strongly recommended to use the stream's .pipe() method instead, so you don't have to listen for the 'data' and 'end' events yourself or worry about read/write imbalance. You can also use the through module instead of listening for 'data' and 'end' directly, as in the following code:
<code>var through = require('through');
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log('__END__');
}</code><code>$ (echo beep; sleep 1; echo boop) | node through.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__</code>
Or you can use concat-stream to buffer the entire contents of the stream:
<code>var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));</code><code>$ echo '{"beep":"boop"}' | node concat.js
{ beep: 'boop' }</code>
Of course, if you must listen for the 'data' and 'end' events yourself, you can call .pause() to stop a classic readable stream from emitting 'data' events while the destination is not writable, and then call .resume() to tell the stream to start emitting 'data' events again once the destination becomes writable.
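Here is a minimal sketch of this manual flow control, using process.stdout as the (possibly slow) destination:
<code>process.stdin.on('data', function (buf) {
    // write returns false when process.stdout's buffer is full
    if (!process.stdout.write(buf)) {
        process.stdin.pause(); // stop 'data' events until stdout drains
        process.stdout.once('drain', function () {
            process.stdin.resume(); // destination is writable again, keep reading
        });
    }
});</code>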
2>Classic writable streams
Classic writable streams are very simple. There are only three methods: .write(buf), .end(buf), and .destroy(). The buf argument of .end(buf) is optional; if supplied, it is equivalent to stream.write(buf); stream.end(). Note that when the stream's buffer is full, i.e. the stream cannot accept more data for the moment, .write(buf) returns false; once the stream is writable again, it emits the 'drain' event.
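Here is a minimal sketch of a classic writable stream, which is just an event emitter with writable = true and these methods (.destroy() is omitted for brevity):
<code>var Stream = require('stream');

var ws = new Stream;
ws.writable = true;

ws.write = function (buf) {
    console.log('write: ' + buf);
    return true; // returning false would ask the producer to pause
};
ws.end = function (buf) {
    if (arguments.length) ws.write(buf); // .end(buf) === .write(buf) + .end()
    ws.writable = false;
};

process.stdin.pipe(ws);</code>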
4. Transform
A transform stream (sometimes called a "through stream") is a duplex stream that transforms the data read from its input before writing it to its output.
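Node's built-in stream.Transform class makes these easy to write by overriding ._transform(); here is a minimal sketch that upper-cases everything piped through it:
<code>var Transform = require('stream').Transform;

var upper = new Transform();
upper._transform = function (chunk, enc, next) {
    this.push(chunk.toString().toUpperCase()); // transform the chunk, then pass it on
    next();
};

process.stdin.pipe(upper).pipe(process.stdout);</code>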
5. Duplex
A duplex stream is a two-way stream that is both readable and writable, like a telephone line. For example, if a and b below are both duplex streams, data can flow around in a loop:
<code>a.pipe(b).pipe(a)</code>
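A familiar duplex stream is a TCP socket from the built-in net module; here is a minimal sketch of an echo server (the port number is arbitrary):
<code>var net = require('net');

// each connection's socket is a duplex stream: readable (data from
// the client) and writable (data back to the client)
var server = net.createServer(function (socket) {
    socket.pipe(socket); // echo the socket's readable side into its writable side
});
server.listen(8001);</code>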
That concludes this introduction to node.js streams. I hope it is helpful to you!