文章出處

初探RxJava


1. RxJava是什么?

“a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)。 —RxJava 在 GitHub 主頁上的自我介紹

一個擴展的觀察者模式 一個異步數據流

觀察者模式是一個觀察者(observer或者subscriber)和一個被觀察者(observable),觀察者模式定義了一種一對多的依賴關系,讓多個觀察者對象同時監聽某一個主題對象。這個主題對象在狀態發生變化時,會通知所有觀察者對象,使它們能夠自動更新自己。

觀察者模式:

Observable發出事件 Subscrible訂閱事件

傳統的觀察者模式有有明顯缺點導致一個被觀察者發出的事件如果發生異常后面的事件也就無法正常發出或被觀察者捕獲:

1. 無法獲知事件的終止

2. 沒有事件異常處理機制

RxJava的觀察者模式典型寫法是這樣的:

observable.subscribe(observer);// 或者:observable.subscribe(subscriber);

2.RxJava的基本實現

基于以上的概念, RxJava 的基本實現主要有三點:

1) 創建 Observer

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer 接口的實現方式:

Observer observer = new Observer() {    @Override    public void onNext(String s) {        Log.d(tag, "Item: " + s);    }    @Override    public void onCompleted() {        Log.d(tag, "Completed!");    }    @Override    public void onError(Throwable e) {        Log.d(tag, "Error!");    }};

除了 Observer 接口之外,RxJava 還內置了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的:

Subscriber subscriber = new Subscriber() {    @Override    public void onNext(String s) {        Log.d(tag, "Item: " + s);    }    @Override    public void onCompleted() {        Log.d(tag, "Completed!");    }    @Override    public void onError(Throwable e) {        Log.d(tag, "Error!");    }};

2) 創建 Observable

Observable 即被觀察者,它決定什么時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來創建一個 Observable ,并為它定義事件觸發規則:

Observable observable = Observable.create(new Observable.OnSubscribe() {    @Override    public void call(Subscriber subscriber) {        subscriber.onNext("Hello");        subscriber.onNext("Hi");        subscriber.onNext("Aloha");        subscriber.onCompleted();    }});

可以看到,這里傳入了一個 OnSubscribe 對象作為參數。OnSubscribe 會被存儲在返回的 Observable 對象中,它的作用相當于一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調用,事件序列就會依照設定依次觸發(對于上面的代碼,就是觀察者Subscriber 將會被調用三次 onNext() 和一次 onCompleted())。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

3) Subscribe (訂閱)

創建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯結起來,整條鏈子就可以工作了。代碼形式很簡單:

observable.subscribe(observer);// 或者:observable.subscribe(subscriber);

這里的subscrible有點怪,有點像observable訂閱了subscriber,就像雜志訂閱了讀者一樣。我想是基于作者對于流失API的設計,當然也可以取名字為subscribeBy.

4) 場景示例

設有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView ,它的作用是顯示多張圖片,并能使用 addImage(Bitmap) 方法來任意增加顯示的圖片。現在需要程序將一個給出的目錄數組 File[] folders 中每個目錄下的 png 圖片都加載出來并顯示在 imageCollectorView 中。需要注意的是,由于讀取圖片的這一過程較為耗時,需要放在后臺執行,而圖片的顯示則必須在 UI 線程執行。常用的實現方式有多種,我這里貼出其中一種:

new Thread() {    @Override    public void run() {        super.run();        for (File folder : folders) {            File[] files = folder.listFiles();            for (File file : files) {                if (file.getName().endsWith(".png")) {                    final Bitmap bitmap = getBitmapFromFile(file);                    getActivity().runOnUiThread(new Runnable() {                        @Override                        public void run() {                            imageCollectorView.addImage(bitmap);                        }                    });                }            }        }    }}.start();

而使用RxJava的寫法是這樣的:

Observable.from(folders)    .flatMap(new Func1>() {        @Override        public Observable call(File file) {            return Observable.from(file.listFiles());        }    })    .filter(new Func1() {        @Override        public Boolean call(File file) {            return file.getName().endsWith(".png");        }    })    .map(new Func1() {        @Override        public Bitmap call(File file) {            return getBitmapFromFile(file);        }    })    .subscribeOn(Schedulers.io())    .observeOn(AndroidSchedulers.mainThread())    .subscribe(new Action1() {        @Override        public void call(Bitmap bitmap) {            imageCollectorView.addImage(bitmap);        }    });,>,>,>

3. RxJava的異步數據處理指定被觀察者生產數據所在的線程 指定訂閱者接受數據所在的線程 強大的數據處理能力

先給大家一個實例,假設 /user 接口并不能直接訪問,而需要填入一個在線獲取的 token ,代碼應該怎么寫?

Callback 方式,可以使用嵌套的 Callback:

@GET("/token")public void getToken(Callback callback);@GET("/user")public void getUser(@Query("token") String token, @Query("userId") String userId, Callback callback);...getToken(new Callback() {    @Override    public void success(String token) {        getUser(token, userId, new Callback() {            @Override            public void success(User user) {                userView.setUser(user);            }            @Override            public void failure(RetrofitError error) {                // Error handling                ...            }        };    }    @Override    public void failure(RetrofitError error) {        // Error handling        ...    }});

倒是沒有什么性能問題,可是迷之縮進毀一生,據說做過大項目的人應該更懂,而使用 RxJava的話,代碼是這樣的:

@GET("/token")public Observable getToken();@GET("/user")public Observable getUser(@Query("token") String token, @Query("userId") String userId);...getToken()    .flatMap(new Func1>() {        @Override        public Observable onNext(String token) {            return getUser(token, userId);        })    .observeOn(AndroidSchedulers.mainThread())    .subscribe(new Observer() {        @Override        public void onNext(User user) {            userView.setUser(user);        }        @Override        public void onCompleted() {        }        @Override        public void onError(Throwable error) {            // Error handling            ...        }    });,>

1)數據處理

- 方便對數據進行各種變換操作

- 內置豐富的operators

- 自定義operator

eg:數據源返回一個一二三的整數數組,我們想把它轉換成一個字符串數組

observable.just(1,2,3)            .map(new Func1() {                @Override                public String call(Integer integer) {                    return Integer.toString(integer));                }            })            .subscribe(new Action1() {                @Override                public void call(String string) {                    System.out.println(string);                }            });

A.RxJava的操作符:

創建Observable create just from 變換Observable map flatMap 過濾Observable filter first last 合并Observable merge zip 聚合函數Observable reduce count

下面我們來看一個flatMap的例子:

首先假設這么一種需求:假設有一個數據結構『學生』,現在需要打印出一組學生的名字。實現方式很簡單:

Student[] students = ...;Subscriber subscriber = new Subscriber() {    @Override    public void onNext(String name) {        Log.d(tag, name);    }    ...};Observable.from(students)    .map(new Func1() {        @Override        public String call(Student student) {            return student.getName();        }    })    .subscribe(subscriber);,>

很簡單。那么再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區別在于,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現:

Student[] students = ...;Subscriber subscriber = new Subscriber() {    @Override    public void onNext(Student student) {        List courses = student.getCourses();        for (int i = 0; i < courses.size(); i++) {            Course course = courses.get(i);            Log.d(tag, course.getName());        }    }    ...};Observable.from(students)    .subscribe(subscriber);

依然很簡單。那么如果我不想在 Subscriber 中使用 for 循環,而是希望 Subscriber 中直接傳入單個的 Course 對象呢(這對于代碼復用很重要)?用 map() 顯然是不行的,因為 map() 是一對一的轉化,而我現在的要求是一對多的轉化。那怎么才能把一個 Student 轉化成多個 Course 呢?

這個時候,就需要用 flatMap() 了:

Student[] students = ...;Subscriber subscriber = new Subscriber() {    @Override    public void onNext(Course course) {        Log.d(tag, course.getName());    }    ...};Observable.from(students)    .flatMap(new Func1>() {        @Override        public Observable call(Student student) {            return Observable.from(student.getCourses());        }    })    .subscribe(subscriber);,>

從上面的代碼可以看出, flatMap() 和 map() 有一個相同點:它也是把傳入的參數轉化之后返回另一個對象。但需要注意,和 map() 不同的是, flatMap() 中返回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發送到了 Subscriber 的回調方法中。 flatMap() 的原理是這樣的:1. 使用傳入的事件對象創建一個 Observable 對象;2. 并不發送這個 Observable, 而是將它激活,于是它開始發送事件;3. 每一個創建出來的 Observable 發送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回調方法。這三個步驟,把事件拆成了兩級,通過一組新創建的 Observable 將初始的對象『鋪平』之后通過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。

flatMap示意圖:

flatMap示意圖

C.然后就是變換的原理:

Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。public Subscription subscribe(Subscriber subscriber) {    subscriber.onStart();    onSubscribe.call(subscriber);    return subscriber;}

可以看到,subscriber() 做了3件事:

1.調用 Subscriber.onStart() 。這個方法在前面已經介紹過,是一個可選的準備方法。

2.調用 Observable 中的 OnSubscribe.call(Subscriber) 。在這里,事件發送的邏輯開始運行。從這也可以看出,在 RxJava 中, Observable 并不是在創建的時候就立即開始發送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。

3.將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().

這些變換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基于同一個基礎的變換方法: lift(Operator)。首先看一下 lift() 的內部實現(僅核心代碼):

// 注意:這不是 lift() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。public  Observable lift(Operator operator) {    return Observable.create(new OnSubscribe() {        @Override        public void call(Subscriber subscriber) {            Subscriber newSubscriber = operator.call(subscriber);            newSubscriber.onStart();            onSubscribe.call(newSubscriber);        }    });}

這段代碼很有意思:它生成了一個新的 Observable 并返回,而且創建新 Observable 所用的參數 OnSubscribe 的回調方法 call() 中的實現竟然看起來和前面講過的 Observable.subscribe() 一樣!然而它們并不一樣喲~不一樣的地方關鍵就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的對象不同

subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對象,這個沒有問題,但是 lift() 之后的情況就復雜了點。 當含有 lift() 時:

1.lift() 創建了一個 Observable 后,加上之前的原始 Observable,已經有兩個 Observable 了;

2.而同樣地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了兩個 OnSubscribe;

3.當用戶調用經過 lift() 后的 Observable 的 subscribe() 的時候,使用的是 lift() 所返回的新的 Observable ,于是它所觸發的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那個 OnSubscribe;

4.而這個新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在這個 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個新的 Subscriber(Operator 就是在這里,通過自己的 call() 方法將新 Subscriber 和原始 Subscriber 進行關聯,并插入自己的『變換』代碼以實現變換),然后利用這個新 Subscriber 向原始 Observable 進行訂閱。

這樣就實現了 lift() 過程,有點像一種代理機制,通過事件攔截和處理實現事件序列的變換

2)Scheduler:默認情況下RxJava中生產者和訂閱者在當前線程下運行,Scheduler就是方便切換生產者和訂閱者的線程。

Schedulers:

- Schdulers.immediate()

- Schdulers.newThead()

- Schdulers.computation()

- Schdulers.io()

- AndoridSchdulers.mainThead()

A.Scheduler 的 API

上代碼:

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定    .subscribeOn(Schedulers.io())    .observeOn(Schedulers.newThread())    .map(mapOperator) // 新線程,由 observeOn() 指定    .observeOn(Schedulers.io())    .map(mapOperator2) // IO 線程,由 observeOn() 指定    .observeOn(AndroidSchedulers.mainThread)     .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

B.Scheduler的原理

其實, subscribeOn() 和 observeOn() 的內部實現,也是用的 lift()。具體看圖(不同顏色的箭頭表示不同的線程):

subscribeOn() 原理圖:

subscrible原理圖

observeOn() 原理圖:

observeOn原理圖

從圖中可以看出,subscribeOn() 和 observeOn() 都做了線程切換的工作(圖中的 “schedule…” 部位)。不同的是, subscribeOn() 的線程切換發生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件還沒有開始發送,因此 subscribeOn() 的線程控制可以從事件發出的開端就造成影響;而 observeOn() 的線程切換則發生在它內建的 Subscriber 中,即發生在它即將給下一級 Subscriber 發送事件時,因此 observeOn() 控制的是它后面的線程。

4. RxJava的適用場景出現多層嵌套回調 復雜的數據處理 響應式UI 復雜的線程切換就愛閱讀www.92to.com網友整理上傳,為您提供最全的知識大全,期待您的分享,轉載請注明出處。
歡迎轉載:http://www.kanwencang.com/bangong/20161102/33004.html

文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()