• 技术文章 >Java >java教程

    深入浅出RxJava_02[订阅深入和异常处理]的代码示例

    黄舟黄舟2017-03-04 09:41:19原创493

    本教程基于RxJava1.x版本进行全面讲解,后续课程将陆续更新,敬请关注…

    8.可连接的被观察者

    前几节提到的了响应用过程中的三个细节:被观察者 观察者 和订阅。 接下来这一节继续理解下订阅的其他知识点。

    这里介绍一个叫做ConnectableObservable的可连接被观察者。一个可连接的Observable与普通的Observable差不多,除了这一点:可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。

    Publish

    该操作符可以将普通的Observable转化成可连接的Observable。

    可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

    RxJava中connect是ConnectableObservable接口的一个方法,使用publish操作符可以将一个普通的Observable转换为一个ConnectableObservable。

    //创建了一个普通的Observable对象
    Observable<Integer> observable =
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });
    
    //将一个被观察者转换成一个可连接的被观察者
    ConnectableObservable<Integer> connectableObservable =observable.publish();
    
    //为可连接的被观察者订阅事件,但这里并不会马上发送事件
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

    与普通的Observable对象订阅不同,上面的代码中并没直接调用Action1对象的call()方法。

    Connect

    可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。

    //创建了一个普通的Observable对象
    Observable<Integer> observable =
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });
    
    //将一个被观察者转换成一个可连接的被观察者
    ConnectableObservable<Integer> connectableObservable =observable.publish();
    
    //为可连接的被观察者订阅事件,但这里并不会马上发送事件
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });
    
    //当调用可连接的被观察者connect()方法后 开始发送所有的数据。
    connectableObservable.connect(new Action1<Subscription>() {
        @Override
        public void call(Subscription subscription) {
            Log.i(TAG, "call: "+subscription);
        }
    });

    输出:

    IT520: call: OperatorPublish$PublishSubscriber@20dce78
    IT520: call: 1
    IT520: call: 2
    IT520: call: 3

    RefCount

    RefCount操作符可以让一个可连接的Observable转换为普通的Observable。

    //创建了一个普通的Observable对象
    Observable<Integer> observable =
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });
    
    //将一个被观察者转换成一个可连接的被观察者
    ConnectableObservable<Integer> connectableObservable =observable.publish();
    
    //将一个可链接的被观察者转换成一个普通观察者
    Observable<Integer> integerObservable = connectableObservable.refCount();
    
    
    //为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3...
    integerObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

    Replay

    保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅.

    先来看一个例子:

    Observable<Integer> observable =
                Observable.create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        subscriber.onNext(1);
                        subscriber.onNext(2);
                        subscriber.onNext(3);
                    }
                });
    
    ConnectableObservable<Integer> connectableObservable =observable.publish();
    
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call--1--: "+integer);
        }
    });
    
    connectableObservable.connect();
    
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call--2--: "+integer);
        }
    });

    输出:

    com.m520it.rxjava I/IT520: call--1--: 1
    com.m520it.rxjava I/IT520: call--1--: 2
    com.m520it.rxjava I/IT520: call--1--: 3

    首先我们通过publish()将一个普通的Observable转换成ConnectableObservable。当调用connect()的时候,则connect()上面已经订阅的观察者会收到数据。而connect()后面订阅的观察者则无法接收到数据。 如果我们想让所有的观察者在调用connect()的时候同时接收到数据而跟订阅的顺序无关,则需要通过replay()。

    Observable<Integer> observable =
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });
    
    //这里将publish()改为replay()
    ConnectableObservable<Integer> connectableObservable =observable.replay();
    
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call--1--: "+integer);
        }
    });
    
    connectableObservable.connect();
    
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call--2--: "+integer);
        }
    });

    输出:

    com.m520it.rxjava I/IT520: call--1--: 1
    com.m520it.rxjava I/IT520: call--1--: 2
    com.m520it.rxjava I/IT520: call--1--: 3
    com.m520it.rxjava I/IT520: call--2--: 1
    com.m520it.rxjava I/IT520: call--2--: 2
    com.m520it.rxjava I/IT520: call--2--: 3

    9.“冷Observable”&“热Observable”

    前面我们提到,订阅的时候(如果观察者有发送数据的),观察者有直接接收数据的,有等过了一段时间才接收数据的。

    10.错误处理

    很多操作符可用于对Observable发射的onError通知做出响应或者从错误中恢复

    Catch操作符拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。

    RxJava将Catch实现为三个不同的操作符:

    onErrorReturn

    onErrorReturn方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法。

    下面的代码发送1,2,3 并在发送的过程中模拟发送一个异常,只要有异常发送,onErrorReturn()就会被调用 并发送44.代码如下:

    Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3);
            }
        }).onErrorReturn(new Func1<Throwable, Integer>() {
            @Override
            public Integer call(Throwable throwable) {
                return 44;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: "+integer);
            }
        });

    输出:

    com.m520it.rxjava I/IT520: call: 1
    com.m520it.rxjava I/IT520: call: 2
    com.m520it.rxjava I/IT520: call: 44

    onErrorResumeNext

    让Observable在遇到错误时开始发射第二个Observable的数据序列。

    下面的代码在发送的时候,模拟发送一个异常。接着onErrorResumeNext就会被调用 并开始发射新的Observable对象。

    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
        @Override
        public Observable<? extends Integer> call(Throwable throwable) {
                Observable<Integer> innerObservable =
                        Observable.create(new Observable.OnSubscribe<Integer>() {
                            @Override
                            public void call(Subscriber<? super Integer> subscriber) {
                                subscriber.onNext(4);
                                subscriber.onNext(5);
                            }
                        });
            return innerObservable;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

    输出:

    com.m520it.rxjava I/IT520: call: 1
    com.m520it.rxjava I/IT520: call: 2
    com.m520it.rxjava I/IT520: call: 3
    com.m520it.rxjava I/IT520: call: 4
    com.m520it.rxjava I/IT520: call: 5

    onExceptionResumeNext

    让Observable在遇到错误时继续发射后面的数据项。

    //创建一个错误处理的Observable对象
    Observable<Integer> exceptionObserver = Observable
            .create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(55);
                    subscriber.onNext(66);
                }
            });
    
    Observable
            .create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onError(new NullPointerException("mock exception !"));
                    subscriber.onNext(3);
                }
            })
            //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver
            .onExceptionResumeNext(exceptionObserver)
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.i(TAG, "call: "+integer);
                }
            });

    输出:

    com.m520it.rxjava I/IT520: call: 1
    com.m520it.rxjava I/IT520: call: 2
    com.m520it.rxjava I/IT520: call: 55
    com.m520it.rxjava I/IT520: call: 66

    Retry重试机制

    如果原始Observable遇到错误,重新订阅它并期望它能正常终止。

    RxJava中的实现为retry和retryWhen。

    Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3);
            }
        })
        .retry(3)//重复3次订阅
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: "+integer);
            }
        });

    类似的函数还有:

    以上就是深入浅出RxJava_02[订阅深入和异常处理]的代码示例的内容,更多相关内容请关注PHP中文网(m.sbmmt.com)!

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    上一篇:深入浅出RxJava_01[什么是RxJava] 的详细介绍 下一篇:深入浅出RxJava_03[被观察者创建操作]的详细介绍
    Web大前端开发直播班

    相关文章推荐

    • JAVA详细解析之IO流、File、字节流以及字符流• 完全掌握Java单例模式• 图文详解!java中锁的整理总结• JAVA学习IO操作之字节流和字符流(总结分享)• JAVA面向对象之继承、构造方法、重写和重载(总结分享)

    全部评论我要评论

  • 取消发布评论发送
  • 1/1

    PHP中文网