A code example of RxJava_02 [Subscription depth and exception handling]

黄舟 (Original)
2017-03-04 09:41:19

This tutorial is based on RxJava 1.x. Later installments will be published one after another, so stay tuned.

8. Connectable Observables

The previous sections covered the three basic elements of a reactive application: the Observable (the observed), the Observer, and the subscription between them. This section continues with further details about subscription.

Here we introduce a connectable observable called ConnectableObservable. A connectable Observable is just like a normal Observable, except for this: a connectable Observable does not start emitting data when it is subscribed to, only when its connect() is called. In this way, you can wait until all potential subscribers are subscribed to the Observable before starting to emit data.

Publish

This operator can convert an ordinary Observable into a connectable Observable.

A connectable Observable is similar to an ordinary Observable, but it does not start emitting data when it is subscribed to; it starts only when the Connect operator is applied. This lets you decide exactly when the Observable begins emitting data.

In RxJava, connect() is a method of the ConnectableObservable class. You can use the publish operator to convert an ordinary Observable into a ConnectableObservable.

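// Uses rx.Observable, rx.Subscriber, rx.observables.ConnectableObservable and rx.functions.Action1
// Create an ordinary Observable
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

// Convert the Observable into a connectable Observable
ConnectableObservable<Integer> connectableObservable = observable.publish();

// Subscribe to the connectable Observable; no items are emitted yet
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: " + integer);
    }
});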

Unlike a subscription to an ordinary Observable, the code above does not invoke the Action1 object's call() method: nothing is emitted until connect() is called.

Connect

As described above, a connectable Observable does not start emitting data when it is subscribed to; it starts only when the Connect operator is applied. With this method, you can wait until all observers have subscribed to the Observable before it starts emitting data.

// Create an ordinary Observable
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

// Convert the Observable into a connectable Observable
ConnectableObservable<Integer> connectableObservable = observable.publish();

// Subscribe to the connectable Observable; no items are emitted yet
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: " + integer);
    }
});

// Calling connect() starts the emission of all items. The Action1 passed here
// receives the connection's Subscription before the first item is emitted.
connectableObservable.connect(new Action1<Subscription>() {
    @Override
    public void call(Subscription subscription) {
        Log.i(TAG, "call: " + subscription);
    }
});

Output:

IT520: call: OperatorPublish$PublishSubscriber@20dce78
IT520: call: 1
IT520: call: 2
IT520: call: 3
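
To illustrate why deferring emission until connect() is useful, here is a minimal sketch in which two observers subscribe before the connection is made. It assumes RxJava 1.x is on the classpath and uses Java 8 lambdas for brevity; the class name PublishConnectDemo and the list used to record events are illustrative and not part of the original tutorial.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.observables.ConnectableObservable;

final class PublishConnectDemo {

    // Returns the observed events in the order they happened.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // publish() turns the ordinary Observable into a ConnectableObservable.
        ConnectableObservable<Integer> connectable =
                Observable.just(1, 2, 3).publish();

        // Subscribing only registers the observers; nothing is emitted yet.
        connectable.subscribe(i -> events.add("A:" + i));
        connectable.subscribe(i -> events.add("B:" + i));
        events.add("before connect");

        // connect() subscribes to the source once and delivers each item
        // to every observer that is registered at that moment.
        connectable.connect();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "before connect" marker is recorded first, and only then do both A and B receive 1, 2 and 3 from a single subscription to the source.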

RefCount

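The RefCount operator converts a connectable Observable back into one that behaves like an ordinary Observable: it connects automatically when the first observer subscribes and disconnects when the last observer unsubscribes.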

// Create an ordinary Observable
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

// Convert the Observable into a connectable Observable
ConnectableObservable<Integer> connectableObservable = observable.publish();

// Convert the connectable Observable back into an ordinary Observable
Observable<Integer> integerObservable = connectableObservable.refCount();

// Subscribe to it; data is emitted as soon as we subscribe, printing 1 2 3
integerObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: " + integer);
    }
});
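
The connect-on-first-subscriber and disconnect-on-last-unsubscribe behavior of refCount() can be made visible by counting how often the upstream source is subscribed to. The sketch below is only an illustration under some assumptions: it requires RxJava 1.x and Java 8, and it uses a PublishSubject (a type this tutorial has not introduced) as a hand-driven source. The class name RefCountDemo and the counter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Subscription;
import rx.subjects.PublishSubject;

final class RefCountDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        AtomicInteger sourceSubscriptions = new AtomicInteger();

        // Stand-in for an upstream source that we push items into by hand.
        PublishSubject<Integer> source = PublishSubject.create();

        Observable<Integer> shared = source
                .doOnSubscribe(sourceSubscriptions::incrementAndGet) // count upstream subscriptions
                .publish()
                .refCount();

        Subscription a = shared.subscribe(i -> events.add("A:" + i)); // first observer: connects
        Subscription b = shared.subscribe(i -> events.add("B:" + i)); // shares the same connection
        source.onNext(1);

        a.unsubscribe();
        b.unsubscribe(); // last observer gone: disconnects from the source

        shared.subscribe(i -> events.add("C:" + i)); // connects a second time
        source.onNext(2);

        events.add("source subscriptions: " + sourceSubscriptions.get());
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

A and B share one upstream subscription, and C triggers a second one after the first connection has been released, so the counter ends at 2.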

Replay

Ensure that all observers receive the same data sequence, even if they subscribe after the Observable starts emitting data.

Let’s look at an example first:

Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

ConnectableObservable<Integer> connectableObservable = observable.publish();

// Subscribed before connect(): receives the data
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: " + integer);
    }
});

connectableObservable.connect();

// Subscribed after connect(): the items have already been emitted
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: " + integer);
    }
});

Output:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3

First we convert an ordinary Observable into a ConnectableObservable through publish(). When connect() is called, only the observers that subscribed before connect() receive the data. An observer that subscribes after connect() misses everything that has already been emitted, which here is the whole sequence, because the source emits all three items synchronously inside connect(). If we want every observer to receive the full sequence regardless of when it subscribes, we need to use replay().

Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

// Here publish() is replaced with replay()
ConnectableObservable<Integer> connectableObservable = observable.replay();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: " + integer);
    }
});

connectableObservable.connect();

// Subscribed after connect(): the buffered items are replayed to this observer
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: " + integer);
    }
});

Output:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3
com.m520it.rxjava I/IT520: call--2--: 1
com.m520it.rxjava I/IT520: call--2--: 2
com.m520it.rxjava I/IT520: call--2--: 3
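
replay() without arguments buffers every item. RxJava 1.x also offers replay(int bufferSize), which keeps only the most recent items for late subscribers. The following sketch shows that variant; it assumes RxJava 1.x and Java 8, and the class name ReplayBufferDemo is illustrative rather than taken from the tutorial.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.observables.ConnectableObservable;

final class ReplayBufferDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Keep at most the last 2 items for observers that subscribe late.
        ConnectableObservable<Integer> replayed =
                Observable.just(1, 2, 3).replay(2);

        replayed.subscribe(i -> events.add("early:" + i));
        replayed.connect(); // the source emits 1, 2, 3 synchronously here

        // Subscribes after the emission; only the buffered items are replayed.
        replayed.subscribe(i -> events.add("late:" + i));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The early observer sees 1, 2 and 3, while the late observer receives only 2 and 3. Bounding the buffer is worth considering for long sequences, since an unbounded replay() keeps every item in memory.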

9. "Cold Observable" & "Hot Observable"

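We saw earlier that some Observables start emitting data only once they are subscribed to, while others emit data independently of their subscribers.

  • An Observable that emits nothing until an observer subscribes, and then delivers the whole sequence from the beginning to each observer, is called a "cold Observable". The Observables created with Observable.create() above are cold.

  • An Observable that emits data whether or not anyone is subscribed, so that an observer that subscribes late may miss items, is called a "hot Observable". A ConnectableObservable obtained from publish() behaves this way once connect() has been called: it emits even if no observer has subscribed yet, which is why the observer that subscribed after connect() in the Replay example received nothing.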
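
In the usual ReactiveX sense of the terms, a cold Observable restarts its sequence for each observer, whereas a hot Observable emits regardless of who is listening. The sketch below contrasts the two. It assumes RxJava 1.x and Java 8, and it uses a PublishSubject as a simple hot source, which is an addition for illustration and not something the tutorial covers; the class name ColdHotDemo is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.subjects.PublishSubject;

final class ColdHotDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Cold: every subscription replays the sequence from the start.
        Observable<Integer> cold = Observable.just(1, 2);
        cold.subscribe(i -> events.add("cold A:" + i));
        cold.subscribe(i -> events.add("cold B:" + i));

        // Hot: items are pushed whether or not anyone is subscribed.
        PublishSubject<Integer> hot = PublishSubject.create();
        hot.subscribe(i -> events.add("hot A:" + i));
        hot.onNext(1);
        hot.subscribe(i -> events.add("hot B:" + i)); // joins late and misses 1
        hot.onNext(2);

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Both cold observers receive 1 and 2, while the hot observer B, having subscribed after the first item, receives only 2.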

10. Error handling

Many operators can be used to respond to onError notifications emitted by an Observable or to recover from errors.

The Catch operator intercepts the onError notification of the original Observable and replaces it with other data items or data sequences, so that the generated Observable can terminate normally or not at all.

RxJava implements Catch as three different operators:

onErrorReturn

The onErrorReturn method returns a new Observable that mirrors the behavior of the original Observable, except that it ignores the original's onError call and does not pass the error on to the observer. Instead, it emits a special item and calls the observer's onCompleted method.

The following code emits 1 and 2 and then simulates an error. When the error is signaled, the function passed to onErrorReturn() is called and 44 is emitted in place of the error; the 3 that follows the error is never delivered. The code is as follows:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onError(new NullPointerException("mock exception !"));
        subscriber.onNext(3); // never delivered: the sequence has already terminated
    }
}).onErrorReturn(new Func1<Throwable, Integer>() {
    @Override
    public Integer call(Throwable throwable) {
        // Item emitted in place of the error
        return 44;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: " + integer);
    }
});

Output:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 44

onErrorResumeNext

onErrorResumeNext makes the Observable switch to the data sequence of a second Observable when it encounters an error.

The following code simulates an error during emission. onErrorResumeNext is then called, and the items of the new Observable are emitted instead of the error.

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onError(new NullPointerException("mock exception !"));
        subscriber.onNext(3); // never delivered: the sequence has already terminated
    }
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
    @Override
    public Observable<? extends Integer> call(Throwable throwable) {
        // Fallback Observable whose items replace the error
        Observable<Integer> innerObservable =
                Observable.create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        subscriber.onNext(4);
                        subscriber.onNext(5);
                    }
                });
        return innerObservable;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: " + integer);
    }
});

Output:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 4
com.m520it.rxjava I/IT520: call: 5

onExceptionResumeNext

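onExceptionResumeNext is similar to onErrorResumeNext: when the source fails, the resulting Observable continues with the items of a backup Observable. The difference is that it does so only if the Throwable passed to onError is an Exception; any other Throwable (for example an Error) is passed on to the observer's onError.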

// Create the backup Observable used for error handling
Observable<Integer> exceptionObserver = Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(55);
                subscriber.onNext(66);
            }
        });

Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3); // never delivered: the sequence has already terminated
            }
        })
        // When an exception occurs during the emission above, this operator
        // takes over and emits the items of exceptionObserver
        .onExceptionResumeNext(exceptionObserver)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: " + integer);
            }
        });

Output:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 55
com.m520it.rxjava I/IT520: call: 66
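
The practical difference between onExceptionResumeNext and onErrorResumeNext only shows when the failure is not an Exception. As a rough sketch, assuming RxJava 1.x and Java 8, the helper below feeds the same pipeline first an Exception and then a plain Error; the class name ExceptionResumeDemo and the run(Throwable) helper are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;

final class ExceptionResumeDemo {

    // Emits 1 and 2, fails with the given Throwable, and records what the observer sees.
    static List<String> run(Throwable failure) {
        List<String> events = new ArrayList<>();

        Observable.just(1, 2)
                .concatWith(Observable.<Integer>error(failure))
                // Resumes with 55, 66 only when the failure is an Exception.
                .onExceptionResumeNext(Observable.just(55, 66))
                .subscribe(
                        i -> events.add("next:" + i),
                        e -> events.add("error:" + e.getClass().getSimpleName()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(new NullPointerException("mock exception !")));
        System.out.println(run(new Error("not an Exception")));
    }
}
```

With the NullPointerException the observer receives 1, 2, 55 and 66; with the Error it receives 1 and 2 followed by an onError call, because the backup Observable is not used.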

Retry

If the original Observable encounters an error, resubscribe to it in the hope that it will complete without error.

RxJava implements this operator as retry and retryWhen.

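Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onError(new NullPointerException("mock exception !"));
        subscriber.onNext(3); // never delivered: the sequence has already terminated
    }
})
.retry(3) // resubscribe at most 3 times
.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: " + integer);
    }
}, new Action1<Throwable>() {
    // Called once the retries are exhausted; without this handler the final
    // error would be rethrown as an OnErrorNotImplementedException
    @Override
    public void call(Throwable throwable) {
        Log.e(TAG, "error: " + throwable);
    }
});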

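The available variants are:

  • retry(): no matter how many onError notifications it receives, it keeps resubscribing to the source Observable.

  • retry(long): resubscribes at most the specified number of times. Once that number is exceeded, it stops resubscribing and passes the latest onError notification to its observers.

  • retry(Func2): takes a function that receives the retry count and the Throwable and returns a Boolean that decides whether to resubscribe.

  • retryWhen: passes the errors to a function that returns a second Observable, and resubscribes or terminates depending on what that Observable emits.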
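
As an illustration of the predicate-based variant, the sketch below retries a source that fails on its first two subscriptions and succeeds on the third. It assumes RxJava 1.x and Java 8. The flaky source built with Observable.defer, the class name RetryPredicateDemo, and the limit of 5 retries are all hypothetical choices made for the example.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;

final class RetryPredicateDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        AtomicInteger attempts = new AtomicInteger();

        // Hypothetical flaky source: defer() runs this function on every
        // subscription, and the first two subscriptions end in an error.
        Observable<String> flaky = Observable.defer(() -> {
            int n = attempts.incrementAndGet();
            events.add("attempt " + n);
            return n < 3
                    ? Observable.<String>error(new IOException("attempt " + n + " failed"))
                    : Observable.just("ok");
        });

        // Resubscribe only for IOExceptions, and at most 5 times.
        flaky.retry((count, error) -> error instanceof IOException && count <= 5)
                .subscribe(
                        v -> events.add("next:" + v),
                        e -> events.add("error:" + e.getMessage()),
                        () -> events.add("completed"));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The observer never sees the two intermediate failures: the recorded events are the three attempts, then "next:ok" and "completed". Restricting retries to specific exception types is a common design choice, since retrying a programming error such as a NullPointerException rarely helps.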
