Bruce Eckel(著有多部編程書籍)和Jonas Boner(Akka的締造者和Typesafe的CTO)發表了“反應性宣言”,在其中嘗試著定義什么是反應性應用。
這樣的應用應該能夠:
- 對事件做出反應:事件驅動的本質,讓反應性應用能夠支持文中提到的若干特性。
- 對負載做出反應:聚焦于可擴展性,而不是單用戶性能。
- 對失敗做出反應:建立彈性系統,能夠從各個層級進行恢復。
- 對用戶做出反應:綜合上述特征,實現交互式用戶體驗。
在這份宣言公布之后,Scala的創造者Martin Odersky、Reactive Extensions的創造者Erik Meijer和Akka科技公司的領導者Roland Kuhn,在Coursera上發布了一套免費課程,名為“反應性編程原理”:
該課程的目標在于講授反應性編程的原理。反應性編程是一門新興的學科,結合了并發、事件驅動和異步系統。對于編寫任何類型的Web服務或分布式系統來說,它都至關重要;同時它在眾多高性能并發系統中占有核心位置。反應性變成可以被視作高階函數式編程對并發系統的自然拓展,通過協調和編排Actor交換的異步數據流,來處理分布的狀態。
Reactive Extensions(Rx)的優點在于能夠將傳統的異步編程方式從支離破碎的代碼調用中解放出來。Rx能夠使的我們可以將異步代碼寫到一個單獨的方法中,使得代碼可讀性和可維護性大大增強。
《Reactive Extensions介紹》我們了解了Rx中的一些比較重要的操作符,本文中我們將會學習如何將Reactive Extensions(Rx)應用到我們的應用程序中。
同步方法調用是阻塞式的,在很多場景下這是不合適的。我們能夠用Rx改造成異步調用。一個最簡單的方法就是使用IObservable.Start方法,使得Rx為我們來管理這些異步調用。
public static void ObservableStart(int x, int y) { PlusTwoNumberAsync(x, y).Subscribe(Console.WriteLine); Console.ReadKey(); } private static IObservable<int> PlusTwoNumberAsync(int x, int y) { return Observable.Start(() => PlusTwoNumber(x, y)); } private static int PlusTwoNumber(int x, int y) { Thread.Sleep(5000); return x + y; }
除了Observable.Start外也可以使用Observable.Return來將同步方法改造為異步方法。只需要將上面的PlusTwoNumberAsync方法改為下面即可,運行程序的效果相同。
private static IObservable<int> PlusTwoNumberReturnAsync(int x, int y) { return Observable.Return(PlusTwoNumber(x, y)); }
使用SelectMany可以很方便的實現諸如在一個異步方法中調用另外一個異步方法的功能。
public static void ObservableSelectMany(int x, int y)
{
PlusTwoNumberStartAsync(x, y).SelectMany(aPlusB => MultiplyByFiveAsync(aPlusB)).Subscribe(Console.WriteLine);
}
private static IObservable<int> MultiplyByFiveAsync(int x)
{
return Observable.Return(MultiplyByFive(x));
}
private static int MultiplyByFive(int x)
{
Thread.Sleep(5000);
return x * 5;
}
完整代碼如下:
// ----------------------------------------------------------------------- // <copyright file="RxAsyncCall.cs" company=""> // TODO: Update copyright text. // </copyright> // ----------------------------------------------------------------------- namespace RxPractice { using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Reactive.Linq; using System.Threading; /// <summary> /// 異步調用 /// </summary> public class RxAsyncCall { public static void ObservableStart(int x, int y) { PlusTwoNumberStartAsync(x, y).Subscribe(Console.WriteLine); } public static void ObservableReturn(int x, int y) { PlusTwoNumberReturnAsync(x, y).Subscribe(Console.WriteLine); } public static void ObservableSelectMany(int x, int y) { PlusTwoNumberStartAsync(x, y).SelectMany(aPlusB => MultiplyByFiveAsync(aPlusB)).Subscribe(Console.WriteLine); } private static IObservable<int> PlusTwoNumberStartAsync(int x, int y) { return Observable.Start(() => PlusTwoNumber(x, y)); } private static int PlusTwoNumber(int x, int y) { Thread.Sleep(2000); return x + y; } private static IObservable<int> MultiplyByFiveAsync(int x) { return Observable.Return(MultiplyByFive(x)); } private static int MultiplyByFive(int x) { Thread.Sleep(5000); return x * 5; } private static IObservable<int> PlusTwoNumberReturnAsync(int x, int y) { return Observable.Return(PlusTwoNumber(x, y)); } } }
Implementing the GeoCoordinateWatcher as a Reactive Service
Using Reactive Extensions for Streaming Data from Database
Bing it on, Reactive Extensions! – Story, code and slides
IntroToRx.com is the online resource for getting started with the Reactive Extensions to .Net
文章列表