1. Overview
Stream is an abstract interface implemented by many objects in Node. For example, a request to an HTTP server is a stream, and so is stdout. Streams can be readable, writable, or both.
Streams first appeared in early Unix. Decades of practice have shown that the stream idea lets simple pieces be composed into very large systems. In Unix, streams are wired together with "|"; in Node, the built-in stream module is used by many core modules and third-party modules. As in Unix, the basic stream operation in Node is .pipe(), and a back-pressure mechanism lets users control the balance between reading and writing. The abstract Stream interface thus gives developers a single, reusable interface while keeping reads and writes between streams in balance.
A TCP connection is both a readable stream and a writable stream, while HTTP is different: an HTTP request object is a readable stream, and an HTTP response object is a writable stream.
Stream data is transmitted as Buffers by default, unless you set another encoding. The following is an example:
var http = require('http');
var server = http.createServer(function (req, res) {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end("Hello, Big Bear!");
});
server.listen(8888);
console.log("http server running on port 8888...");
If the response contains non-ASCII text, it will appear garbled in the browser, because no character set (such as "utf-8") has been specified. The fix is simple:
var http = require('http');
var server = http.createServer(function (req, res) {
    res.writeHead(200, {
        'Content-Type': 'text/plain;charset=utf-8' // add charset=utf-8
    });
    res.end("Hello, Big Bear!");
});
server.listen(8888);
console.log("http server running on port 8888...");
Why use Streams?
I/O in Node is asynchronous, so reading from and writing to disk or the network goes through callback functions. The following example serves a file for download:
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8888);
The code implements the required function, but the server must buffer the entire file in memory before sending it. If data.txt is very large and concurrency is high, a lot of memory is wasted, and because the user cannot receive any data until the whole file has been buffered, the experience is poor. Fortunately, both parameters (req, res) are Streams, so we can use fs.createReadStream() instead of fs.readFile(), as follows:
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8888);
The .pipe() method listens for the 'data' and 'end' events of fs.createReadStream(), so the data.txt file no longer needs to be buffered in full: a chunk can be sent to the client as soon as the connection is ready. Another benefit of .pipe() is that it manages the read/write imbalance caused by very slow clients.
There are five basic kinds of Streams: readable, writable, transform, duplex, and "classic". (See the API documentation for details.)
2. Examples
Data streams are needed when the data cannot fit in memory all at once, or when it is more efficient to process data while it is being read. NodeJS exposes data streams through its various Stream classes. Taking a large-file copy program as an example, we can create a read-only stream for the data source:
var rs = fs.createReadStream(pathname);
rs.on('data', function (chunk) {
    doSomething(chunk); // handle the chunk however you need
});
rs.on('end', function () {
    cleanUp();
});
The 'data' event fires continuously, whether or not doSomething can keep up with it. The code can be modified as follows to solve this problem:
var rs = fs.createReadStream(src);
rs.on('data', function (chunk) {
    rs.pause();
    doSomething(chunk, function () {
        rs.resume();
    });
});
rs.on('end', function () {
    cleanUp();
});
A callback is added to the doSomething function, so we can pause reading before processing a chunk and resume reading once it has been processed. In addition, we can create a write-only data stream for the destination, as follows:
var rs = fs.createReadStream(src);
var ws = fs.createWriteStream(dst);
rs.on('data', function (chunk) {
    ws.write(chunk);
});
rs.on('end', function () {
    ws.end();
});
With doSomething replaced by writing into the write-only stream, the code above looks like a file-copy program. However, it still has the problem described earlier: if writing cannot keep up with reading, the write stream's internal buffer will grow without bound. The return value of .write() tells us whether the chunk was written to the destination or held in the buffer, and the 'drain' event tells us when the buffered data has been flushed and the next chunk can be written. The code therefore becomes:
var rs = fs.createReadStream(src);
var ws = fs.createWriteStream(dst);
rs.on('data', function (chunk) {
    if (ws.write(chunk) === false) {
        rs.pause();
    }
});
rs.on('end', function () {
    ws.end();
});
ws.on('drain', function () {
    rs.resume();
});
This implements data transfer from a read-only stream to a write-only stream, including back-pressure control so the buffer cannot explode. Because this scenario is so common (the large-file copy above, for instance), NodeJS provides the .pipe() method to do exactly this; its internal implementation is similar to the code above.
Here is a more complete file-copy example:
var fs = require('fs'),
    path = require('path'),
    out = process.stdout;

var filePath = '/bb/bigbear.mkv';

var readStream = fs.createReadStream(filePath);
var writeStream = fs.createWriteStream('file.mkv');

var stat = fs.statSync(filePath);
var totalSize = stat.size;
var passedLength = 0;
var lastSize = 0;
var startTime = Date.now();

readStream.on('data', function (chunk) {
    passedLength += chunk.length;
    if (writeStream.write(chunk) === false) {
        readStream.pause();
    }
});

readStream.on('end', function () {
    writeStream.end();
});

writeStream.on('drain', function () {
    readStream.resume();
});

setTimeout(function show() {
    var percent = Math.ceil((passedLength / totalSize) * 100);
    var size = Math.ceil(passedLength / 1000000);
    var diff = size - lastSize;
    lastSize = size;
    out.clearLine();
    out.cursorTo(0);
    out.write('Completed ' + size + ' MB, ' + percent + '%, speed: ' + diff * 2 + ' MB/s');
    if (passedLength < totalSize) {
        setTimeout(show, 500);
    } else {
        var endTime = Date.now();
        console.log();
        console.log('Total time: ' + (endTime - startTime) / 1000 + ' seconds.');
    }
}, 500);
Save the code above as "copy.js" and experiment with it. We added a recursive setTimeout (setInterval would also work) as an observer: every 500 ms it writes the completed size, percentage, and copy speed to the console, and when the copy finishes it prints the total time taken.
3. Summary
(1) Understand the concept of a Stream.
(2) Become proficient with the relevant Stream APIs.
(3) Pay attention to details, such as copying large files chunk by chunk.
(4) Use .pipe().
(5) To stress the point once more: a TCP connection is both a readable stream and a writable stream, while HTTP is different: an HTTP request object is a readable stream, and an HTTP response object is a writable stream.