A detailed introduction to RxJava_01[What is RxJava]

黄舟 (Original)
2017-03-04 09:38:14


This tutorial series explains RxJava based on the 1.x versions. Later installments will be published one after another, so stay tuned.

1. What is RxJava

  • Rx is short for Reactive Extensions. The core idea of the framework is that one party sends information and another party responds to that information and processes it.

  • The framework was developed by a team led by Microsoft architect Erik Meijer and was open sourced in November 2012.

  • Rx libraries exist for .NET, JavaScript, C++, and by now for almost every popular programming language.

  • Most of Rx's language libraries are maintained by the ReactiveX organization. The more popular ones are RxJava/RxJS/Rx.NET, and the community website is reactivex.io.

  • RxJava is a popular framework whose source code is hosted on GitHub. Alongside RxJava itself there is a companion library for Android, RxAndroid.

2. How RxJava simplifies code

In an Android project, loading data on a background thread and then refreshing the UI usually looks roughly like this:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

After several levels of nesting, the code above is hard to read. With RxJava, you can write it like this:

Observable.from(folders)
    // Expand each folder into the files it contains
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File folder) {
            return Observable.from(folder.listFiles());
        }
    })
    // Keep only PNG files
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    // Decode each file into a Bitmap
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

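The advantage of writing it this way is less nesting and better readability. Besides simplifying the code, RxJava lets you choose the thread each stage runs on: here subscribeOn(Schedulers.io()) runs the file work on an I/O thread, and observeOn(AndroidSchedulers.mainThread()) delivers each bitmap to the subscriber on the main thread.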

3. Adding the framework to a project

RxJava has since been upgraded to version 2.0, but version 1.x is a good starting point for understanding the library. To use RxJava in an Android project, add these dependencies to the Gradle build script:

compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'

The project can now use RxJava.

4. The core of reactive programming

Reactive programming comes down to two parts: one part sends events or messages, and the other part responds to them.

In the past, reading the news usually meant reading a newspaper. If you were interested in a particular newspaper or magazine, you first had to do three things:

  1. Provide your home address

  2. Find the corresponding newspaper office

  3. Go to the newspaper office to subscribe to the newspaper for the entire month

After that, whenever a new issue came out, the newspaper office would deliver it to your home.

Abstracted into code, the example above has the following steps:

  1. Provide the observer (you care about the content of the magazine, so you are the one observing the event)

  2. Provide the observable (whenever a new issue comes out, the people who care about it must be notified, so the newspaper office is the object being observed)

  3. Subscribe (the observer and the observable must be linked, so that as soon as the observable changes, the observer is notified immediately)

The example looks like this in code:

// 1. Create the observable
Observable<String> observable =
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                // 4. Start sending events
                // There are three event types: onNext(), onCompleted(), and onError()
                // onCompleted() and onError() both tell the observer that the sequence has ended; only one of the two is sent.
                subscriber.onNext("Hello Android !");
                subscriber.onNext("Hello Java !");
                subscriber.onNext("Hello C !");
                subscriber.onCompleted();
            }
        });

// 2. Create the observer
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

// 3. Subscribe
observable.subscribe(subscriber);

The output is as follows:

com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted

How the code works

  • When the observer subscriber subscribes to the observable, the framework automatically invokes the call() method of the OnSubscribe object that was passed to Observable.create().

  • Inside call(), the observable sends events by calling onNext(), onCompleted(), or onError() on the subscriber.

  • Each of those calls runs the corresponding callback method of the subscriber.
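
The callback flow described above can be modeled in a few lines of plain Java. This is only a sketch of the mechanism, not RxJava's implementation: MiniObservable, MiniOnSubscribe, and MiniObserver are hypothetical names invented for illustration, and the sketch ignores threading and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types that model the callback flow only; not RxJava classes.
interface MiniObserver<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable e);
}

interface MiniOnSubscribe<T> {
    void call(MiniObserver<T> observer);
}

final class MiniObservable<T> {
    private final MiniOnSubscribe<T> onSubscribe;

    private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
        return new MiniObservable<T>(onSubscribe);
    }

    // Nothing runs until subscribe() is called; subscribing is what triggers call().
    void subscribe(MiniObserver<T> observer) {
        onSubscribe.call(observer);
    }
}

final class MiniObservableDemo {
    // Returns the sequence of steps in the order they happened.
    static List<String> run() {
        final List<String> log = new ArrayList<String>();
        MiniObservable<String> observable = MiniObservable.create(new MiniOnSubscribe<String>() {
            @Override
            public void call(MiniObserver<String> observer) {
                log.add("call() invoked");
                observer.onNext("Hello Android !");
                observer.onNext("Hello Java !");
                observer.onCompleted();
            }
        });
        log.add("created");
        observable.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext: " + value); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable e) { log.add("onError"); }
        });
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In this model, "created" is logged before "call() invoked", which shows that creating the observable sends nothing; the events only start flowing once subscribe() runs.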

5. Observable variant

An ordinary Observable delivers events through three methods: onNext, onError, and onCompleted. Single, a variant of Observable, needs only two:

  • onSuccess - Single emits its single value to this method

  • onError - if the required value cannot be emitted, Single emits a Throwable object to this method

Single calls only one of these two methods, and calls it only once. After either method is called, the subscription ends.

final Single<String> single = Single.create(new Single.OnSubscribe<String>() {
    @Override
    public void call(SingleSubscriber<? super String> singleSubscriber) {
        // onSuccess() reaches the Observer below as onNext() followed by onCompleted()
        //singleSubscriber.onSuccess("Hello Android !");
        // Here only onError() is called
        singleSubscriber.onError(new NullPointerException("mock Exception !"));
    }
});

Observer<String> observer = new Observer<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};
single.subscribe(observer);
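
The rule that only one of the two methods is called, and only once, can be pictured as a guard that lets the first terminal call through and drops everything after it. The plain-Java sketch below illustrates that rule under stated assumptions: MiniSingleObserver and OnceOnlyObserver are hypothetical names, the guard is not thread-safe, and it is not how RxJava's Single is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "exactly one terminal call" rule; not RxJava's Single.
interface MiniSingleObserver<T> {
    void onSuccess(T value);
    void onError(Throwable e);
}

final class OnceOnlyObserver<T> implements MiniSingleObserver<T> {
    private final MiniSingleObserver<T> downstream;
    private boolean done; // set by the first terminal call (single-threaded use only)

    OnceOnlyObserver(MiniSingleObserver<T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onSuccess(T value) {
        if (done) return;   // a terminal call already happened
        done = true;
        downstream.onSuccess(value);
    }

    @Override
    public void onError(Throwable e) {
        if (done) return;   // a terminal call already happened
        done = true;
        downstream.onError(e);
    }
}

final class MiniSingleDemo {
    // Fails first, then tries to succeed; returns what the observer actually saw.
    static List<String> run() {
        final List<String> log = new ArrayList<String>();
        MiniSingleObserver<String> guarded = new OnceOnlyObserver<String>(
                new MiniSingleObserver<String>() {
                    @Override public void onSuccess(String value) { log.add("onSuccess: " + value); }
                    @Override public void onError(Throwable e) { log.add("onError: " + e.getMessage()); }
                });
        guarded.onError(new NullPointerException("mock Exception !"));
        guarded.onSuccess("Hello Android !"); // dropped by the guard
        return log;
    }
}
```

Only the first call reaches the observer, so the second one has no visible effect.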

6. Observer variant

The examples above use a Subscriber object where an Observer is expected. This works because Subscriber itself implements Observer.

A Subscriber handles the onNext(), onCompleted(), and onError() events, so creating one means implementing all three methods. The code is as follows:

// Create the observer
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

// Subscribe
observable.subscribe(subscriber);

In the code above, even if you only care about the onNext() event, you still have to implement onCompleted() and onError(), which makes the code bloated. To address this, RxJava provides additional ways to subscribe. The code is as follows:

// Create a standalone callback for the onNext event only
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "call: "+s);
    }
};

// Subscribe
observable.subscribe(onNextAction);

Notice that subscribe() no longer takes an observer, but an object that implements only the onNext callback. The similar overloads are listed below, so you can subscribe with whichever one fits your needs:

  • public Subscription subscribe(final Observer<? super T> observer)

  • public Subscription subscribe(final Action1<? super T> onNext)

  • public Subscription subscribe(final Action1<? super T> onNext, Action1<Throwable> onError)

  • public Subscription subscribe(final Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted)

The forEach function offers similar overloads:

  • public void forEach(final Action1<? super T> onNext)

  • public void forEach(final Action1<? super T> onNext, Action1<Throwable> onError)

  • public void forEach(final Action1<? super T> onNext, Action1<Throwable> onError, Action0 onComplete)

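7. Subject variants

The previous two sections introduced a variant of the observable and variants of the observer. This section introduces an object that plays both roles: a Subject can be used as an observable and also as an observer.

There are four types of Subject, each for a different scenario. Not every implementation provides all of them.

AsyncSubject

An AsyncSubject emits only the last value from the source Observable, and only after the source Observable completes. It also emits that last value to any observer that subscribes later.

The code is as follows: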

// Create the observable
final AsyncSubject<String> subject = AsyncSubject.create();
// Create the observer
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "s:" + s);
    }
};
// Subscribe
subject.subscribe(subscriber);
// Send events. If onCompleted() is called, only the last onNext() value is printed; if it is never called, nothing is printed.
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();

Output:

s:Hello Java 
onCompleted

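However, if the source Observable terminates with an error, the AsyncSubject emits no items and simply passes the error notification along.

With the same observer and subject as above, the following code sends a series of events and ends with an error:

subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
// Because an error is sent, no onNext() value is printed
subject.onError(new RuntimeException("mock exception"));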
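
To make the two cases concrete, here is a plain-Java model of the behavior described above: values are only remembered, the last one is released on completion, and an error discards it. AsyncSubjectSketch and its nested Listener interface are hypothetical names; the model is single-threaded and is not RxJava's AsyncSubject.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the behavior; not RxJava's AsyncSubject.
final class AsyncSubjectSketch {
    interface Listener {
        void onNext(String value);
        void onCompleted();
        void onError(Throwable e);
    }

    private final List<Listener> listeners = new ArrayList<Listener>();
    private String last;        // most recent value, held back until completion
    private boolean hasValue;
    private boolean completed;
    private Throwable error;    // non-null once the subject has failed

    void subscribe(Listener listener) {
        if (error != null) {
            listener.onError(error);   // late subscriber after a failure
        } else if (completed) {
            deliver(listener);         // late subscriber after completion
        } else {
            listeners.add(listener);
        }
    }

    void onNext(String value) {
        if (completed || error != null) return;
        last = value;                  // remember it, emit nothing yet
        hasValue = true;
    }

    void onCompleted() {
        if (completed || error != null) return;
        completed = true;
        for (Listener listener : listeners) deliver(listener);
        listeners.clear();
    }

    void onError(Throwable e) {
        if (completed || error != null) return;
        error = e;                     // the remembered value is never emitted
        for (Listener listener : listeners) listener.onError(e);
        listeners.clear();
    }

    private void deliver(Listener listener) {
        if (hasValue) listener.onNext(last);
        listener.onCompleted();
    }

    // Sends two values, then either fails or completes; returns what the listener saw.
    static List<String> run(boolean fail) {
        final List<String> log = new ArrayList<String>();
        AsyncSubjectSketch subject = new AsyncSubjectSketch();
        subject.subscribe(new Listener() {
            @Override public void onNext(String value) { log.add("s:" + value); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable e) { log.add("onError"); }
        });
        subject.onNext("Hello Android ");
        subject.onNext("Hello Java ");
        if (fail) {
            subject.onError(new RuntimeException("mock exception"));
        } else {
            subject.onCompleted();
        }
        return log;
    }
}
```

In this model, run(false) records the last value followed by the completion, while run(true) records only the error.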

BehaviorSubject

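When an observer subscribes to a BehaviorSubject, it receives the last event sent before the subscription plus every event sent afterwards. If no event was sent before the subscription, it receives the default object passed to create(T) followed by every event sent afterwards. The code is as follows:

BehaviorSubject<String> subject = BehaviorSubject.create("NORMAL");

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: " + s);
    }
};

//subject.onNext("Hello Android !");
//subject.onNext("Hello Java !");
//subject.onNext("Hello C !");
// Subscription starts here. With the three lines above left commented out, the subscriber receives the default value NORMAL followed by the events sent after subscribing.
// With the three lines uncommented, it receives "Hello C !" followed by the events sent after subscribing.
subject.subscribe(subscriber);

subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");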
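
The two cases in the comments above can be checked with a small plain-Java model that keeps one "latest" value, seeded with the default, and hands it to each new subscriber. BehaviorSubjectSketch is a hypothetical name; the sketch handles only onNext, is single-threaded, and is not RxJava's BehaviorSubject.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the behavior; not RxJava's BehaviorSubject.
final class BehaviorSubjectSketch {
    interface Listener {
        void onNext(String value);
    }

    private final List<Listener> listeners = new ArrayList<Listener>();
    private String latest; // starts as the default value, then tracks the last event

    BehaviorSubjectSketch(String defaultValue) {
        this.latest = defaultValue;
    }

    void subscribe(Listener listener) {
        listener.onNext(latest);   // replay the most recent (or default) value first
        listeners.add(listener);
    }

    void onNext(String value) {
        latest = value;
        for (Listener listener : listeners) listener.onNext(value);
    }

    // Optionally sends three events before subscribing; returns what the listener saw.
    static List<String> run(boolean emitBeforeSubscribe) {
        final List<String> log = new ArrayList<String>();
        BehaviorSubjectSketch subject = new BehaviorSubjectSketch("NORMAL");
        if (emitBeforeSubscribe) {
            subject.onNext("Hello Android !");
            subject.onNext("Hello Java !");
            subject.onNext("Hello C !");
        }
        subject.subscribe(new Listener() {
            @Override public void onNext(String value) { log.add("onNext: " + value); }
        });
        subject.onNext("Hello CPP !");
        subject.onNext("Hello IOS !");
        return log;
    }
}
```

With run(false) the first recorded event is the default NORMAL; with run(true) it is "Hello C !", the last event sent before subscribing.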

PublishSubject

PublishSubject emits to an observer only the items from the source Observable that arrive after the moment of subscription.

Note that a PublishSubject may start emitting items as soon as it is created, so there is a risk that one or more items are lost between the time the Subject is created and the time an observer subscribes to it.

The code is as follows:

PublishSubject<String> subject = PublishSubject.create();

Action1<String> onNextAction1 = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction1 call: "+s);
    }
};

Action1<String> onNextAction2 = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction2 call: "+s);
    }
};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

The output is as follows:

onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
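
Why "Hello Android !" is lost becomes clear from a plain-Java model in which the subject keeps no history at all: each event goes only to the listeners registered at that moment. PublishSubjectSketch is a hypothetical name; the sketch handles only onNext, is single-threaded, and is not RxJava's PublishSubject.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the behavior; not RxJava's PublishSubject.
final class PublishSubjectSketch {
    interface Listener {
        void onNext(String value);
    }

    private final List<Listener> listeners = new ArrayList<Listener>();

    void subscribe(Listener listener) {
        listeners.add(listener);   // no replay: earlier events are gone
    }

    void onNext(String value) {
        // Delivered only to whoever is subscribed right now
        for (Listener listener : listeners) listener.onNext(value);
    }

    // Builds a listener that records what it receives under the given name.
    private static Listener logging(final String name, final List<String> log) {
        return new Listener() {
            @Override public void onNext(String value) { log.add(name + " call: " + value); }
        };
    }

    // Repeats the subscribe/onNext sequence from the example above.
    static List<String> run() {
        List<String> log = new ArrayList<String>();
        PublishSubjectSketch subject = new PublishSubjectSketch();
        subject.onNext("Hello Android !");                     // nobody is listening yet
        subject.subscribe(logging("onNextAction1", log));
        subject.onNext("Hello Java !");
        subject.subscribe(logging("onNextAction2", log));
        subject.onNext("Hello IOS !");
        return log;
    }
}
```

The recorded lines match the output listed above: the first event never reaches anyone, and the second listener sees only the final event.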

ReplaySubject

ReplaySubject emits every item from the source Observable to each observer, regardless of when the observer subscribes.

The code is as follows:

ReplaySubject<String> subject = ReplaySubject.create();

Action1<String> onNextAction1 = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction1 call: "+s);
    }
};

Action1<String> onNextAction2 = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction2 call: "+s);
    }
};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

The output is as follows:

onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
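
The ordering of that output follows from one idea: the subject stores every event and replays the stored history to each new subscriber before adding it to the live listeners. The plain-Java sketch below models this with an unbounded buffer. ReplaySubjectSketch is a hypothetical name; the sketch handles only onNext, is single-threaded, and is not RxJava's ReplaySubject.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the behavior; not RxJava's ReplaySubject.
final class ReplaySubjectSketch {
    interface Listener {
        void onNext(String value);
    }

    private final List<String> history = new ArrayList<String>();   // every event so far
    private final List<Listener> listeners = new ArrayList<Listener>();

    void subscribe(Listener listener) {
        for (String value : history) listener.onNext(value);  // catch up first
        listeners.add(listener);
    }

    void onNext(String value) {
        history.add(value);
        for (Listener listener : listeners) listener.onNext(value);
    }

    // Builds a listener that records what it receives under the given name.
    private static Listener logging(final String name, final List<String> log) {
        return new Listener() {
            @Override public void onNext(String value) { log.add(name + " call: " + value); }
        };
    }

    // Repeats the subscribe/onNext sequence from the example above.
    static List<String> run() {
        List<String> log = new ArrayList<String>();
        ReplaySubjectSketch subject = new ReplaySubjectSketch();
        subject.onNext("Hello Android !");
        subject.subscribe(logging("onNextAction1", log));
        subject.onNext("Hello Java !");
        subject.subscribe(logging("onNextAction2", log));
        subject.onNext("Hello IOS !");
        return log;
    }
}
```

The unbounded history is the design cost of this behavior: the buffer grows with every event, which is worth keeping in mind for long-lived subjects.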

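Subject summary

  • AsyncSubject: no matter when the observer subscribes, it receives only the last onNext() event, and only after completion. If the sequence ends with an error, no onNext() is delivered.

  • BehaviorSubject: delivers events starting from the last onNext() before the subscription and continuing to the end. If onNext() was not called before the subscription, it delivers the default object passed to create(T). If the observer subscribes after an error, no onNext() is delivered.

  • PublishSubject: delivers only the events sent after the subscription.

  • ReplaySubject: delivers every event that was sent, no matter when the observer subscribes.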
