作為前端,我們常常會和 Stream 有著頻繁的接觸。比如使用 gulp 對項目進行構建的時候,我們會使用 gulp.src 接口將匹配到的文件轉為 stream(流)的形式,再通過 .pipe() 接口對其進行鏈式加工處理;
或者比如我們通過 http 模塊創建一個 HTTP 服務:
const http = require('http'); http.createServer( (req, res) => { //... }).listen(3000);
此處的 req 和 res 也屬于 Stream 的消費接口(前者為 Readable Stream,后者為 Writable Stream)。
事實上像上述的 req/res,或者 process.stdout 等接口都屬于 Stream 的實例,因此較少存在情況,是需要我們手動引入 Stream 模塊的,例如:
//demo1.js 'use strict'; const Readable = require('stream').Readable; const rs = Readable(); const s = 'VaJoy'; const l = s.length; let i = 0; rs._read = ()=>{ if(i == l){ rs.push(' is my name'); return rs.push(null) } rs.push(s[i++]) }; rs.pipe(process.stdout);
如果不太能讀懂上述代碼,或者對 Stream 的概念感到模糊,那么可以放輕松,因為本文會進一步地對 Stream 進行剖析,并且談談直接使用它可能會存在的一些問題(這也是為何 gulp 要使用 through2 的原因)。
另外本文的示例均可在我的 github 倉庫(https://github.com/VaJoy/stream/)獲取到,讀者可以自行下載和調試。
一. Stream的作用
在介紹 Stream(流)之前,我們先來看一個例子 —— 模擬服務器把本地某個文件內容吐給客戶端:
//demo2 var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); }); }); server.listen(3000);
這段代碼雖然可以正常執行,但存在一個顯著的問題 —— 對于每一個客戶端的請求,fs.readFile 接口都會把整個文件都緩存到內存中去,然后才開始把數據吐給用戶。那么當文件體積很大、請求也較多(且特別當請求來自慢速用戶)的時候,服務器需要消耗很大的內存,導致性能低下。
然而這個問題,則正是 stream 發揮所長的地方。如前文提及的,res 是流對象,那我們正好可以將其利用起來:
var server2 = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server2.listen(4000);
在上方代碼段里,fs.createReadStream 創建了 data.txt 的可讀流(Readable Stream)。這里需要事先了解的是,流可以簡單地分為“可讀的(readable)”、“可寫的(writable)”,或者“讀寫均可”三種類型,且所有的流都屬于 EventEmitter 的實例。
回到代碼,對于創建的可讀流,我們通過 .pipe() 接口來監聽其 data 和 end 事件,并把 data.txt (的可讀流)拆分成一小塊一小塊的數據(chunks),像流水一樣源源不斷地吐給客戶端,而不再需要等待整個文件都加載到內存后才發送數據。
其中 .pipe 可以視為流的“管道/通道”方法,任何類型的流都會有這個 .pipe 方法去成對處理流的輸入與輸出。
為了方便理解,我們把上述兩種方式(不使用流/使用流)處理為如下的情景(臥槽我好好一個前端為啥要P這么萌的圖):
⑴ 不使用流:
⑵ 使用流:
由此可以得知,使用流(stream)的形式,可以大大提升響應時間,又能有效減輕服務器內存的壓力。
二. Stream的分類
在上文我們曾提及到,stream 可以按讀寫權限來簡單地分做三類,不過這里我們再細化下,可以把 stream 歸為如下五個類別:
⑴ Readable Streams
⑵ Writable Streams
⑶ Transform Streams
⑷ Duplex Streams
⑸ Classic Streams
其中 Transform Streams 和 Duplex Streams 都屬于即可讀又可寫的流,而最后一個 Classic Streams 是對 Node 古早版本上的 Stream 的一個統稱。我們將照例對其進行逐一介紹。
2.1 Readable Streams
即可讀流,通過 .pipe 接口可以將其數據傳遞給一個 writable、transform 或者 duplex流:
readableStream.pipe(dst)
常見的 Readable Streams 包括:
- 客戶端上的 HTTP responses
- 服務端上的 HTTP requests
- fs read streams
- zlib streams
- crypto streams
- TCP sockets
- 子進程的 stdout 和 stderr
- process.stdin
例如在前面 demo2 的代碼段中,我們就使用了 fs.createReadStream 接口來創建了一個 fs read stream:
var server2 = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server2.listen(4000);
這里有個有趣的地方 —— 雖然 Readable Streams 稱為可讀流,但在將其傳入一個消耗對象之前,它都是可寫的:
var Readable = require('stream').Readable; var rs = new Readable; rs.push('servers '); rs.push('are listening on\n'); rs.push('3000 and 4000\n'); rs.push(null); rs.pipe(process.stdout);
執行結果:
在這段代碼中,我們通過 readStream.push(data) 的形式往可讀流里注入數據,并以 readStream.push(null) 來結束可讀流。
不過這種寫法有個弊端 —— 從使用 .push() 將數據注入 readable 流中開始,直到另一個東西(process.stdout)來消耗數據之前,這些數據都會存在緩存中。
這里有個內置接口 ._read() 可以用來處理這個問題,它是從系統底層開始讀取數據流時才會不斷調用自身,從而減少緩存冗余。
我們可以回過頭來看 demo1 的例子:
'use strict'; const Readable = require('stream').Readable; const rs = Readable(); const s = 'VaJoy'; const l = s.length; let i = 0; rs._read = ()=>{ if(i == l){ rs.push(' is my name'); return rs.push(null) } rs.push(s[i++]) }; rs.pipe(process.stdout);
我們是在 ._read 方法中才使用 readStream.push(data) 往可讀流里注入數據供下游消耗(也會流經緩存),從而提升流處理的性能。
這里也有個小問題 —— 上一句話所提到的“供下游消耗”,這個下游通常又會以怎樣的形式來消耗可讀流的呢?
首先,可以使用我們熟悉的 .pipe() 方法將可讀流推送給一個消耗對象(writable、transform 或者 duplex流):
//ext1 const fs = require('fs'); const zlib = require('zlib'); const r = fs.createReadStream('data.txt'); const z = zlib.createGzip(); const w = fs.createWriteStream('data.txt.gz'); r.pipe(z).pipe(w);
其次,也可以通過監聽可讀流的“data”事件(別忘了文章前面提到的“所有的流都屬于 EventEmitter 的實例”)來實現消耗處理 —— 在首次監聽其 data 事件后,readStream 便會持續不斷地調用 _read(),通過觸發 data 事件將數據輸出。當數據全部被消耗時,則觸發 end 事件。
示例:
//demo3 const Readable = require('stream').Readable; class ToReadable extends Readable { constructor(iterator) { super(); this.iterator = iterator } _read() { const res = this.iterator.next(); if (res.done) { // 迭代結束,順便結束可讀流 this.push(null) } setTimeout(() => { // 將數據添加到流中 this.push(res.value + '\n') }, 0) } } const gen = function *(a){ let count = 5, res = a; while(count--){ res = res*res; yield res } }; const readable = new ToReadable(gen(2)); // 監聽`data`事件,一次獲取一個數據 readable.on('data', data => process.stdout.write(data)); // 可讀流消耗完畢 readable.on('end', () => process.stdout.write('readable stream ends~'));
執行結果為:
這里需要留意的是,在使用 .push() 往可讀流里注入數據的代碼段,我們使用了 setTimeout 將其包裹起來,這是為了讓系統能有足夠時間優先處理接收流結束信號的事務。當然你也可以改寫為:
if (res.done) { // 直接 return return this.push(null) } this.push(res.value + '\n')
2.2 Writable Streams
Writable(可寫)流接口是對寫入數據的目標的抽象:
src.pipe(writableStream)
常見的 Writable Streams 包括:
- 客戶端的 HTTP requests
- 服務端的 HTTP responses
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- 子進程的 stdin
- process.stdout 和 process.stderr
可寫流有兩個重要的方法:
- writableStream.write(chunk[, encoding, callback]) —— 往可寫流里寫入數據;
- writableStream.end([chunk, encoding, callback]) —— 停止寫入數據,結束可寫流。在調用 .end() 后,再調用 .write() 方法會產生錯誤。
上方兩方法的 encoding 參數表示編碼字符串(chunk為String時才可以用)。
write 方法的 callback 回調參數會在 chunk 被消費后(從緩存中移除后)被觸發;end 方法的 callback 回調參數則在 Stream 結束時觸發。
另外,如同通過 readable._read() 方法可以處理可讀流,我們可以通過 writable._write(chunk, enc, next) 方法在系統底層處理流寫入的邏輯中,對數據進行處理。
其中參數 chunk 代表寫進來的數據;enc 代表編碼的字符串;next(err) 則是一個回調函數,調用它可以告知消費者進行下一輪的數據流寫入。
示例:
//demo4 const Writable = require('stream').Writable; const writable = Writable(); writable._write = (chunck, enc, next) => { // 輸出打印 process.stdout.write(chunck.toString().toUpperCase()); // 寫入完成時,調用`next()`方法通知流傳入下一個數據 process.nextTick(next) }; // 所有數據均已寫入底層 writable.on('finish', () => process.stdout.write('DONE')); // 將一個數據寫入流中 writable.write('a' + '\n'); writable.write('b' + '\n'); writable.write('c' + '\n'); // 再無數據寫入流時,需要調用`end`方法 writable.end();
執行如下:
2.3 Duplex Streams
Duplex 是雙工的意思,因此很容易猜到 Duplex 流就是既能讀又能寫的一類流,它繼承了 Readable 和 Writable 的接口。
常見的 Duplex Streams 有:
- TCP sockets
- zlib streams
- crypto streams
示例:
//demo5 const Duplex = require('stream').Duplex; const duplex = Duplex(); duplex._read = function () { var date = new Date(); this.push( date.getFullYear().toString() ); this.push(null) }; duplex._write = function (buf, enc, next) { console.log( buf.toString() + '\n' ); next() }; duplex.on('data', data => console.log( data.toString() )); duplex.write('the year is'); duplex.end();
執行結果:
2.4 Transform Streams
Transform Stream 是在繼承了 Duplex Streams 的基礎上再進行了擴展,它可以把寫入的數據和輸出的數據,通過 ._transform 接口關聯起來。
常見的 Transform Streams 有:
- zlib streams
- crypto streams
示例:
//demo6 const Transform = require('stream').Transform; class SetName extends Transform { constructor(name, option) { super(option || {}); this.name = name || '' } // .write接口寫入的數據,處理后直接從 data 事件的回調中可取得 _transform(buf, enc, next) { var res = buf.toString().toUpperCase(); this.push(res + this.name + '\n'); next() } } var transform = new SetName('VaJoy'); transform.on('data', data => process.stdout.write(data)); transform.write('my name is '); transform.write('here is '); transform.end();
執行結果:
其中的 _transform 是 Transform Streams 的內置方法,所有 Transform Streams 都需要使用該接口來接收輸入和處理輸出,且該方法只能由子類來調用。
_transform 接口格式如下:
transform._transform(chunk, encoding, callback)
第一個參數表示被轉換(transformed)的數據塊(chunk),除非構造方法 option 參數(可選)傳入了 “decodeString : false”,否則其類型均為 Buffer;
第二個參數用于設置編碼,但只有當 chunck 為 String 格式(即構造方法傳入 “decodeString : false”參數)的時候才可配置,否則默認為“buffer”;
第三個參數 callback 用于在 chunk 被處理后調用,通知系統進入下一輪 _transform 調用。該回調方法接收兩個可選參數 —— callback([error, data]),其中的 data 參數可以將 chunck 寫入緩存中(供更后面的消費者去消費):
transform.prototype._transform = function(data, encoding, callback){ this.push(data); callback() }; ///////等價于 transform.prototype._transform = function(data, encoding, callback){ callback(null, data) };
另外 Transform Streams 還有一個 _flush(callback) 內置方法,它會在沒有更多可消耗的數據時、在“end”事件之前被觸發,而且會清空緩存數據并結束 Stream。
該內置方法同樣只允許由子類來調用,而且執行后,不能再調用 .push 方法。
關于 Transform Streams 的更多細節還可以參考這篇文章,推薦閱讀。
2.5 Classic Streams
在較早版本的 NodeJS 里,Stream 的實現相較簡陋,例如上文提及的“Stream.Readable”接口均是從 Node 0.9.4 開始才有,因此我們往往需要對其進行多次封裝擴展才能更好地用來開發。
而 Classic Streams 便是對這種古舊模式的 Stream 接口的統稱。
需要留意的是,只要往任意一個 stream 注冊一個“data”事件監聽器,它就會自動切換到“classic”模式,并按照舊的 API 去執行。
classic 流可以當作一個帶有 .pipe 接口的事件發射器(event emitter),當它要為消耗者提供數據時會發射“data”事件,當要結束生產數據時,則發射“end”事件。
另外只有當設置 Stream.readable 為 true 時,.pipe 接口才會將當前流視作可讀流:
//demo7 var Stream = require('stream'); var stream = new Stream(); stream.readable = true; //告訴 .pipe 這是個可讀流 var c = 64; var iv = setInterval(function () { if (++c >= 75) { clearInterval(iv); stream.emit('end'); } else stream.emit('data', String.fromCharCode(c)); }, 100); stream.pipe(process.stdout);
另外,Classic readable streams 還有 .pause() 和 .resume() 兩個接口可用于暫停/恢復流的讀取:
createServer(function(q,s) { // ADVISORY only! q.pause() session(q, function(ses) { q.on('data', handler) q.resume() }) })
3. Object Mode
對于可讀流來說,push(data) 時,data 的類型只能是 String 或Buffer,且消耗時 data 事件輸出的數據類型都為 Buffer;
對于可寫流來說,write(data) 時,data 的類型也只能是 String 或 Buffer,_write(data) 調用時所傳進來的 data 類型都為 Buffer。
示例:
//demo8 writable._write = (chunck, enc, next) => { // 輸出打印 console.log(chunck); //Buffer //console.log(chunck.toString()); //轉為String process.nextTick(next) }; writable.write('Happy Chinese Year'); writable.end();
執行結果:
不過,為了增強數據類型的靈活性,無論是可讀流或是可寫流,只需要往其構造函數里傳入配置參數“{ objectMode: true }”,便可往流里傳入/獲取任意類型(null除外)的數據:
const objectModeWritable = Writable({ objectMode: true }); objectModeWritable._write = (chunck, enc, next) => { // 輸出打印 console.log(typeof chunck); console.log(chunck); process.nextTick(next) }; objectModeWritable.write('Happy Chinese Year'); objectModeWritable.write( { year : 2017 } ); objectModeWritable.end( 2017 );
執行結果:
4. Stream的兼容問題
在前文我們介紹了 classic streams,它屬于陳舊版本的 Node 上的 Stream 接口,可以把它稱為 Streams1。而從 Node 0.10 開始,Stream 新增了系列實用的新接口,可以做更多除了 .pipe() 之外的事情,我們把其歸類為 Streams2(事實上,在 Node 0.11+開始,Stream有些許新的變動,從該版本開始的 Stream 也可稱為 Streams3)。
那么這里存在一個問題 —— 那些使用了 Stream1 的項目(特別是 npm 包),想升級使用環境的 Node 版本到 0.10+,會否導致兼容問題呢?
還好 Streams2 雖然改頭換面,但本質上是設計為向后兼容的。
打個比方,如果你同時推送了一條 Streams2 流和一條舊格式的、基于事件發射器的流,Stream2 將降級為舊模式(shim mode)來向后兼容。
但是,如果我們的開發環境使用的是 Node 0.8(且因為某些原因不能升級),但又想使用 Streams2 的API怎么辦呢?或者比如 npm 上的某些開源的工具包,想要擁抱 Streams2 的便利,又想保持對使用 Node 0.8 的用戶進行兼容處理,這樣又得怎么處理?
針對上述問題,早在 Node 0.10 釋放之前,Issacs 就把 Node-core 中操作 Stream 的核心接口獨立拷貝了一份出來,開源到了 npm 上并持續更新,它就是 readable-stream。
通過使用 readable-stream,我們就可以在那些核心里沒有 Streams2/3 的低版本 Node 中,直接使用 Streams2/3:
var Readable = require('stream').Readable || require('readable-stream').Readable
readable-stream 現在有 v1.0.x 和 v1.1.x 兩個主要版本,前者跟進 Streams2 的迭代,后者跟進 Streams3 的迭代,用戶可以根據需求使用對應版本的包。
5. through2
readable-stream 雖然提供了一個 Streams 的兼容方案,但我們也希望能對 Stream 復雜的API進行精簡。
而 through2 便基于 readable-stream 對 Stream 接口進行了封裝,并提供了更簡單和靈活的方法。
through2 會為你生成 Transform Streams(貌似舊版本是 Duplex Streams)來處理任意你想使用的流 —— 如前文介紹,相比其它流,Transform 流處理起數據會更加靈活方便。
來看下 through2 的示例:
//demo9 const fs = require('fs'); const through2 = require('through2'); fs.createReadStream('data.txt') .pipe(through2(function (chunk, enc, callback) { for (var i = 0; i < chunk.length; i++) if (chunk[i] == 97) chunk[i] = 122; // 把 'a' 替換為 'z' this.push(chunk); callback() })) .pipe(fs.createWriteStream('out.txt')) .on('finish', ()=> { console.log('DONE') });
使用 through2.obj 接口操作 Object Mode 下的流:
//demo10 const fs = require('fs'); const through2 = require('through2'); const csv2 = require('csv2'); let all = []; fs.createReadStream('list.csv') .pipe(csv2()) // through2.obj(fn) 是 through2({ objectMode: true }, fn) 的簡易封裝 .pipe(through2.obj(function (chunk, enc, callback) { var data = { name: chunk[0], sex: chunk[1], addr: chunk[2] }; this.push(data); callback() })) .on('data', function (data) { all.push(data) }) .on('end', function () { console.log(all) });
對比原生的 Stream API,through2 簡潔了不少,加上有 readable-stream 依賴加持,也很好理解為何像 gulp 及其插件都會使用 through2 來操作和處理 stream 了。
順便貼下對 through2 的源碼注釋:

var Transform = require('readable-stream/transform'), inherits = require('util').inherits, xtend = require('xtend'); //構造方法,繼承了Transform function DestroyableTransform(opts) { Transform.call(this, opts); this._destroyed = false } inherits(DestroyableTransform, Transform); //原型接口 destroy,用于關閉當前流 DestroyableTransform.prototype.destroy = function (err) { if (this._destroyed) return; this._destroyed = true; var self = this; process.nextTick(function () { if (err) self.emit('error', err); self.emit('close') }) }; // a noop _transform function function noop(chunk, enc, callback) { callback(null, chunk) } // 閉包,用于返回對外接口方法 function through2(construct) { //最終返回此匿名函數 return function (options, transform, flush) { if (typeof options == 'function') { flush = transform transform = options options = {} } if (typeof transform != 'function') transform = noop if (typeof flush != 'function') flush = null return construct(options, transform, flush) } } // 出口,執行 throuh2 閉包函數,返回一個 DestroyableTransform 的實例(t2) module.exports = through2(function (options, transform, flush) { //t2 為 Transform Stream 對象 var t2 = new DestroyableTransform(options); //Transform Streams 的內置接口 _transform(chunk, encoding, next) 方法 t2._transform = transform; if (flush) t2._flush = flush; return t2 }); // 對外暴露一個可以直接 new (或者不加 new)來創建實例的的構造函數 module.exports.ctor = through2(function (options, transform, flush) { function Through2(override) { if (!(this instanceof Through2)) return new Through2(override) this.options = xtend(options, override) DestroyableTransform.call(this, this.options) } inherits(Through2, DestroyableTransform) Through2.prototype._transform = transform if (flush) Through2.prototype._flush = flush return Through2 }) //Object Mode接口的簡易封裝 module.exports.obj = through2(function (options, transform, flush) { var t2 = new DestroyableTransform(xtend({objectMode: true, highWaterMark: 16}, options)) t2._transform = transform if (flush) t2._flush = flush return t2 })
以上是本文對 Stream 的一個介紹,但事實上 Stream 還有許多未露面的 API,感興趣的同學可以直接閱讀官方 API文檔做進一步了解。
本篇文章是對后續 gulp 源碼解析系列的一個基礎鋪墊,想了解更多 gulp 相關內容的話可以留意我的博客。最后恭祝大家雞年大吉!共勉~
Reference
⑴ Stream API Doc - https://nodejs.org/api/stream.html
⑵ stream-handbook - https://github.com/substack/stream-handbook
⑶ Node.js Stream - 基礎篇 - http://www.cnblogs.com/zapple/p/5759670.html
⑷ Why I don't use Node's core 'stream' module - https://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html
文章列表