手写RxJava浅易框架领悟RxJava的美秒

源代码 2024-9-4 16:30:00 75 0 来自 中国
RxJava条记
前言

看此篇之前最好知道RxJava的利用。由于RxJava内部源码实现有点复杂,既然用拆轮子的方式来分析源码比力难啃,不如换种方式,以造轮子的方式,将源码中与性能、兼容性、扩展性有关的代码剔除,只留下焦点代码,加上我个人的明白,带各人揭秘RxJava的实现原理(本文不涉及框架的利用先容)。
一、构建观察者类

Subsribler在RxJava内里是一个抽象类,它实现了Observer接口。
public interface Observer<T> {    void onCompleted();    void onError(Throwable throwable);    void onNext(T value);}public abstract class Subscriber<T> implements Observer<T>{    public void onStart(){    }}二、构建被观察者

Observable(被观察者)拥有很多工厂方法和各式各样的利用符。每个Observable内里都维护了一个OnSubscribe对象,并通过subscribe()内里的call(Subscriber<? super T> subscriber)方法与观察者产生接洽。
public class Observable<T> {    final OnSubscribe<T> onSubscribe;    private  Observable(OnSubscribe<T> onSubscribe){        this.onSubscribe = onSubscribe;    }    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe){        return new Observable<T>(onSubscribe);    }    public void subscribe(Subscriber<T> subscriber){        subscriber.onStart();        onSubscribe.call(subscriber);    }    public interface OnSubscribe<T>{        void call(Subscriber<? super T> subscriber);    }}三、RxJava的变乱流雏形产生

通过上面写的观察者和被观察者,即可写出一个没有利用符和线程切换功能的浅易版Rxjava。
Observable.create(new Observable.OnSubscribe<Integer>() {            @Override            public void call(Subscriber<? super Integer> subscriber) {                for(int i = 0; i < 10; i++){                    subscriber.onNext(i);                }            }        }).subscribe(new Subscriber<Integer>() {            @Override            public void onCompleted() {            }            @Override            public void onError(Throwable throwable) {            }            @Override            public void onNext(Integer value) {                System.out.println("Result: "+value);            }        });通过Observable.create将OnSubscribe的匿名类传给Observable,在subscribe()时回调OnSubscribe接口中的call方法,同时call方法参数即为subscribe的参数,即观察者,因此继续回调subscriber.onNext()即可完成观察者里的逻辑。
效果如下:
1.png 四、玩转RxJava里的利用符

RxJava之以是强盛好用,与其拥有丰富灵活的利用符是分不开的。那么我们就试着为这个框架添加一个最常用的利用符:map。先看代码:
    public <R> Observable<R> map(final Fun1<T, R> transformer){        return create(new OnSubscribe<R>() {            @Override            public void call(final Subscriber<? super R> subscriber) {                Observable.this.onSubscribe.call(new Subscriber<T>() {                    @Override                    public void onCompleted() {                        subscriber.onCompleted();                    }                    @Override                    public void onError(Throwable throwable) {                        subscriber.onError(throwable);                    }                    @Override                    public void onNext(T value) {                        subscriber.onNext(transformer.transfer(value));                    }                });            }        });    }    public interface Fun1<T, R>{        R transfer(T from);    }测试代码
Observable.create(new Observable.OnSubscribe<Integer>() {            @Override            public void call(Subscriber<? super Integer> subscriber) {                for(int i = 0; i < 10; i++){                    subscriber.onNext(i);                }            }        }).map(new Observable.Fun1<Integer, String>() {             @Override             public String transfer(Integer from) {                 return String.valueOf(from)+"_Map";             }         }        ).subscribe(new Subscriber<String>() {            @Override            public void onCompleted() {            }            @Override            public void onError(Throwable throwable) {            }            @Override            public void onNext(String value) {                System.out.println("Result: "+value);            }        });效果如下:



  • 实在RxJava每调用一次利用符的方法,就相当于在上层数据源和下层观察者之间桥接了一个新的Observable。桥接的Observable内部会实例化新的OnSuscribe和Subscriber。
  • 新建的OnSuscribe的call方法负责持有目的Subscriber,此时就可以回调subscriber的方法来完成观察的活动了。但是这是还没有数据源,想要得到数据源必须调用源Observable.OnSubscribe的subscribe方法,传入一个新的Subscriber,如许就可以在它的onNext()方法中得到数据源,并颠末传入的接口处置惩罚后,发送给终极的Subscriber。
总体来说就是源Observable.OnSubscribe将Event往下发送给桥接Observable.Subscriber,终极桥接Observable.Subscriber将Event做相应处置惩罚后转发给目的Subscriber。
五、RxJava里的线程切换

RxJava中最冲动民气的功能是异步处置惩罚,可以或许自若地切换线程。
利用subscribeOn() 联合observeOn() 来实现线程控制,让变乱的产生和斲丧发生在差别的线程。 observeOn() 可以多次调用,Subscriber的实行线程与末了一次observeOn()的调用有关。但subscribeOn() 多次调用只有第一个subscribeOn() 起作用。
这是由于 observeOn() 作用的是Subscriber,而subscribeOn() 作用的是OnSubscribe,这时变乱还没开始发送,因此subscribeOn()的线程控制可以从变乱发出的开端就造成影响。
线程调治除了桥接Observable以外,RxJava还用到一个很关键的类Scheduler(调治器)。
5.1 Scheduler焦点代码如下:

public class Scheduler {    private final static Scheduler ioScheduler            = new Scheduler(Executors.newSingleThreadExecutor());    Executor executor;    public Scheduler(Executor executor){        this.executor = executor;    }    public Worker createWorker(){        return new Worker(executor);    }    public static class Worker {        Executor executor1;        public Worker(Executor executor1){            this.executor1 = executor1;        }        public void schedule(Runnable runnable){            executor1.execute(runnable);        }    }    public static Scheduler io(){        return ioScheduler;    }}具体的Scheduler的实现类就不看了,但我们须要知道,能做到线程切换的关键是Worker的schedule方法,由于它会把传过来的任务放入线程池,并在新线程中实行。
5.2  实现observeOn

observeOn是作用于下层Subscriber的,须要让下层Subscriber的变乱处置惩罚方法放到新线程中实行。为此,在Observable类内里,添加如下代码:
public Observable<T> observeOn(final Scheduler scheduler){        return create(new OnSubscribe<T>() {            @Override            public void call(final Subscriber<? super T> subscriber) {                subscriber.onStart();                final Scheduler.Worker worker = scheduler.createWorker();                Observable.this.onSubscribe.call(new Subscriber<T>() {                    @Override                    public void onCompleted() {                    }                    @Override                    public void onError(Throwable throwable) {                    }                    @Override                    public void onNext(final T value) {                        worker.schedule(new Runnable() {                            @Override                            public void run() {                                subscriber.onNext(value);                            }                        });                    }                });            }        });    }测试代码如下:
Observable.create(new Observable.OnSubscribe<Integer>() {            @Override            public void call(Subscriber<? super Integer> subscriber) {                for(int i = 0; i < 10; i++){                    subscriber.onNext(i);                }            }        }).map(new Observable.Fun1<Integer, String>() {                   @Override                   public String transfer(Integer from) {                       return String.valueOf(from)+"_Map";                   }               }        ).observeOn(Scheduler.io()).subscribe(new Subscriber<String>() {            @Override            public void onCompleted() {            }            @Override            public void onError(Throwable throwable) {            }            @Override            public void onNext(String value) {                System.out.println("Result: "+Thread.currentThread().getName());            }        });效果如下:
3.png 5.3 实现subscribeOn

subscribeOn是作用于上层OnSubscribe的,可以让OnSubscribe的call方法在新线程中实行。
因此,在Observable类内里,添加如下代码:
public Observable<T> subscribeOn(final Scheduler scheduler){        return create(new OnSubscribe<T>() {            @Override            public void call(final Subscriber<? super T> subscriber) {                scheduler.createWorker().schedule(new Runnable() {                    @Override                    public void run() {                        Observable.this.onSubscribe.call(subscriber);                    }                });            }        });    }测试代码如下:
Observable.create(new Observable.OnSubscribe<Integer>() {            @Override            public void call(Subscriber<? super Integer> subscriber) {                System.out.println("Observable thread: "+Thread.currentThread().getName());                for(int i = 0; i < 10; i++){                    subscriber.onNext(i);                }            }        }).map(new Observable.Fun1<Integer, String>() {                   @Override                   public String transfer(Integer from) {                       System.out.println("Map Observable thread: "+Thread.currentThread().getName());                       return String.valueOf(from)+"_Map";                   }               }        ).observeOn(Scheduler.io()).subscribeOn(Scheduler.io()).subscribe(new Subscriber<String>() {            @Override            public void onCompleted() {            }            @Override            public void onError(Throwable throwable) {            }            @Override            public void onNext(String value) {//                System.out.println("Result: "+Thread.currentThread().getName());            }        });效果如下:

六、总结

信任看RxJava这个浅易版的计划对各人的启示,比网上的一些源码剖析清楚的多,希望可以抛砖引玉。有时间我们总是以为看几篇博文貌似其时就懂了明白了,但是这种明白大概说影象貌似不长期。过了一段时间总是还给博主了。学习照旧得深入源码,从源码中学习,然后在联合其他人的博客查漏补缺,如许才是本身的东西。各人有爱好可以把flatMap等其他利用符来本身实现一下。
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2024-11-22 01:07, Processed in 0.157779 second(s), 35 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表