前面的話
當內存中無法一次裝下需要處理的數據時,或者一邊讀取一邊處理更加高效時,我們就需要用到數據流。NodeJS中通過各種Stream來提供對數據流的操作。本文將詳細說明NodeJS中的流stream
概述
流(stream)在Nodejs中是處理流數據的抽象接口。stream模塊提供了基礎的API 。使用這些API可以很容易地來構建實現流接口的對象。Nodejs提供了多種流對象。 例如,HTTP請求和process.stdout都是流的實例
流可以是可讀的、可寫的,或是可讀寫的。所有的流都是 EventEmitter 的實例。
盡管所有的 Node.js 用戶都應該理解流的工作方式,這點很重要, 但是 stream 模塊本身只對于那些需要創建新的流的實例的開發者最有用處。 對于主要是消費流的開發者來說,他們很少(如果有的話)需要直接使用 stream 模塊
【類型】
Node.js 中有四種基本的流類型:
Readable - 可讀的流 (例如 fs.createReadStream()). Writable - 可寫的流 (例如 fs.createWriteStream()). Duplex - 可讀寫的流 (例如 net.Socket). Transform - 在讀寫過程中可以修改和變換數據的 Duplex 流 (例如 zlib.createDeflate()).
所有使用 Node.js API 創建的流對象都只能操作 strings 和 Buffer(或 Uint8Array) 對象。但是,通過一些第三方流的實現,依然能夠處理其它類型的 JavaScript 值 (除了 null,它在流處理中有特殊意義)。 這些流被認為是工作在 “對象模式”(object mode)
在創建流的實例時,可以通過 objectMode 選項使流的實例切換到對象模式。試圖將已經存在的流切換到對象模式是不安全的
【緩沖】
Writable和Readable流都會將數據存儲到內部的緩存(buffer)中。這些緩存可以通過相應的writable._writableState.getBuffer()或readable._readableState.buffer來獲取
緩存的大小取決于傳遞給流構造函數的highWaterMark選項。 對于普通的流,highWaterMark選項指定了總共的字節數。對于工作在對象模式的流,highWaterMark指定了對象的總數
當可讀流的實現調用stream.push(chunk)方法時,數據被放到緩存中。如果流的消費者沒有調用stream.read()方法, 這些數據會始終存在于內部隊列中,直到被消費
當內部可讀緩存的大小達到highWaterMark指定的閾值時,流會暫停從底層資源讀取數據,直到當前緩存的數據被消費(也就是說,流會在內部停止調用readable._read()來填充可讀緩存)
可寫流通過反復調用writable.write(chunk)方法將數據放到緩存。當內部可寫緩存的總大小小于highWaterMark指定的閾值時,調用writable.write()將返true。 一旦內部緩存的大小達到或超過highWaterMark,調用writable.write()將返回false
stream API 的關鍵目標, 尤其對于 stream.pipe() 方法, 就是限制緩存數據大小,以達到可接受的程度。這樣,對于讀寫速度不匹配的源頭和目標,就不會超出可用的內存大小。
Duplex和Transform都是可讀寫的。 在內部,它們都維護了兩 相互獨立的緩存用于讀和寫。 在維持了合理高效的數據流的同時,也使得對于讀和寫可以獨立進行而互不影響。 例如, net.Socket就是Duplex的實例,它的可讀端可以消費從套接字(socket)中接收的數據, 可寫端則可以將數據寫入到套接字。 由于數據寫入到套接字中的速度可能比從套接字接收數據的速度快或者慢, 在讀寫兩端使用獨立緩存,并進行獨立操作就顯得很重要了
幾乎所有的 Node.js 應用,不管多么簡單,都在某種程度上使用了流。 下面是在 Node.js 應用中使用流實現的一個簡單的 HTTP 服務器
var http = require('http'); var server = http.createServer((req, res) => { // req 是一個 Readable Stream;res 是一個 Writable Stream var body = ''; req.setEncoding('utf8'); req.on('data', (chunk) => { body += chunk; }); req.on('end', () => { try { var data = JSON.parse(body); res.write(typeof data); res.end(); } catch (er) { res.statusCode = 400; return res.end(`error: ${er.message}`); } }); }); server.listen(1337);
Writable 流 (比如例子中的 res) 暴露了一些方法,比如 write() 和 end() 。這些方法可以將數據寫入到流中。當流中的數據可以讀取時,Readable 流使用 EventEmitter API 來通知應用。 這些數據可以使用多種方法從流中讀取。Writable 和 Readable 流都使用了 EventEmitter API ,通過多種方式, 與流的當前狀態進行交互。Duplex 和 Transform 都是同時滿足 Writable 和 Readable 。對于只是簡單寫入數據到流和從流中消費數據的應用來說, 不要求直接實現流接口,通常也不需要調用 require('stream')
可寫流
可寫流是對數據流向設備的抽象,用來消費上游流過來的數據,通過可寫流程序可以把數據寫入設備,常見的是本地磁盤文件或者 TCP、HTTP 等網絡響應
process.stdin.pipe(process.stdout);
process.stdout是一個可寫流,程序把可讀流 process.stdin 傳過來的數據寫入的標準輸出設備
Writable(可寫流)包括:
HTTP requests, on the client
HTTP responses, on the server
fs write streams
[zlib streams][zlib]
crypto streams
TCP sockets
child process stdin
process.stdout, process.stderr
[注意]上面的某些例子事實上是 Duplex 流,只是實現了 Writable 接口
所有 Writable 流都實現了 stream.Writable 類定義的接口。盡管特定的 Writable 流的實現可能略有差別, 所有的 Writable streams 都可以按一種基本模式進行使用
var myStream = getWritableStreamSomehow(); myStream.write('some data'); myStream.write('some more data'); myStream.end('done writing data');
【'close' 事件】
'close'事件將在流或其底層資源(比如一個文件)關閉后觸發。'close'事件觸發后,該流將不會再觸發任何事件
[注意]不是所有可寫流都會觸發 'close' 事件
【'drain' 事件】
如果調用 stream.write(chunk) 方法返回 false,流將在適當的時機觸發 'drain' 事件,這時才可以繼續向流中寫入數據
// 向可寫流中寫入數據一百萬次。 // 需要注意背壓(back-pressure) function writeOneMillionTimes(writer, data, encoding, callback) { let i = 1000000; write(); function write() { let ok = true; do { i--; if (i === 0) { // 最后 一次 writer.write(data, encoding, callback); } else { // 檢查是否可以繼續寫入。 // 這里不要傳遞 callback, 因為寫入還沒有結束! ok = writer.write(data, encoding); } } while (i > 0 && ok); if (i > 0) { // 這里提前停下了, // 'drain' 事件觸發后才可以繼續寫入 writer.once('drain', write); } } }
【'error' 事件】
'error' 事件在寫入數據出錯或者使用管道出錯時觸發。事件發生時,回調函數僅會接收到一個 Error 參數
[注意]'error' 事件發生時,流并不會關閉
【'finish' 事件】
在調用了 stream.end() 方法,且緩沖區數據都已經傳給底層系統(underlying system)之后, 'finish' 事件將被觸發
const writer = getWritableStreamSomehow(); for (let i = 0; i < 100; i++) { writer.write(`hello, #${i}!\n`); } writer.end('This is the end\n'); writer.on('finish', () => { console.error('All writes are now complete.'); });
【'pipe' 事件】
src <stream.Readable> 輸出到目標可寫流(writable)的源流(source stream)
在可讀流(readable stream)上調用 stream.pipe() 方法,并在目標流向 (destinations) 中添加當前可寫流 ( writable ) 時,將會在可寫流上觸發 'pipe' 事件
const writer = getWritableStreamSomehow(); const reader = getReadableStreamSomehow(); writer.on('pipe', (src) => { console.error('something is piping into the writer'); assert.equal(src, reader); }); reader.pipe(writer);
【'unpipe' 事件】
src <Readable Stream> unpiped 當前可寫流的源流
在 Readable 上調用 stream.unpipe() 方法,從目標流向中移除當前 Writable 時,將會觸發 'unpipe' 事件
const writer = getWritableStreamSomehow(); const reader = getReadableStreamSomehow(); writer.on('unpipe', (src) => { console.error('Something has stopped piping into the writer.'); assert.equal(src, reader); }); reader.pipe(writer); reader.unpipe(writer);
【writable.cork()】
調用 writable.cork() 方法將強制所有寫入數據都內存中的緩沖區里。 直到調用 stream.uncork() 或 stream.end() 方法時,緩沖區里的數據才會被輸出
在向流中寫入大量小塊數據(small chunks of data)時,內部緩沖區(internal buffer)可能失效,從而導致性能下降。writable.cork() 方法主要就是用來避免這種情況。 對于這種情況, 實現了 writable._writev() 方法的流可以對寫入的數據進行緩沖,從而提高寫入效率
【writable.end([chunk][, encoding][, callback])】
chunk <string> | <Buffer> | <Uint8Array> | <any>
chunk <string> | <Buffer> | <Uint8Array> | <any> 可選的,需要寫入的數據。對于非對象模式下的流, chunk 必須是字符串、或 Buffer、或 Uint8Array。對于對象模式下的流, chunk 可以是任意的 JavaScript 值,除了 null。 encoding <string> 如果 chunk 是字符串,這里指定字符編碼。 callback <Function> 可選的,流結束時的回調函數
調用 writable.end() 方法表明接下來沒有數據要被寫入 Writable。通過傳入可選的 chunk 和 encoding 參數,可以在關閉流之前再寫入一段數據。如果傳入了可選的 callback 函數,它將作為 'finish' 事件的回調函數。
[注意]在調用了 stream.end() 方法之后,再調用 stream.write() 方法將會導致錯誤
// 寫入 'hello, ' ,并用 'world!' 來結束寫入 const file = fs.createWriteStream('example.txt'); file.write('hello, '); file.end('world!'); // 后面不允許再寫入數據!
【writable.setDefaultEncoding(encoding)】
encoding <string> 新的默認編碼 返回: this
writable.setDefaultEncoding() 用于為 Writable 設置 encoding
【writable.uncork()】
writable.uncork() 將輸出在 stream.cork() 方法被調用之后緩沖在內存中的所有數據
如果使用 writable.cork() 和 writable.uncork() 來管理寫入緩存,建議使用 process.nextTick() 來延遲調用 writable.uncork() 方法。通過這種方式,可以對單個 Node.js 事件循環中調用的所有 writable.write() 方法進行批處理
stream.cork(); stream.write('some '); stream.write('data '); process.nextTick(() => stream.uncork());
如果一個流多次調用了 writable.cork() 方法,那么也必須調用同樣次數的 writable.uncork() 方法以輸出緩沖區數據
stream.cork(); stream.write('some '); stream.cork(); stream.write('data '); process.nextTick(() => { stream.uncork(); // 之前的數據只有在 uncork() 被二次調用后才會輸出 stream.uncork(); });
【writable.write(chunk[, encoding][, callback])】
chunk <string> | <Buffer> | <Uint8Array> | <any> 要寫入的數據。可選的。 For streams not operating in object mode, chunk must be a string, Buffer or Uint8Array. For object mode streams, chunk may be any JavaScript value other than null. encoding <string> 如果 chunk 是字符串,這里指定字符編碼 callback <Function> 緩沖數據輸出時的回調函數 返回: <boolean> 如果流需要等待 'drain' 事件觸發才能繼續寫入數據,這里將返回 false ; 否則返回 true。
writable.write() 方法向流中寫入數據,并在數據處理完成后調用 callback 。如果有錯誤發生, callback不一定會接收到這個錯誤作為第一個參數。要確保可靠地檢測到寫入錯誤,應該監聽 'error' 事件。
在確認了 chunk 后,如果內部緩沖區的大小小于創建流時設定的 highWaterMark 閾值,函數將返回 true 。 如果返回值為 false ,應該停止向流中寫入數據,直到 'drain' 事件被觸發。
當一個流不處在 drain 的狀態, 對 write() 的調用會緩存數據塊, 并且返回 false。 一旦所有當前所有緩存的數據塊都排空了(被操作系統接受來進行輸出), 那么 'drain' 事件就會被觸發。 我們建議, 一旦 write() 返回 false, 在 'drain' 事件觸發前, 不能寫入任何數據塊。 然而,當流不處在 'drain' 狀態時, 調用 write() 是被允許的, Node.js 會緩存所有已經寫入的數據塊, 直到達到最大內存占用, 這時它會無條件中止。 甚至在它中止之前, 高內存占用將會導致差的垃圾回收器的性能和高的系統相對敏感性 (即使內存不在需要,也通常不會被釋放回系統)。 如果遠程的另一端沒有讀取數據, TCP sockets 可能永遠也不會 drain , 所以寫入到一個不會drain的socket可能會導致遠程可利用的漏洞。
對于一個 Transform, 寫入數據到一個不會drain的流尤其成問題, 因為 Transform 流默認被暫停, 直到它們被pipe或者被添加了 'data' 或 'readable' event handler。
如果將要被寫入的數據可以根據需要生成或者取得,我們建議將邏輯封裝為一個 Readable 流并且使用 stream.pipe()。 但是如果調用 write() 優先, 那么可以使用 'drain' 事件來防止回壓并且避免內存問題:
function write(data, cb) { if (!stream.write(data)) { stream.once('drain', cb); } else { process.nextTick(cb); } } // Wait for cb to be called before doing any other write. write('hello', () => { console.log('write completed, do more writes now'); });
[注意]對象模式的寫入流將忽略 encoding 參數
【writable.destroy([error])】
銷毀流,并釋放已傳遞的錯誤。在這之后,可寫的流已經結束了。實現者不應該覆蓋此方法,而是實現writable._destroy
可讀流
可讀流(Readable streams)是對提供數據的源頭(source)的抽象,是生產數據用來供程序消費的流。我們常見的數據生產方式有讀取磁盤文件、讀取網絡請求內容等
const rs = fs.createReadStream(filePath);
rs就是一個可讀流,其生產數據的方式是讀取磁盤的文件,我們常見的控制臺process.stdin也是一個可讀流
process.stdin.pipe(process.stdout);
通過簡單的一句話可以把控制臺的輸入打印出來,process.stdin 生產數據的方式是讀取用戶在控制臺的輸入
可讀流的例子包括:
HTTP responses, on the client
HTTP requests, on the server
fs read streams
[zlib streams][zlib]
crypto streams
TCP sockets
child process stdout and stderr
process.stdin
[注意]所有的 Readable 都實現了 stream.Readable 類定義的接口
【兩種模式】
可讀流事實上工作在下面兩種模式之一:flowing 和 paused 。
在flowing模式下,可讀流自動從系統底層讀取數據,并通過EventEmitter接口的事件盡快將數據提供給應用
在paused模式下,必須顯式調用 stream.read() 方法來從流中讀取數據片段。
所有初始工作模式為 paused 的 Readable 流,可以通過下面三種途徑切換到 flowing 模式:
監聽 'data' 事件。
調用 stream.resume() 方法。
調用 stream.pipe() 方法將數據發送到 Writable。
可讀流可以通過下面途徑切換到 paused 模式:
如果不存在管道目標(pipe destination),可以通過調用 stream.pause() 方法實現。
如果存在管道目標,可以通過取消 'data' 事件監聽,并調用 stream.unpipe() 方法移除所有管道目標來實現。
可讀流需要先為其提供消費或忽略數據的機制,才能開始提供數據。如果消費機制被禁用或取消,可讀流將嘗試停止生成數據。
為了向后兼容,取消 'data' 事件監聽并不會自動將流暫停。同時,如果存在管道目標(pipe destination),且目標狀態變為可以接收數據(drain and ask for more data),調用了 stream.pause() 方法也并不保證流會一直 保持 暫停狀態。
如果 Readable 切換到 flowing 模式,且沒有消費者處理流中的數據,這些數據將會丟失。比如,調用了 readable.resume() 方法卻沒有監聽 'data' 事件,或是取消了 'data' 事件監聽,就有可能出現這種情況
【三種狀態】
可讀流的“兩種操作模式”是一種簡單抽象。它抽象了在可讀流實現(Readable stream implementation)內部發生的復雜的狀態管理過程。
在任意時刻,任意可讀流應確切處于下面三種狀態之一:
readable._readableState.flowing = null readable._readableState.flowing = false readable._readableState.flowing = true
若 readable._readableState.flowing 為 null,由于不存在數據消費者,可讀流將不會產生數據。
如果監聽 'data' 事件,調用 readable.pipe() 方法,或者調用 readable.resume() 方法, readable._readableState.flowing 的值將會變為 true 。這時,隨著數據生成,可讀流開始頻繁觸發事件。
調用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背壓”(back pressure), 將導致 readable._readableState.flowing 值變為 false。 這將暫停事件流,但 不會 暫停數據生成。
當 readable._readableState.flowing 值為 false 時, 數據可能堆積到流的內部緩存中
可讀流 API 的演化貫穿了多個 Node.js 版本,提供了多種方法來消費流數據。通常開發者應該選擇其中一種來消費數據,而不應該在單個流使用多種方法來消費數據
對于大多數用戶,建議使用readable.pipe()方法來消費流數據,因為它是最簡單的一種實現。開發者如果要精細地控制數據傳遞和產生的過程,可以使用EventEmitter 和 readable.pause()/readable.resume() 提供的 API
【'close' 事件】
'close'事件將在流或其底層資源(比如一個文件)關閉后觸發。'close'事件觸發后,該流將不會再觸發任何事件
[注意]不是所有 Readable 都會觸發 'close' 事件
【'data' 事件】
chunk <Buffer> | <string> | <any> 數據片段。對于非對象模式的可讀流,這是一個字符串或者 Buffer。 對于對象模式的可讀流,這可以是除 null 以外的任意類型 JavaScript 值。
'data' 事件會在流將數據傳遞給消費者時觸發。當流轉換到 flowing 模式時會觸發該事件。調用 readable.pipe(), readable.resume() 方法,或為 'data' 事件添加回調可以將流轉換到 flowing 模式。 'data' 事件也會在調用 readable.read() 方法并有數據返回時觸發。
在沒有明確暫停的流上添加'data'事件監聽會將流轉換為flowing模式。數據會在可用時盡快傳遞給下個流程
如果調用 readable.setEncoding() 方法明確為流指定了默認編碼,回調函數將接收到一個字符串,否則接收到的數據將是一個 Buffer 實例
const readable = getReadableStreamSomehow(); readable.on('data', (chunk) => { console.log(`Received ${chunk.length} bytes of data.`); });
【'end' 事件】
'end' 事件將在流中再沒有數據可供消費時觸發。
[注意]'end' 事件只有在數據被完全消費后才會觸發 。 可以在數據被完全消費后,通過將流轉換到 flowing 模式, 或反復調用 stream.read() 方法來實現這一點
const readable = getReadableStreamSomehow(); readable.on('data', (chunk) => { console.log(`Received ${chunk.length} bytes of data.`); }); readable.on('end', () => { console.log('There will be no more data.'); });
【'error' 事件】
'error' 事件可以在任何時候在可讀流實現(Readable implementation)上觸發。 通常,這會在底層系統內部出錯從而不能產生數據,或當流的實現試圖傳遞錯誤數據時發生。
回調函數將接收到一個 Error 對象
【'readable' 事件】
'readable' 事件將在流中有數據可供讀取時觸發。在某些情況下,為 'readable' 事件添加回調將會導致一些數據被讀取到內部緩存中
const readable = getReadableStreamSomehow(); readable.on('readable', () => { // 有一些數據可讀了 });
當到達流數據尾部時, 'readable' 事件也會觸發。觸發順序在 'end' 事件之前。
事實上, 'readable' 事件表明流有了新的動態:要么是有了新的數據,要么是到了流的尾部。 對于前者, stream.read() 將返回可用的數據。而對于后者, stream.read() 將返回 null。 例如,下面的例子中的 foo.txt 是一個空文件:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt'); rr.on('readable', () => { console.log('readable:', rr.read()); }); rr.on('end', () => { console.log('end'); });
[注意]通常情況下, 應該使用 readable.pipe() 方法和 'data' 事件機制,而不是 'readable' 事件
【readable.isPaused()】
返回: <boolean>
readable.isPaused() 方法返回可讀流的當前操作狀態。 該方法主要是在 readable.pipe() 方法的底層機制中用到。大多數情況下,沒有必要直接使用該方法
const readable = new stream.Readable(); readable.isPaused(); // === false readable.pause(); readable.isPaused(); // === true readable.resume(); readable.isPaused(); // === false
【readable.pause()】
返回: this
readable.pause() 方法將會使 flowing 模式的流停止觸發 'data' 事件, 進而切出 flowing 模式。任何可用的數據都將保存在內部緩存中
const readable = getReadableStreamSomehow(); readable.on('data', (chunk) => { console.log(`Received ${chunk.length} bytes of data.`); readable.pause(); console.log('There will be no additional data for 1 second.'); setTimeout(() => { console.log('Now data will start flowing again.'); readable.resume(); }, 1000); });
【readable.pipe(destination[, options])】
destination <stream.Writable> 數據寫入目標 options <Object> Pipe 選項 end <boolean> 在 reader 結束時結束 writer 。默認為 true。
readable.pipe() 綁定一個 Writable 到 readable 上, 將可寫流自動切換到 flowing 模式并將所有數據傳給綁定的 Writable。數據流將被自動管理。這樣,即使是可讀流較快,目標可寫流也不會超負荷(overwhelmed)。
下面例子將 readable 中的所有數據通過管道傳遞給名為 file.txt 的文件
const readable = getReadableStreamSomehow(); const writable = fs.createWriteStream('file.txt'); // readable 中的所有數據都傳給了 'file.txt' readable.pipe(writable);
可以在單個可讀流上綁定多個可寫流。
readable.pipe() 方法返回目標流的引用,這樣就可以對流進行鏈式地管道操作:
const r = fs.createReadStream('file.txt'); const z = zlib.createGzip(); const w = fs.createWriteStream('file.txt.gz'); r.pipe(z).pipe(w);
默認情況下,當源可讀流(the source Readable stream)觸發'end'事件時,目標流也會調用stream.end()方法從而結束寫入。要禁用這一默認行為, end選項應該指定為false,這將使目標流保持打開, 如下所示:
reader.pipe(writer, { end: false }); reader.on('end', () => { writer.end('Goodbye\n'); });
如果可讀流在處理時發生錯誤,目標可寫流不會自動關閉。 如果發生錯誤,需要手動關閉所有流以避免內存泄漏。
[注意]不管對 process.stderr 和 process.stdout 指定什么選項,它們都是直到 Node.js 進程退出才關閉
【readable.read([size])】
size <number> Optional argument to specify how much data to read. Return <string> | <Buffer> | <null>
readable.read()方法從內部緩沖區中抽出并返回一些數據。 如果沒有可讀的數據,返回null。readable.read()方法默認數據將作為“Buffer”對象返回 ,除非已經使用readable.setEncoding()方法設置編碼或流運行在對象模式。
可選的size參數指定要讀取的特定數量的字節。如果size字節不可讀,將返回null除非流已經結束,在這種情況下所有保留在內部緩沖區的數據將被返回(即使它超過size 字節 )
如果沒有指定size參數,則內部緩沖區包含的所有數據將返回。
readable.read()方法只應該在暫停模式下的可讀流上運行。在流模式下,readable.read()自動調用直到內部緩沖區的數據完全耗盡
const readable = getReadableStreamSomehow(); readable.on('readable', () => { let chunk; while (null !== (chunk = readable.read())) { console.log(`Received ${chunk.length} bytes of data.`); } });
一般來說,避免使用'readable'事件和readable.read()方法,使用readable.pipe()或'data'事件代替
無論size參數的值是什么,對象模式中的可讀流將始終返回調用readable.read(size)的單個項目。
[注意]如果readable.read()方法返回一個數據塊,那么一個'data'事件也將被發送。在已經被發出的'end'事件后調用stream.read([size])事件將返回null。不會拋出運行時錯誤
【readable.resume()】
Returns: this
readable.resume()方法使一個顯式暫停的可讀流恢復發出“數據”事件,將流轉換為流模式。
readable. resume()方法可用于從流中完全地使用數據,而不需要實際處理任何數據,如以下示例所示:
getReadableStreamSomehow() .resume() .on('end', () => { console.log('Reached the end, but did not read anything.'); });
【readable.setEncoding(encoding)】
encoding <string> 要使用的編碼 Returns: this
readble.setEncoding() 方法會為從可讀流讀入的數據設置字符編碼
By default, no encoding is assigned and stream data will be returned as Buffer objects. 設置編碼會使得該流數據返回指定編碼的字符串而不是Buffer對象。例如,調用readable.setEncoding('utf-8')會使得輸出數據作為UTF-8數據解析,并作為字符串返回。調用readable.setEncoding('hex')使得數據被編碼成16進制字符串格式。
可讀流會妥善處理多字節字符,如果僅僅直接從流中取出Buffer對象,很可能會導致錯誤解碼
const readable = getReadableStreamSomehow(); readable.setEncoding('utf8'); readable.on('data', (chunk) => { assert.equal(typeof chunk, 'string'); console.log('got %d characters of string data', chunk.length); });
【readable.unpipe([destination])】
destination <stream.Writable> 可選的特定流到unpipe
unpipe()方法通過使用stream. pipe()方法來分離之前附加的可寫流。
如果沒有指定目的地,則所有管道都是獨立的。如果指定了目的地,但是沒有設置管道,則什么都不做
const readable = getReadableStreamSomehow(); const writable = fs.createWriteStream('file.txt'); // All the data from readable goes into 'file.txt', // but only for the first second readable.pipe(writable); setTimeout(() => { console.log('Stop writing to file.txt'); readable.unpipe(writable); console.log('Manually close the file stream'); writable.end(); }, 1000);
【readable.unshift(chunk)】
chunk <Buffer> | <Uint8Array> | <string> | <any> 將數據塊移到讀隊列上。對于不以對象模式操作的流,塊必須是字符串、緩沖區或Uint8Array。對于對象模式流,塊可能是除了null之外的任何JavaScript值。
unshift()方法將數據塊返回到內部緩沖區中。這在某些情況下是有用的,因為在某些情況下,流被需要“不消耗”一些數據的代碼所消耗,而這些數據是樂觀地從源代碼中提取出來的,這樣數據就可以傳遞給其他的一方。
[注意]在“end”事件發出或將拋出運行時錯誤之后,不能調用流。使用stream. unshift()的開發人員通常應該考慮改用轉換流
// Pull off a header delimited by \n\n // use unshift() if we get too much // Call the callback with (error, header, stream) const StringDecoder = require('string_decoder').StringDecoder; function parseHeader(stream, callback) { stream.on('error', callback); stream.on('readable', onReadable); const decoder = new StringDecoder('utf8'); let header = ''; function onReadable() { let chunk; while (null !== (chunk = stream.read())) { const str = decoder.write(chunk); if (str.match(/\n\n/)) { // found the header boundary const split = str.split(/\n\n/); header += split.shift(); const remaining = split.join('\n\n'); const buf = Buffer.from(remaining, 'utf8'); stream.removeListener('error', callback); // remove the readable listener before unshifting stream.removeListener('readable', onReadable); if (buf.length) stream.unshift(buf); // now the body of the message can be read from the stream. callback(null, header, stream); } else { // still reading the header. header += str; } } } }
【readable.destroy([error])】
銷毀流,并發出“錯誤”。調用后,可讀流將釋放任何內部資源。實現者不應該覆蓋此方法,而是實現readable._destroy
讀寫流
讀寫流又叫雙工流,就是同時實現了 Readable 和 Writable 的流,即可以作為上游生產數據,又可以作為下游消費數據,這樣可以處于數據流動管道的中間部分
rs.pipe(rws1).pipe(rws2).pipe(rws3).pipe(ws);
在 NodeJS 中雙工流常用的有兩種:Duplex和Transform
【stream.Duplex】
雙工流(Duplex streams)是同時實現了 Readable and Writable 接口
const Duplex = require('stream').Duplex; const myDuplex = new Duplex({ read(size) { // ... }, write(chunk, encoding, callback) { // ... } });
Duplex 實例內同時包含可讀流和可寫流,在實例化 Duplex 類的時候可以傳遞幾個參數
readableObjectMode : 可讀流是否設置為 ObjectMode,默認 false writableObjectMode : 可寫流是否設置為 ObjectMode,默認 false allowHalfOpen : 默認 true, 設置成 false 的話,當寫入端結束的時,流會自動的結束讀取端,反之亦然。
雙工流(Duplex streams) 的例子包括:
tcp sockets
zlib streams
crypto streams
轉換流
【stream.Transform】
轉換流(Transform streams) 是雙工 Duplex 流,它的輸出是從輸入計算得來。 它實現了Readable 和 Writable 接口
transform.prototype._transform = function (data, encoding, callback) { this.push(data); callback(); }; transform.prototype._transform = function (data, encoding, callback) { callback(null, data); };
Transform 同樣是雙工流,看起來和 Duplex 重復了,但兩者有一個重要的區別:Duplex 雖然同時具備可讀流和可寫流,但兩者是相對獨立的;Transform 的可讀流的數據會經過一定的處理過程自動進入可寫流。
雖然會從可讀流進入可寫流,但并不意味這兩者的數據量相同,上面說的一定的處理邏輯會決定如果 tranform 可讀流,然后放入可寫流,transform 原義即為轉變,很貼切的描述了 Transform 流作用。
我們最常見的壓縮、解壓縮用的 zlib 即為 Transform 流,壓縮、解壓前后的數據量明顯不同,而流的作用就是輸入一個 zip 包,輸入一個解壓文件或反過來。我們平時用的大部分雙工流都是 Transform。
轉換流(Transform streams) 的例子包括:
zlib streams
crypto streams
【socket】
net 模塊可以用來創建 socket,socket 在 NodeJS 中是一個典型的 Duplex
var net = require('net'); //創建客戶端 var client = net.connect({port: 1234}, function() { console.log('已連接到服務器'); client.write('Hi!'); }); //data事件監聽。收到數據后,斷開連接 client.on('data', function(data) { console.log(data.toString()); client.end(); }); //end事件監聽,斷開連接時會被觸發 client.on('end', function() { console.log('已與服務器斷開連接'); });
可以看到 client 就是一個 Duplex,可寫流用于向服務器發送消息,可讀流用于接受服務器消息,兩個流內的數據并沒有直接的關系
【gulp】
gulp 非常擅長處理代碼本地構建流程
gulp.src('client/templates/*.jade')
.pipe(jade())
.pipe(minify())
.pipe(gulp.dest('build/minified_templates'));
其中 jada() 和 minify() 就是典型的 Transform,處理流程大概是
.jade 模板文件 -> jade() -> html 文件 -> minify -> 壓縮后的 html
可以看出,jade() 和 minify() 都是對輸入數據做了些特殊處理,然后交給了輸出數據。
在平時使用的時候,當一個流同時面向生產者和消費者服務的時候我們會選擇 Duplex,當只是對數據做一些轉換工作的時候我們便會選擇使用Tranform
用途
寫程序需要讀取某個配置文件 config.json,這時候簡單分析一下
數據:config.json 的內容
方向:設備(物理磁盤文件) -> NodeJS 程序
我們應該使用 readable 流來做此事
const fs = require('fs'); const FILEPATH = '...'; const rs = fs.createReadStream(FILEPATH);
通過 fs 模塊提供的 createReadStream() 方法我們輕松的創建了一個可讀的流,這時候 config.json 的內容從設備流向程序。我們并沒有直接使用 Stream 模塊,因為 fs 內部已經引用了 Stream 模塊,并做了封裝。
有了數據后我們需要處理,比如需要寫到某個路徑 DEST ,這時候我們遍需要一個 writable 的流,讓數據從程序流向設備
const ws = fs.createWriteStream(DEST);
兩種流都有了,也就是兩個數據加工器,那么我們如何通過類似 Unix 的管道符號 | 來鏈接流呢?在 NodeJS 中管道符號就是 pipe() 方法。
const fs = require('fs'); const FILEPATH = '...'; const rs = fs.createReadStream(FILEPATH); const ws = fs.createWriteStream(DEST); rs.pipe(ws);
這樣我們利用流實現了簡單的文件復制功能,有個值得注意的地方是,數據必須是從上游 pipe 到下游,也就是從一個 readable 流 pipe 到 writable 流
如果有個需求,把本地一個 package.json 文件中的所有字母都改為小寫,并保存到同目錄下的 package-lower.json 文件下
這時候我們就需要用到雙向的流了,假定我們有一個專門處理字符轉小寫的流 lower,那么代碼寫出來大概是這樣的
const fs = require('fs'); const rs = fs.createReadStream('./package.json'); const ws = fs.createWriteStream('./package-lower.json'); rs.pipe(lower).pipe(ws);
rs -> lower:lower 在下游,所以 lower 需要是個 writable 流
lower -> ws:相對而言,lower 又在上游,所以 lower 需要是個 readable 流
當然如果我們還有額外一些處理動作,比如字母還需要轉成 ASCII 碼
rs.pipe(lower).pipe(acsii).pipe(ws);
同樣 ascii 也必須是雙向的流。這樣處理的邏輯是非常清晰的
有個用戶需要在線看視頻的場景,假定我們通過 HTTP 請求返回給用戶電影內容
const http = require('http'); const fs = require('fs'); http.createServer((req, res) => { fs.readFile(moviePath, (err, data) => { res.end(data); }); }).listen(8080);
這樣的代碼有兩個明顯的問題
1、電影文件需要讀完之后才能返回給客戶,等待時間超長
2、電影文件需要一次放入內存中,相似動作多了,內存吃不消
用流可以將電影文件一點點的放入內存中,然后一點點的返回給客戶(利用了 HTTP 協議的 Transfer-Encoding: chunked 分段傳輸特性),用戶體驗得到優化,同時對內存的開銷明顯下降
const http = require('http'); const fs = require('fs'); http.createServer((req, res) => { fs.createReadStream(moviePath).pipe(res); }).listen(8080);
除了上述好處,代碼優雅了很多,拓展也比較簡單。比如需要對視頻內容壓縮,我們可以引入一個專門做此事的流,這個流不用關心其它部分做了什么,只要是接入管道中就可以了
const http = require('http'); const fs = require('fs'); const oppressor = require(oppressor); http.createServer((req, res) => { fs.createReadStream(moviePath) .pipe(oppressor) .pipe(res); }).listen(8080);
可以看出來,使用流后,我們的代碼邏輯變得相對獨立,可維護性也會有一定的改善
【文件復制】
下面以流stream來實現文件復制
var fs = require('fs'); var readStream = fs.createReadStream('a.txt'); var writeStream = fs.createWriteStream('aa.txt'); //讀取數據 readStream.on('data',function(chunk){ //如果讀取的數據還在緩存區,還沒有被寫入 if(writeStream.write(chunk) === false){ //停止讀數據 readStream.pause(); } }); //如果數據讀取完成 readStream.on('end',function(chunk){ //停止寫入數據 writeStream.end(); }); //如果緩存區的數據被消耗完 writeStream.on('drain',function(){ //接著讀取數據 readStream.resume(); });
使用pipe()方法進行簡化
var fs = require('fs'); var readStream = fs.createReadStream('a.txt'); var writeStream = fs.createWriteStream('aa.txt'); readStream.pipe(writeStream);
【遠程訪問文件】
var http = require('http'); var fs = require('fs'); http.createServer(function(req,res){ fs.readFile('./a.txt',function(err,data){ if(err){ res.end('file not exist!'); }else{ res.writeHeader(200,{'Context-Type':'text/html'}); res.end(data); } }) }).listen(8000);
如果使用pipe()方法,則簡單很多
var http = require('http'); var fs = require('fs'); http.createServer(function(req,res){ fs.createReadStream('./a.txt').pipe(res); }).listen(8000);
甚至可以加載網上的文件,使用插件request

var http = require('http'); var fs = require('fs'); var request = require('request'); http.createServer(function(req,res){ request('https://www.cnblogs.com/images/logo_small.gif').pipe(res); }).listen(8000);

【自定義輸入輸出】
var stream = require('stream'); var Readable = stream.Readable; var Writable = stream.Writable; var readStream = new Readable(); var writeStream = new Writable(); readStream.push('I '); readStream.push('Love '); readStream.push('NodeJS\n'); readStream.push(null); writeStream._write = function(chunk,encode,cb){ console.log(chunk.toString()); cb(); } //I //Love //NodeJS readStream.pipe(writeStream);
【使用轉換流進行功能定制】
var stream = require('stream'); var util = require('util'); function ReadStream(){ stream.Readable.call(this); } util.inherits(ReadStream,stream.Readable); ReadStream.prototype._read = function(){ this.push('I '); this.push('Love '); this.push('NodeJS\n'); this.push(null); } function WriteStream(){ stream.Writable.call(this); this._cached = Buffer.from(''); } util.inherits(WriteStream,stream.Writable); WriteStream.prototype._write = function(chunk,encode,cb){ console.log(chunk.toString()); cb(); } function TransformStream(){ stream.Transform.call(this); } util.inherits(TransformStream,stream.Transform); TransformStream.prototype._transform = function(chunk,encode,cb){ this.push(chunk); cb(); } TransformStream.prototype._flush = function(cb){ this.push('Oh Yeah!'); cb(); } var readStream = new ReadStream(); var writeStream = new WriteStream(); var transformStream = new TransformStream(); //I //Love //NodeJS // //Oh Yeah! readStream.pipe(transformStream).pipe(writeStream);
文章列表