Maison > Java > javaDidacticiel > Cas d'utilisation de base de RxJava partageant l'analyse du code source [avec code]

Cas d'utilisation de base de RxJava partageant l'analyse du code source [avec code]

php是最好的语言
Libérer: 2018-07-24 15:02:23
original
1537 Les gens l'ont consulté

Cet article détaille l'utilisation de base de RxJava. RxJava est un framework magique avec une utilisation très simple, mais l'implémentation interne est un peu compliquée et la logique du code est un peu alambiquée. Les articles en ligne sur l'analyse du code source de RxJava et les publications sur le code source sont peu nombreux et incomplets. L'analyse complète du code source est répertoriée ci-dessous à titre de référence.

1. Utilisation de base de RxJava

  Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {
                
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {

            }
        });
Copier après la connexion

2. Regardez d'abord la méthode .subscribe(new Observer())

 public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }
Copier après la connexion

Voici. juste le passage Les paramètres de l'observateur entrant sont simplement encapsulés (ObserverableSubscriber)

Continuez à regarder la méthode d'abonnement

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
Copier après la connexion
 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that&#39;s not the appropriate approach
             * so I won&#39;t mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can&#39;t listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren&#39;t we throwing the hook&#39;s return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }
Copier après la connexion

Il vous suffit de faire attention ici

  RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
Copier après la connexion
    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }
Copier après la connexion
RxJavaHooks.onObservableStart(observable, observable.onSubscribe) 
这个方法返回的是它的第二个参数,也就是Observable它自己的onSubscribe 对象,
所以在subscribe 方法里面调用了  onSubscribe.call(subscriber)方法
Copier après la connexion

L'abonné ici est le paramètre passé dans

protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
Copier après la connexion
public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
Copier après la connexion

On peut voir que l'objet onSubscribe est le paramètre passé par create, puis tout le processus est très clair

Seulement L'ensemble du processus ne sera exécuté qu'après l'appel de la méthode d'abonnement : Subscribe ===> Appelez la méthode onSubscribe.call(observer) et transmettez également l'observateur dans

Recommandations associées :

Tutoriel RxJava Operator (8) Aggregate_PHP

Vidéo : Tutoriel vidéo sur l'amélioration de base de JavaScript

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal