• 技术文章 >Java >java教程

    深入浅出RxJava_06[传输过程的条件&组合操作]的详情

    黄舟黄舟2017-03-04 09:58:12原创733


    本教程基于RxJava1.x版本进行全面讲解,后续课程将陆续更新,敬请关注…

    下面的一系列函数都是用来判断传输的数据是否符合某些条件。

    1. all - 判定是否Observable发射的所有数据都满足某个条件,如果原始Observable的任何一项数据不满足条件就返回False

    2. Amb - 给定2个Observable,只发送那个首先发送数据的Observable,后者会被忽略掉。

    3. Contains - 判定一个Observable是否发射一个特定的值

    4. DefaultIfEmpty - 发射来自原始Observable的值,如果原始Observable没有发射任何值,就发射一个默认值

    5. SequenceEqual - 判定两个Observables是否发射相同的数据序列

    6. SkipUntil - SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable.

    7. SkipWhile - SkipWhile订阅原始的Observable,但是忽略它的发射物,直到你指定的某个条件变为false的那一刻,它开始发射原始Observable。

    8. TakeUntil - 使用一个谓词函数而不是第二个Observable来判定是否需要终止发射数据,它的行为类似于takeWhile。

    9. TakeWhile - TakeWhile发射原始Observable,直到你指定的某个条件不成立的那一刻,它停止发射原始Observable,并终止自己的Observable。

    1.all

    判定是否Observable发射的所有数据都满足某个条件,如果原始Observable的任何一项数据不满足条件就返回False

    如有群学生信息,数据如下:

    private ArrayList<Student> initStudents() {
        ArrayList<Student> persons = new ArrayList<>();
        persons.add(new Student("张三", 16));
        persons.add(new Student("李四", 21));
        persons.add(new Student("王二麻子", 18));
        return persons;
    }

    入学的时候要求岁数不大于20岁才能读书,这样,就可以使用all函数来判断。

    Observable<Student> observable = Observable.from(initStudents());
    Observable<Boolean> booleanObservable = 
                    observable.all(new Func1<Student, Boolean>() {
                        //设置判断规则
                        @Override
                        public Boolean call(Student student) {
                            return student.age<20;
                        }
                    });
    booleanObservable.subscribe(new Action1<Boolean>() {
        @Override
        public void call(Boolean aBoolean) {
            //这里打印了false  因为学生里面李四21岁
            Log.i(TAG, "call: "+aBoolean);
        }
    });

    2.Amb

    给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据,Amb将忽略和丢弃其它所有Observables的发射物。

    Observable<String> o1 = Observable.just("a", "b", "c");
    Observable<String> o2 = Observable.just("d", "f", "e");
    
    //判断上面的2个被观察者对象哪个先发送数据 发送数据慢的被观察者将被忽略
    Observable<String> observable = Observable.amb(o2,o1);
    
    observable.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: "+s);
        }
    });

    3.Contains

    给Contains传一个指定的值,如果原始Observable发射了那个值,它返回的Observable将发射true,否则发射false。

    Observable<String> o1 = Observable.just("a", "b", "c");
    Observable<Boolean> observable = o1.contains("a");
    
    observable.subscribe(new Action1<Boolean>() {
        @Override
        public void call(Boolean aBoolean) {
            //true
            Log.i(TAG, "call: "+aBoolean);
        }
    }

    4.DefaultIfEmpty

    DefaultIfEmpty简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以onCompletedd的形式),DefaultIfEmpty返回的Observable就发射一个你提供的默认值。

    Observable<String> o1 = Observable
            .create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    //这里没有发送任何onNext事件,而是调用onCompleted()事件
                    subscriber.onCompleted();
                }
            });
        //如果上面的o1不发送任何onNext而结束 则发送一个默认的值d
        Observable<String> o2 = o1.defaultIfEmpty("d");
        o2.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                //打印d
                Log.i(TAG, "call: "+s);
            }
        });

    比如在被观察者访问网络请求的时候,后台服务器没有返回返回数据,可以使用一个默认的数据代替。

    5.SequenceEqual

    判定两个Observables是否发射相同的数据序列,它会比较两个Observable的发射物,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射false。

    Observable<Integer> o1 = Observable.just(1,2,3);
    Observable<Integer> o2 = Observable.just(1,2,3);
    Observable<Boolean> observable = Observable.sequenceEqual(o1, o2);
    
    observable.subscribe(new Action1<Boolean>() {
        @Override
        public void call(Boolean aBoolean) {
            Log.i(TAG, "call: "+aBoolean);
        }
    });

    6.1 SkipUntil

    SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。

    Observable<Integer> o1 = Observable.just(1,2,3);
    Observable<Integer> o2 = o1.skipUntil(Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            //如果不发送如下信息  则上面的1,2,3都不会发送
            //如果发送了4 则直接发送1,2,3 但是4 5并没有发送
            subscriber.onNext(4);
            subscriber.onNext(5);
        }
    }));
    
    o2.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            Log.i(TAG, "call: "+value);
        }
    });

    输出:

    call: 1
    call: 2
    call: 3

    6.2 SkipUntil

    SkipWhile订阅原始的Observable,但是忽略它的发射物,直到你指定的某个条件变为false的那一刻,它开始发射原始Observable。

    Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
    Observable<Integer> o2 = o1.skipWhile(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) {
            //直到发送的数据"等于"4 才开始从4发送数据 这里打印4,5,6
            return value!=4;
        }
    });
    
    o2.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            Log.i(TAG, "call: "+value);
        }
    });

    输出:

    call: 4
    call: 5
    call: 6

    7.1 TakeUntil

    使用一个takeUntil内部的函数来决定是否终止后面的数据继续发送。代码如下:

    Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
    Observable<Integer> o2 = o1.takeUntil(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) {
            //发送数据直到等于4才停止发送
            return value==4;
        }
    });
    
    o2.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            Log.i(TAG, "call: "+value);
        }
    });

    上面的代码类似于:

    ArrayList<Integer> datas=new ArrayList<>();
    for (int i = 1; i <= 6 ; i++) {
        datas.add(i);
    }
    int index=0;
    while(datas.get(index)<=4){
        Log.i(TAG, "onCreate: "+datas.get(index));
        index++;
    }

    7.2 TakeWhile

    TakeWhile发射原始Observable,直到你指定的某个条件不成立的那一刻,它停止发射原始Observable,并终止自己的Observable。

    Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
    Observable<Integer> o2 = o1.takeWhile(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            //发送数据 直到发送的数据等于才停止发送
            return integer!=5;
        }
    });
    
    o2.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            Log.i(TAG, "call: "+value);
        }
    });

    上面的代码打印出了1~4,类似于如下的while语句:

    ArrayList<Integer> datas=new ArrayList<>();
    for (int i = 1; i <= 6 ; i++) {
        datas.add(i);
    }
    int index=0;
    while(datas.get(index)!=5){
        Log.i(TAG, "onCreate: "+datas.get(index));
        index++;
    }

    下面展示的函数可用于组合多个Observables。

    1. Merge - 交错合并多个Observables的发射物

    2. StartWith - 在数据序列的开头插入指定的项

    3. Zip - 根据函数提供的规则合并两个observable的发射数据,发射数据的总数以发送最小的observable发射个数为准。

    8.Merge

    合并多个Observables的发射物。使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。

    Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。

    示例代码

    Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
    Observable<Integer> evens = Observable.just(2, 4, 6);
    
    Observable.merge(odds, evens)
              .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });

    输出

    Next: 1
    Next: 3
    Next: 5
    Next: 2
    Next: 4
    Next: 6
    Sequence complete.

    9.StartWith

    在数据序列的开头插入指定的项

    如果你想要一个Observable在发射数据之前先发射一个指定的数据序列,可以使用StartWith操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用Concat操作符。)

    Observable<String> o1 = Observable.just("模拟网络请求");
    Observable<String> o2 = o1.startWith("网络请求之前的准备工作");
    o2.subscribe(new Action1<String>() {
        @Override
        public void call(String value) {
            Log.i(TAG, value);
        }
    });

    输出:

    网络请求之前的准备工作
    模拟网络请求

    类似的函数有:

    10.Zip

    假设被观察者o1发送的数据为1 2 ,被观察者o2发送的数据为3,4,5。那么zip压缩发送的数据个数以最低个也就是2个为准。并且 新发送的每个数据以Func2的返回数据为准。

    示例代码如下:

    Observable<Integer> o1 = Observable.just(1,2);
    Observable<Integer> o2 = Observable.just(3,4,5);
    
    Observable<Integer> zipObservable = Observable
            .zip(o1, o2, new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer o, Integer o2) {
                    return o + o2;
                }
            });
    
    zipObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer s) {
            Log.i(TAG, "call: "+s);
        }
    });

    输出如下:

    call: 4 // 1+3
    call: 6   // 2+4


    以上就是深入浅出RxJava_06[传输过程的条件&组合操作]的详情的内容,更多相关内容请关注PHP中文网(m.sbmmt.com)!

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    上一篇:深入浅出RxJava_05[转换操作&数学运算]的代码详细介绍 下一篇:深入浅出RxJava_07[多线程&辅助操作(完)]
    PHP编程就业班

    相关文章推荐

    • 完全掌握java异常处理机制原理和应用• 一起聊聊JAVA中字符串和数组做参数传递• 一起聊聊Java常用数据类型的输入输出• 详细解析Java反射机制原理和几种Class获取方式• 归纳整理Java并发知识点

    全部评论我要评论

  • 取消发布评论发送
  • 1/1

    PHP中文网