文章出處

作為前端,我們常常會和 Stream 有著頻繁的接觸。比如使用 gulp 對項目進行構建的時候,我們會使用 gulp.src 接口將匹配到的文件轉為 stream(流)的形式,再通過 .pipe() 接口對其進行鏈式加工處理;

或者比如我們通過 http 模塊創建一個 HTTP 服務:

const http = require('http');
http.createServer( (req, res) => {
  //...
}).listen(3000);

此處的 req 和 res 也屬于 Stream 的消費接口(前者為 Readable Stream,后者為 Writable Stream)

事實上像上述的 req/res,或者 process.stdout 等接口都屬于 Stream 的實例,因此較少存在情況,是需要我們手動引入 Stream 模塊的,例如:

//demo1.js
'use strict';
const Readable = require('stream').Readable;
const rs = Readable();
const s = 'VaJoy';
const l = s.length;
let i = 0;
rs._read = ()=>{
    if(i == l){
        rs.push(' is my name');
        return rs.push(null)
    }
    rs.push(s[i++])
};
rs.pipe(process.stdout);

如果不太能讀懂上述代碼,或者對 Stream 的概念感到模糊,那么可以放輕松,因為本文會進一步地對 Stream 進行剖析,并且談談直接使用它可能會存在的一些問題(這也是為何 gulp 要使用 through2 的原因)

另外本文的示例均可在我的 github 倉庫https://github.com/VaJoy/stream/獲取到,讀者可以自行下載和調試。

一. Stream的作用

在介紹 Stream(流)之前,我們先來看一個例子 —— 模擬服務器把本地某個文件內容吐給客戶端:

//demo2
var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(3000);

這段代碼雖然可以正常執行,但存在一個顯著的問題 —— 對于每一個客戶端的請求,fs.readFile 接口都會把整個文件都緩存到內存中去,然后才開始把數據吐給用戶。那么當文件體積很大、請求也較多(且特別當請求來自慢速用戶)的時候,服務器需要消耗很大的內存,導致性能低下。

然而這個問題,則正是 stream 發揮所長的地方。如前文提及的,res 是流對象,那我們正好可以將其利用起來:

var server2 = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server2.listen(4000);

在上方代碼段里,fs.createReadStream 創建了 data.txt 的可讀流(Readable Stream)。這里需要事先了解的是,流可以簡單地分為“可讀的(readable)”、“可寫的(writable)”,或者“讀寫均可”三種類型,且所有的流都屬于 EventEmitter 的實例

回到代碼,對于創建的可讀流,我們通過 .pipe() 接口來監聽其 dataend 事件,并把 data.txt (的可讀流)拆分成一小塊一小塊的數據(chunks),像流水一樣源源不斷地吐給客戶端,而不再需要等待整個文件都加載到內存后才發送數據。

其中 .pipe 可以視為流的“管道/通道”方法,任何類型的流都會有這個 .pipe 方法去成對處理流的輸入與輸出。

為了方便理解,我們把上述兩種方式(不使用流/使用流)處理為如下的情景(臥槽我好好一個前端為啥要P這么萌的圖)

⑴ 不使用流:

 

⑵ 使用流:

由此可以得知,使用流(stream)的形式,可以大大提升響應時間,又能有效減輕服務器內存的壓力。

二. Stream的分類

在上文我們曾提及到,stream 可以按讀寫權限來簡單地分做三類,不過這里我們再細化下,可以把 stream 歸為如下五個類別:

⑴ Readable Streams
⑵ Writable Streams
⑶ Transform Streams
⑷ Duplex Streams
⑸ Classic Streams

其中 Transform Streams 和 Duplex Streams 都屬于即可讀又可寫的流,而最后一個 Classic Streams 是對 Node 古早版本上的 Stream 的一個統稱。我們將照例對其進行逐一介紹。

2.1 Readable Streams

即可讀流,通過 .pipe 接口可以將其數據傳遞給一個 writable、transform 或者 duplex流:

readableStream.pipe(dst)

常見的 Readable Streams 包括:

  • 客戶端上的 HTTP responses
  • 服務端上的 HTTP requests
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • 子進程的 stdout 和 stderr
  • process.stdin

例如在前面 demo2 的代碼段中,我們就使用了 fs.createReadStream 接口來創建了一個 fs read stream:

var server2 = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server2.listen(4000);

這里有個有趣的地方 —— 雖然 Readable Streams 稱為可讀流,但在將其傳入一個消耗對象之前,它都是可寫的:

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('servers ');
rs.push('are listening on\n');
rs.push('3000 and 4000\n');
rs.push(null);

rs.pipe(process.stdout);

執行結果:

在這段代碼中,我們通過 readStream.push(data) 的形式往可讀流里注入數據,并以 readStream.push(null) 來結束可讀流。

不過這種寫法有個弊端 —— 從使用 .push() 將數據注入 readable 流中開始,直到另一個東西(process.stdout)來消耗數據之前,這些數據都會存在緩存中。

這里有個內置接口 ._read()  可以用來處理這個問題,它是從系統底層開始讀取數據流時才會不斷調用自身,從而減少緩存冗余。

我們可以回過頭來看 demo1 的例子:

'use strict';
const Readable = require('stream').Readable;
const rs = Readable();
const s = 'VaJoy';
const l = s.length;
let i = 0;
rs._read = ()=>{
    if(i == l){
        rs.push(' is my name');
        return rs.push(null)
    }
    rs.push(s[i++])
};
rs.pipe(process.stdout);

我們是在 ._read 方法中才使用 readStream.push(data) 往可讀流里注入數據供下游消耗(也會流經緩存),從而提升流處理的性能。

這里也有個小問題 —— 上一句話所提到的“供下游消耗”,這個下游通常又會以怎樣的形式來消耗可讀流的呢?

首先,可以使用我們熟悉的 .pipe() 方法將可讀流推送給一個消耗對象(writable、transform 或者 duplex流)

//ext1
const fs = require('fs');
const zlib = require('zlib');

const r = fs.createReadStream('data.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('data.txt.gz');
r.pipe(z).pipe(w);

其次,也可以通過監聽可讀流的“data”事件(別忘了文章前面提到的“所有的流都屬于 EventEmitter 的實例”)來實現消耗處理 —— 在首次監聽其 data 事件后,readStream 便會持續不斷地調用 _read(),通過觸發 data 事件將數據輸出。當數據全部被消耗時,則觸發 end 事件。

示例:

//demo3
const Readable = require('stream').Readable;

class ToReadable extends Readable {
    constructor(iterator) {
        super();
        this.iterator = iterator
    }
    _read() {
        const res = this.iterator.next();
        if (res.done) {
            // 迭代結束,順便結束可讀流
            this.push(null)
        }
        setTimeout(() => {
            // 將數據添加到流中
            this.push(res.value + '\n')
        }, 0)
    }
}

const gen = function *(a){
    let count = 5,
        res = a;
    while(count--){
        res = res*res;
        yield res
    }
};

const readable = new ToReadable(gen(2));

// 監聽`data`事件,一次獲取一個數據
readable.on('data', data => process.stdout.write(data));

// 可讀流消耗完畢
readable.on('end', () => process.stdout.write('readable stream ends~'));

執行結果為:

這里需要留意的是,在使用 .push() 往可讀流里注入數據的代碼段,我們使用了 setTimeout 將其包裹起來,這是為了讓系統能有足夠時間優先處理接收流結束信號的事務。當然你也可以改寫為:

        if (res.done) {
            // 直接 return
            return this.push(null)
        }
        this.push(res.value + '\n')

2.2 Writable Streams

Writable(可寫)流接口是對寫入數據的目標的抽象:

src.pipe(writableStream)

常見的 Writable Streams 包括:

  • 客戶端的 HTTP requests
  • 服務端的 HTTP responses
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • 子進程的 stdin
  • process.stdout 和 process.stderr

可寫流有兩個重要的方法:

  •  writableStream.write(chunk[, encoding, callback]) —— 往可寫流里寫入數據;
  •  writableStream.end([chunk, encoding, callback]) —— 停止寫入數據,結束可寫流。在調用 .end() 后,再調用 .write() 方法會產生錯誤。

上方兩方法的 encoding 參數表示編碼字符串(chunk為String時才可以用)。

write 方法的 callback 回調參數會在 chunk 被消費后(從緩存中移除后)被觸發;end 方法的 callback 回調參數則在 Stream 結束時觸發。

另外,如同通過 readable._read() 方法可以處理可讀流,我們可以通過 writable._write(chunk, enc, next) 方法在系統底層處理流寫入的邏輯中,對數據進行處理。

其中參數 chunk 代表寫進來的數據;enc 代表編碼的字符串;next(err) 則是一個回調函數,調用它可以告知消費者進行下一輪的數據流寫入。

示例:

//demo4
const Writable = require('stream').Writable;
const writable = Writable();

writable._write = (chunck, enc, next) => {
    // 輸出打印
    process.stdout.write(chunck.toString().toUpperCase());
    // 寫入完成時,調用`next()`方法通知流傳入下一個數據
    process.nextTick(next)
};

// 所有數據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'));

// 將一個數據寫入流中
writable.write('a' + '\n');
writable.write('b' + '\n');
writable.write('c' + '\n');

// 再無數據寫入流時,需要調用`end`方法
writable.end();

執行如下:

2.3 Duplex Streams

Duplex 是雙工的意思,因此很容易猜到 Duplex 流就是既能讀又能寫的一類流,它繼承了 Readable 和 Writable 的接口。

常見的 Duplex Streams 有:

  • TCP sockets
  • zlib streams
  • crypto streams

示例:

//demo5
const Duplex = require('stream').Duplex;
const duplex = Duplex();

duplex._read = function () {
    var date = new Date();
    this.push( date.getFullYear().toString() );
    this.push(null)
};

duplex._write = function (buf, enc, next) {
    console.log( buf.toString() + '\n' );
    next()
};

duplex.on('data', data => console.log( data.toString() ));

duplex.write('the year is');

duplex.end();

執行結果:

2.4 Transform Streams

Transform Stream 是在繼承了 Duplex Streams 的基礎上再進行了擴展,它可以把寫入的數據和輸出的數據,通過 ._transform 接口關聯起來。

常見的 Transform Streams 有:

  • zlib streams
  • crypto streams

示例:

//demo6
const Transform = require('stream').Transform;
class SetName extends Transform {
    constructor(name, option) {
        super(option || {});
        this.name = name || ''
    }
    // .write接口寫入的數據,處理后直接從 data 事件的回調中可取得
    _transform(buf, enc, next) {
        var res = buf.toString().toUpperCase();
        this.push(res + this.name + '\n');
        next()
    }

}

var transform = new SetName('VaJoy');
transform.on('data', data => process.stdout.write(data));

transform.write('my name is ');
transform.write('here is ');
transform.end();

執行結果:

其中的 _transform 是 Transform Streams 的內置方法,所有 Transform Streams 都需要使用該接口來接收輸入和處理輸出,且該方法只能由子類來調用。

_transform 接口格式如下:

transform._transform(chunk, encoding, callback)

第一個參數表示被轉換(transformed)的數據塊(chunk),除非構造方法 option 參數(可選)傳入了 “decodeString : false”,否則其類型均為 Buffer;

第二個參數用于設置編碼,但只有當 chunck 為 String 格式(即構造方法傳入 “decodeString : false”參數)的時候才可配置,否則默認為“buffer”;

第三個參數 callback 用于在 chunk 被處理后調用,通知系統進入下一輪 _transform 調用。該回調方法接收兩個可選參數 —— callback([error, data]),其中的 data 參數可以將 chunck 寫入緩存中(供更后面的消費者去消費)

transform.prototype._transform = function(data, encoding, callback){
    this.push(data);
    callback()
};
///////等價于
transform.prototype._transform = function(data, encoding, callback){
    callback(null, data)
};

另外 Transform Streams 還有一個 _flush(callback) 內置方法,它會在沒有更多可消耗的數據時、在“end”事件之前被觸發,而且會清空緩存數據并結束 Stream。

該內置方法同樣只允許由子類來調用,而且執行后,不能再調用 .push 方法。

關于 Transform Streams 的更多細節還可以參考這篇文章,推薦閱讀。

2.5 Classic Streams

在較早版本的 NodeJS 里,Stream 的實現相較簡陋,例如上文提及的“Stream.Readable”接口均是從 Node 0.9.4 開始才有,因此我們往往需要對其進行多次封裝擴展才能更好地用來開發。

而 Classic Streams 便是對這種古舊模式的 Stream 接口的統稱。

需要留意的是,只要往任意一個 stream 注冊一個“data”事件監聽器,它就會自動切換到“classic”模式,并按照舊的 API 去執行。

classic 流可以當作一個帶有 .pipe 接口的事件發射器(event emitter),當它要為消耗者提供數據時會發射“data”事件,當要結束生產數據時,則發射“end”事件。

另外只有當設置 Stream.readable 為 true 時,.pipe 接口才會將當前流視作可讀流:

//demo7
var Stream = require('stream');
var stream = new Stream();
stream.readable = true; //告訴 .pipe 這是個可讀流

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);

另外,Classic readable streams 還有 .pause() 和 .resume() 兩個接口可用于暫停/恢復流的讀取:

createServer(function(q,s) {
  // ADVISORY only!
  q.pause()
  session(q, function(ses) {
    q.on('data', handler)
    q.resume()
  })
})

3. Object Mode

對于可讀流來說,push(data) 時,data 的類型只能是 String 或Buffer,且消耗時 data 事件輸出的數據類型都為 Buffer;

對于可寫流來說,write(data) 時,data 的類型也只能是 String 或 Buffer,_write(data) 調用時所傳進來的 data 類型都為 Buffer。

示例:

//demo8
writable._write = (chunck, enc, next) => {
    // 輸出打印
    console.log(chunck);   //Buffer
    //console.log(chunck.toString());  //轉為String

    process.nextTick(next)
};

writable.write('Happy Chinese Year');
writable.end();

執行結果:

不過,為了增強數據類型的靈活性,無論是可讀流或是可寫流,只需要往其構造函數里傳入配置參數“{ objectMode: true }”,便可往流里傳入/獲取任意類型(null除外)的數據:

const objectModeWritable = Writable({ objectMode: true });

objectModeWritable._write = (chunck, enc, next) => {
    // 輸出打印
    console.log(typeof chunck);
    console.log(chunck);
    process.nextTick(next)
};

objectModeWritable.write('Happy Chinese Year');
objectModeWritable.write( { year : 2017 } );
objectModeWritable.end( 2017 );

執行結果:

4. Stream的兼容問題

在前文我們介紹了 classic streams,它屬于陳舊版本的 Node 上的 Stream 接口,可以把它稱為 Streams1。而從 Node 0.10 開始,Stream 新增了系列實用的新接口,可以做更多除了 .pipe() 之外的事情,我們把其歸類為 Streams2(事實上,在 Node 0.11+開始,Stream有些許新的變動,從該版本開始的 Stream 也可稱為 Streams3)

那么這里存在一個問題 —— 那些使用了 Stream1 的項目(特別是 npm 包),想升級使用環境的 Node 版本到 0.10+,會否導致兼容問題呢?

還好 Streams2 雖然改頭換面,但本質上是設計為向后兼容的。

打個比方,如果你同時推送了一條 Streams2 流和一條舊格式的、基于事件發射器的流,Stream2 將降級為舊模式(shim mode)來向后兼容。

但是,如果我們的開發環境使用的是 Node 0.8(且因為某些原因不能升級),但又想使用 Streams2 的API怎么辦呢?或者比如 npm 上的某些開源的工具包,想要擁抱 Streams2 的便利,又想保持對使用 Node 0.8 的用戶進行兼容處理,這樣又得怎么處理?

針對上述問題,早在 Node 0.10 釋放之前,Issacs 就把 Node-core 中操作 Stream 的核心接口獨立拷貝了一份出來,開源到了 npm 上并持續更新,它就是 readable-stream

通過使用 readable-stream,我們就可以在那些核心里沒有 Streams2/3 的低版本 Node 中,直接使用 Streams2/3:

var Readable = require('stream').Readable || require('readable-stream').Readable

readable-stream 現在有 v1.0.x 和 v1.1.x 兩個主要版本,前者跟進 Streams2 的迭代,后者跟進 Streams3 的迭代,用戶可以根據需求使用對應版本的包。

5. through2

readable-stream 雖然提供了一個 Streams 的兼容方案,但我們也希望能對 Stream 復雜的API進行精簡。

through2 便基于 readable-stream 對 Stream 接口進行了封裝,并提供了更簡單和靈活的方法。

through2 會為你生成 Transform Streams(貌似舊版本是 Duplex Streams)來處理任意你想使用的流 —— 如前文介紹,相比其它流,Transform 流處理起數據會更加靈活方便。

來看下 through2 的示例:

//demo9
const fs = require('fs');
const through2 = require('through2');
fs.createReadStream('data.txt')
    .pipe(through2(function (chunk, enc, callback) {
        for (var i = 0; i < chunk.length; i++)
            if (chunk[i] == 97)
                chunk[i] = 122; // 把 'a' 替換為 'z'

        this.push(chunk);

        callback()
    }))
    .pipe(fs.createWriteStream('out.txt'))
    .on('finish', ()=> {
        console.log('DONE')
    });

使用 through2.obj 接口操作 Object Mode 下的流:

//demo10
const fs = require('fs');
const through2 = require('through2');
const csv2 = require('csv2');

let all = [];

fs.createReadStream('list.csv')
    .pipe(csv2())
    // through2.obj(fn) 是 through2({ objectMode: true }, fn) 的簡易封裝
    .pipe(through2.obj(function (chunk, enc, callback) {
        var data = {
            name: chunk[0],
            sex: chunk[1],
            addr: chunk[2]
        };
        this.push(data);

        callback()
    }))
    .on('data', function (data) {
        all.push(data)
    })
    .on('end', function () {
        console.log(all)
    });

對比原生的 Stream API,through2 簡潔了不少,加上有 readable-stream 依賴加持,也很好理解為何像 gulp 及其插件都會使用 through2 來操作和處理 stream 了。

順便貼下對 through2 的源碼注釋:

var Transform = require('readable-stream/transform'),
    inherits = require('util').inherits,
    xtend = require('xtend');

//構造方法,繼承了Transform
function DestroyableTransform(opts) {
    Transform.call(this, opts);
    this._destroyed = false
}

inherits(DestroyableTransform, Transform);

//原型接口 destroy,用于關閉當前流
DestroyableTransform.prototype.destroy = function (err) {
    if (this._destroyed) return;
    this._destroyed = true;

    var self = this;
    process.nextTick(function () {
        if (err)
            self.emit('error', err);
        self.emit('close')
    })
};

// a noop _transform function
function noop(chunk, enc, callback) {
    callback(null, chunk)
}


// 閉包,用于返回對外接口方法
function through2(construct) {
    //最終返回此匿名函數
    return function (options, transform, flush) {
        if (typeof options == 'function') {
            flush = transform
            transform = options
            options = {}
        }

        if (typeof transform != 'function')
            transform = noop

        if (typeof flush != 'function')
            flush = null

        return construct(options, transform, flush)
    }
}


// 出口,執行 throuh2 閉包函數,返回一個 DestroyableTransform 的實例(t2)
module.exports = through2(function (options, transform, flush) {
    //t2 為 Transform Stream 對象
    var t2 = new DestroyableTransform(options);

    //Transform Streams 的內置接口 _transform(chunk, encoding, next) 方法
    t2._transform = transform;

    if (flush)
        t2._flush = flush;

    return t2
});


// 對外暴露一個可以直接 new (或者不加 new)來創建實例的的構造函數
module.exports.ctor = through2(function (options, transform, flush) {
    function Through2(override) {
        if (!(this instanceof Through2))
            return new Through2(override)

        this.options = xtend(options, override)

        DestroyableTransform.call(this, this.options)
    }

    inherits(Through2, DestroyableTransform)

    Through2.prototype._transform = transform

    if (flush)
        Through2.prototype._flush = flush

    return Through2
})

//Object Mode接口的簡易封裝
module.exports.obj = through2(function (options, transform, flush) {
    var t2 = new DestroyableTransform(xtend({objectMode: true, highWaterMark: 16}, options))

    t2._transform = transform

    if (flush)
        t2._flush = flush

    return t2
})
View Code

以上是本文對 Stream 的一個介紹,但事實上 Stream 還有許多未露面的 API,感興趣的同學可以直接閱讀官方 API文檔做進一步了解。

本篇文章是對后續 gulp 源碼解析系列的一個基礎鋪墊,想了解更多 gulp 相關內容的話可以留意我的博客。最后恭祝大家雞年大吉!共勉~

Reference

⑴ Stream API Doc - https://nodejs.org/api/stream.html

⑵ stream-handbook - https://github.com/substack/stream-handbook

⑶ Node.js Stream - 基礎篇 - http://www.cnblogs.com/zapple/p/5759670.html

⑷ Why I don't use Node's core 'stream' module - https://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html


文章列表




Avast logo

Avast 防毒軟體已檢查此封電子郵件的病毒。
www.avast.com


arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()