node基础篇之stream流
kekobin opened this issue · 0 comments
kekobin commented
简介
流是数据的集合 - 就像数组或者字符串。差异就是流可能不是一次性获取到的,它们不需要匹配内存。这让流在处理大容量数据,或者从一个额外的源每次获取一块数据的时候变得非常强大。
然而,流不仅可以处理大容量的数据。它们也给了我们在代码中组合的能力。类似linux的管道命令一样:
const grep = ... // grep 输出流
const wc = ... // wc 输入流
grep.pipe(wc)
上面的TCP sockets,zlib,crypto 流即是可读也是可写流。
Stream分类
在nodejs中,有四种stream类型:
- Readable:用来读取数据,比如 fs.createReadStream()。
- Writable:用来写数据,比如 fs.createWriteStream()。
- Duplex:可读+可写,比如 net.Socket()。
- Transform:在读写的过程中,可以对数据进行修改,比如 zlib.createDeflate()(数据压缩/解压)。
所有的流都是 EventEmitter 的实例。触发它们的事件可以读或者写入数据,然而,我们可以使用 pipe 方法消费流的数据。
消费流的方式一:pipe 方法
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
pipe 方法是消费流最简单的方法,类似linux中的管道,好处在于,如果源文件较大,对于降低内存占用有好处。
消费流的方式二: Stream 事件
pipe方式随便很简单和方便,但是在某些场景下,比如需要对读取的流进程某些操作之后再写出时,就必须得使用事件的方式了:
# readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});
上面的drain 事件指的是: 当可写流可以接受更多的数据时的一个标志。
finish 事件指的是: 当所有的数据都写入到底层系统中时会触发。
实现一个可写流
可写流基于以下api实现:
const { Writable } = require('stream');
完整的一个示例:
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
这等价于内部实现的 process.stdout, 即下面的代码与上面的功能等价:
process.stdin.pipe(process.stdout);
实现一个可读流
可读流基于以下api实现:
const { Readable } = require('stream');
const inStream = new Readable({});
简单的完整示例:
const { Readable } = require('stream');
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
实现双向流和转换流
双向流也就是将上面的可读可写流的实现进行结合即可:
const { Duplex } = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
转换流
对于转换流,们没必要实现 read 或者 write 方法,我们仅仅需要实现 transform 方法,它结合了它们两个。它有一个 write 方法的特性并且我们也可以使用它来 push 数据:
const { Transform } = require('stream');
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
//将每个 chunk 转换到大写的格式,并且作为可读的一部分被 push 到可读流里面了
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
Node 的内置转换流
主要是zlib 和 crypto 流。
例一:使用 zlib.crreateGzip() 流与 fs readable/writable 流结合起来创建一个文件压缩的脚本
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));
也可以结合各种事件进行监听:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => process.stdout.write('.'))
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
大文件读取与写入示例
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
// res 是可写流,所以可以由可读流直接流入
src.pipe(res);
});
server.listen(8000);