Subsribler在RxJava内里是一个抽象类,它实现了Observer接口。
public interface Observer<T> { void onCompleted(); void onError(Throwable throwable); void onNext(T value);}public abstract class Subscriber<T> implements Observer<T>{ public void onStart(){ }}二、构建被观察者
Observable(被观察者)拥有很多工厂方法和各式各样的利用符。每个Observable内里都维护了一个OnSubscribe对象,并通过subscribe()内里的call(Subscriber<? super T> subscriber)方法与观察者产生接洽。
public class Observable<T> { final OnSubscribe<T> onSubscribe; private Observable(OnSubscribe<T> onSubscribe){ this.onSubscribe = onSubscribe; } public static <T> Observable<T> create(OnSubscribe<T> onSubscribe){ return new Observable<T>(onSubscribe); } public void subscribe(Subscriber<T> subscriber){ subscriber.onStart(); onSubscribe.call(subscriber); } public interface OnSubscribe<T>{ void call(Subscriber<? super T> subscriber); }}三、RxJava的变乱流雏形产生
通过上面写的观察者和被观察者,即可写出一个没有利用符和线程切换功能的浅易版Rxjava。
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for(int i = 0; i < 10; i++){ subscriber.onNext(i); } } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(Integer value) { System.out.println("Result: "+value); } });通过Observable.create将OnSubscribe的匿名类传给Observable,在subscribe()时回调OnSubscribe接口中的call方法,同时call方法参数即为subscribe的参数,即观察者,因此继续回调subscriber.onNext()即可完成观察者里的逻辑。
效果如下:
四、玩转RxJava里的利用符
RxJava之以是强盛好用,与其拥有丰富灵活的利用符是分不开的。那么我们就试着为这个框架添加一个最常用的利用符:map。先看代码:
public <R> Observable<R> map(final Fun1<T, R> transformer){ return create(new OnSubscribe<R>() { @Override public void call(final Subscriber<? super R> subscriber) { Observable.this.onSubscribe.call(new Subscriber<T>() { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable throwable) { subscriber.onError(throwable); } @Override public void onNext(T value) { subscriber.onNext(transformer.transfer(value)); } }); } }); } public interface Fun1<T, R>{ R transfer(T from); }测试代码
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for(int i = 0; i < 10; i++){ subscriber.onNext(i); } } }).map(new Observable.Fun1<Integer, String>() { @Override public String transfer(Integer from) { return String.valueOf(from)+"_Map"; } } ).subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(String value) { System.out.println("Result: "+value); } });效果如下:
public class Scheduler { private final static Scheduler ioScheduler = new Scheduler(Executors.newSingleThreadExecutor()); Executor executor; public Scheduler(Executor executor){ this.executor = executor; } public Worker createWorker(){ return new Worker(executor); } public static class Worker { Executor executor1; public Worker(Executor executor1){ this.executor1 = executor1; } public void schedule(Runnable runnable){ executor1.execute(runnable); } } public static Scheduler io(){ return ioScheduler; }}具体的Scheduler的实现类就不看了,但我们须要知道,能做到线程切换的关键是Worker的schedule方法,由于它会把传过来的任务放入线程池,并在新线程中实行。
5.2 实现observeOn
observeOn是作用于下层Subscriber的,须要让下层Subscriber的变乱处置惩罚方法放到新线程中实行。为此,在Observable类内里,添加如下代码:
public Observable<T> observeOn(final Scheduler scheduler){ return create(new OnSubscribe<T>() { @Override public void call(final Subscriber<? super T> subscriber) { subscriber.onStart(); final Scheduler.Worker worker = scheduler.createWorker(); Observable.this.onSubscribe.call(new Subscriber<T>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(final T value) { worker.schedule(new Runnable() { @Override public void run() { subscriber.onNext(value); } }); } }); } }); }测试代码如下:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for(int i = 0; i < 10; i++){ subscriber.onNext(i); } } }).map(new Observable.Fun1<Integer, String>() { @Override public String transfer(Integer from) { return String.valueOf(from)+"_Map"; } } ).observeOn(Scheduler.io()).subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(String value) { System.out.println("Result: "+Thread.currentThread().getName()); } });效果如下:
5.3 实现subscribeOn
subscribeOn是作用于上层OnSubscribe的,可以让OnSubscribe的call方法在新线程中实行。
因此,在Observable类内里,添加如下代码:
public Observable<T> subscribeOn(final Scheduler scheduler){ return create(new OnSubscribe<T>() { @Override public void call(final Subscriber<? super T> subscriber) { scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Observable.this.onSubscribe.call(subscriber); } }); } }); }测试代码如下:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { System.out.println("Observable thread: "+Thread.currentThread().getName()); for(int i = 0; i < 10; i++){ subscriber.onNext(i); } } }).map(new Observable.Fun1<Integer, String>() { @Override public String transfer(Integer from) { System.out.println("Map Observable thread: "+Thread.currentThread().getName()); return String.valueOf(from)+"_Map"; } } ).observeOn(Scheduler.io()).subscribeOn(Scheduler.io()).subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(String value) {// System.out.println("Result: "+Thread.currentThread().getName()); } });效果如下: