1. Brief
一直對Observer Pattern和Pub/Sub Pattern有所混淆,下面打算通過這兩篇Blog來梳理這兩種模式。若有紕漏請大家指正。
2. Role
Publisher:消息發布者,組裝原始消息實體并觸發消息傳遞的主體。
Subscriber:消息訂閱者,接收消息實體并作出響應的主體。
Message Broker or Event Bus:消息發布者 與 消息訂閱者 間的媒介,內含消息過濾和消息路由的功能,并可通過內置的消息隊列(message queue)現實優先級處理。
其中 過濾 功能又細分為topic-based和content-based兩種類型
topic-based就是為消息主題建立獨立通道,訂閱特定主題的訂閱者將通過對應的通道接收消息;
content-based就是以消息內容為處理源,訂閱者僅接收消息內容與目標關鍵詞匹配的消息。
此外還可以采用hybird=topic-based和content-based的方式。
3. Advantages
1. Loose coupling
由于引入Message Broker來處理消息過濾和路由等功能,而Subscriber又是通過Messsage Broker來訂閱消息,從而實現Publisher和Subscriber的低耦合。這種低耦合體現在兩個方面。
空間:Publisher和Subscriber可運行在兩個不同的進程,甚至是機器上;
時間:Publisher和Subscriber不必同時運行,通過Message Broker作為消息中轉站暫存消息(Store and forward)并實現消息異步處理。
2. Scalability
由于Publisher和Subscriber在空間和時間上松耦合,那么我們就可以通過增加進程/機器的方式來提高處理能力。
4. Disadvantages —— Semantic Coupling
"The most insidious kind of coupling occurs when one module makes use, not of some syntactic element of another module, but of some semantic knowledge of another module's inner workings" —— chapter 5 of Code Complete
語義耦合(Semantic Coupling),是一種隱晦的耦合類型,導致代碼重構、調試、修改復雜度急劇增加的主要原因。
類型如下:
1. 操作順序耦合:使用一個對象,需要先調用Init(),之后才能調用DoAnything()。這種順序耦合,即使在文檔中remark也是極為不優雅的做法;
2. 全局參數傳遞:模塊A修改了某個全局參數g_val,模塊B讀取該值。模塊B必須知道模塊A已經對該參數賦值;
3. 業務封裝不夠緊密:模塊A向模塊B傳一個參數,模塊B根據該參數選擇對應的操作。模塊A必須知道與業務相關的所有的操作類型。對于模塊A,僅傳遞模塊A自身可以理解的語義,或者通俗的概念作為參數,而不是被封裝的業務相關的參數;
4. 超越接口的數據類型約定:模塊A向模塊B傳遞一個接口的指針,模塊B將其強制轉換為派生類的指針。當模塊B知道該接口的實際類型時,封裝已經被破壞了。非相關模塊只能對接口操作,而不應對接口之外的職責進行約定。
public interface Customer{} public class VIP : Customer{ public void Serve(){} } public static void main(String[] args){ Customer customer = new VIP(); Serve(customer); } public static void Serve(Customer customer){ VIP vip = (VIP)customer; vip.Serve(); }
5. Simple implementation
/** * @class * @private * @description 幫助類 */ class Utils{ /* @method 對用戶輸入的filter進行加工 * @static * @package * @param {DOMString|Function} origFilter - 用戶輸出的filter * @returns {Function} * @exception */ static wrapFilter(origFilter){ var type = typeof origFilter if ('string' === type) return message => RegExp(origFilter, 'i').test(message.topic) else if ('function' === type) return origFilter throw Error('the type of argument(0) is wrong, which accepts string or function instance only!') } /* @method 添加消息到消息隊列 * @static * @package * @param {Array.<Message>} mq - 消息隊列 * @param {Message} message - 被添加的消息實例 * @param {Object.<{Number} GUIDofTimer, {Number} pause>} timer - 定時執行器 */ static addMessage(mq, message, timer){ timer.pause = 1 if (!(timer.pause = mq.length)) return mq.push(message) for(var i = 0, inserted = 0, m; !inserted && (m = mq[i]); ++i) if(inserted = m.priority < message.priority) mq.splice(i, 0, message) if(!inserted) mq.push(message) timer.pause = 0 } /* @method 分發消息給訂閱者 * @static * @package * @param {Array.<Message>} mq - 消息隊列 * @param {Array} subs - 訂閱者池 * @returns {Array.<Message>} - 未被響應的消息回收隊列 */ static dispatch(mq, subs){ var message, unresponsedMq = [] while(message = mq.shift()){ let found = 0 for(let sub of subs) if(sub.filter(message) && ++found) sub.handler(message) if (!found) unresponsedMq.push(message) } return unresponsedMq } static doUnsub(subs, pred){ var sub, remainSubs = [] while(sub = subs.shift()) if(pred());else remainSubs.push(sub) return remainSubs } } /** * @class * @private * @description 消息類 */ class Message{ constructor(topic, content, priority = 0){ this.topic = topic this.content = content this.priority = priority } } /** * @class * @public * @description 消息中轉站 */ export default class MrB{ constructor(during = 10){ this.mq = [] this.subs = [] // dispatch message to subscribers, then recycle unhandled messages this.timer = {timer: setInterval(()=>{ if (this.timer.pause) return this.mq = Utils.dispatch(this.mq, this.subs) }, during)} } /* @method 訂閱消息 * @public * @param {DOMString|Function} filter - 消息過濾器 * @param {Function} handler - 消息響應函數 * @returns {DOMString} - 訂閱編號,用于退訂 */ sub(filter, handler){ var guid = (+new Date()) + '' + 100000*Math.random() this.subs.push({guid: guid, filter: Utils.wrapFilter(filter), handler: handler}) return guid } /* @method 退訂 * @public * @param {DOMString|Function} filter/guid - 消息過濾器 或 訂閱編號 * @param {Function} [handler] - 消息響應函數 */ unsub(filter/*or guid*/, handler){ this.subs = Utils.doUnsub(this.subs, handler ? (sub)=>sub.filter.toString() === filter.toString() && sub.handler.toString() === handler.toString() : (sub)=>sub.guid === filter.toString() || sub.filter.toString() === filter.toString()) } /* @method 發布消息 * @public * @param {DOMString|Object|Message} topic - 消息主題 或 消息實體 * @param {DOMString} [content=''] - 消息內容 * @param {Number} [priority=0] - 消息優先級 */ pub(topic, content = '', priority = 0){ var message if(1 === arguments.length) message = (topic.priority = topic.priority || 0, topic) else message = new Message(topic, content, priority) // push message to mq Utils.addMessage(this.mq, message, this.timer) } }
6. Caution
1. Pub/Sub模式是Messaging模式的一種,而Messaging模式是一種基于網絡的架構模式(network-oriented architectural pattern),也就是說是以跨進程通信為應用范圍;而Observer模式則是基于對象事件的設計模式(object-event oriented pattern),并且其應用范圍是單進程內的。
2. Pub/Sub模式適用于非實時處理;
7. Idea
在頁面開發時我偏向于Component-Driven dev的開發模式,表面上是將頁面切割為一個個功能獨立的組件,本質上是將問題相關的依賴內聚,從而更好地識別、解決問題。而組件間的通信則是采用這種開發模式所必定要考慮的另一個問題。解決方案有好多,但我覺得基本原則應該是:
1. 由于組件是相互獨立的松耦合結構,它們之間的通信不應該帶來耦合度上揚的副作用;(若組件間通信是緊密的,應該考慮是否開發成子組件更為合適)
2. 組件間通信的通道應該是配置的,這樣才能靈活地對數據流作加工。(如:寫日志、數據轉換、類型轉換等)
而采用Pub/Sub模式,利用消息作為組件間通信的數據載體,Message Broker負責信息的過濾和路由,實現消息在組件間的流轉。另外可以通過定制Message Broker實現自定義組件通信通道,以AOP方式實現基礎服務功能。
而這種方式不可避免地會引入新問題:
1. Message Broker作為消息中轉站具有異步處理的特性,若需要同步執行,那么則需要引入另一種方式;
2. 由于組件間松耦合,必須通過良好的日志記錄方式來記錄消息流轉路徑,否則無法debug。
8. Conclusion
尊重原創,轉載請注明來自:http://www.cnblogs.com/fsjohnhuang/p/4624566.html ^_^肥子John
9. Thanks
https://en.wikipedia.org/wiki/Messaging_pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern
http://stackoverflow.com/questions/11857325/publisher-subscriber-vs-observer
https://en.wikipedia.org/wiki/Store_and_forward
文章列表