RxJava简介

RxJava里的Rx是Reactive Extensions的缩写,起源于.NET。它是一种编程思想,巧妙地结合了观察者模式以及函数式编程,使得异步操作的代码写起来简单而清晰。现在有越来越多的语言实现了这种思想,比如RxJava、RxJS、RxSwift等等。这篇文章就基于RxJava,探究一下线程调度的实现。

RxJava中数据源被表示成Observable,它们会产生一个个数据或者说事件。你可以通过Subscriber来订阅或者说监听这些事件。一个简单的示例:

Observable.from(new String[]{"a","b","c"})
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(String s) {

                }
            });

在这个例子中,Observable会读取给定String数组中的每一个元素并调用SubscriberonNext方法,最后调用SubscriberonCompleted。 可以看到Subscriber有三个回调方法:onNext可以订阅Observable产生的每一个事件;如果一切顺利,当所有事件都产生完后,onCompleted会被调用;如果中途发生错误,则onError会被调用,并且结束此次订阅。onCompletedonError不会同时被调用。

当然,我们可以自定义事件产生的方式和时机。Observable提供了一个静态方法create(OnSubscribe<T>),参数OnSubscribe是一个接口,我们需要实现其中的call。例如把上面的例子改造一下:

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("a");
                subscriber.onNext("b");
                subscriber.onNext("c");
                subscriber.onCompleted();
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

            }
        });

可以看到在OnSubscribecall里调用了三次subscriber.onNext,分别传入”a”、”b”和”c”,最后调用subscriber.onCompleted。这个例子的效果和上面的例子是一样的。

一个使用RxJava的例子

假设现在有一个这样的需求:app要向服务器发送两个请求,服务器的返回的结果要么有效要么无效,而我们只需要取其中一个有效的结果就行。也就是说,如果第一个请求返回的结果是有效的,我们就可以直接使用它而忽略第二个请求的返回结果;如果第一个结果是无效的,那就需要等待第二个请求的结果。

如果不用RxJava,可能这样实现这个需求:

boolean gotOne;
new TestTask1().execute();
new TestTask2().execute();


class TestTask1 extends AsyncTask<Void, Void, String> {

    @Override
    protected String doInBackground(Void... params) {
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "1";
    }

    @Override
    protected void onPostExecute(String s) {
        super.onPostExecute(s);
        if (!gotOne && !TextUtils.isEmpty(s)) {
            Log.i("Test", "get:"+s);
            gotOne = true;
        }
    }
}

class TestTask2 extends AsyncTask<Void, Void, String> {

    @Override
    protected String doInBackground(Void... params) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "2";
    }

    @Override
    protected void onPostExecute(String s) {
        super.onPostExecute(s);
        if (!gotOne && !TextUtils.isEmpty(s)) {
            Log.i("Test", "get:"+s);
            gotOne = true;
        }
    }
}

这里使用两个AsyncTask来发送两个请求。在每个doInBackground方法里用sleep来表示网络请求的延迟。假设服务器会返回一个字符串,字符串非空表示结果有效。为了实现只取其中一个有效结果的需求,需要一个bool变量gotOne来记录是否已经得到了一个有效结果。

再看看用RxJava可以怎么实现:

Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        subscriber.onNext("1");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        subscriber.onNext("2");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io());

Observable.merge(observable1, observable2)
        .filter(new Func1<String, Boolean>() { //过滤掉无效结果
            @Override
            public Boolean call(String s) {
                return s != null;
            }
        })
        .take(1) // 只取第一个有效结果
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.i("Test", "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i("Test", "onError:"+e.getMessage());
            }

            @Override
            public void onNext(String s) {
                Log.i("Test", "onNext"+s);
            }
        });

使用两个Observable来进行两个网络请求。在链式调用中使用了一些操作符来实现需求:首先merge两个Observable表示要发送两个网络请求,然后通过filter过滤掉无效结果,再用take(1)取出第一个有效结果。注意到这里还使用了后面将分析到的两个线程调度的方法:创建Observable的时候使用subscribeOn(Schedulers.io())表示在子线程中进行网络请求;后面调用observeOn(AndroidSchedulers.mainThread())表示在主线程中处理请求结果。

乍一看,两种方法的代码量似乎没有太大差别。当然使用RxJava并不是说一定能减少代码量,但是它可以使代码逻辑变得简洁清晰。而且在需求改变的时候,我们可以快速进行修改。比如说现在需求变成从n个请求里取出前k个有效的结果。使用AsyncTask的方法需要将bool变量改成一个计数器;而用RxJava的话只需把take的参数改成k。如果难度再次升级,现在需要所有请求都在同一个线程中进行。使用AsyncTask的方法就要把所有发送请求的代码放到同一个doInBackground里,而且每得到一个结果我们还要想办法去通知主线程,可想而知改动的代码很多;用RxJava只需把创建Observable的时候调用的subscribeOn(Schedulers.io())移到下面的链式调用里即可,一切就是这么简单!

线程调度

RxJava中有两个方法跟线程调度有关,分别是subscribeOnobserveOn. 在分析这两个方法之前,先来看一下最简单的一个事件产生-订阅的流程是怎样的。代码如下:

Observable.create(new Observable.OnSubscribe<Object>(){/*...*/})
          .subscribe(new Subscriber<Object>(){/*...*/});

Observable.create非常简单,直接返回一个Observable对象:

public static <T> Observable<T> create(OnSubscribe<T> f) {
     return new Observable<T>(hook.onCreate(f));
}

这里的hook定义为:

static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

RxJavaObservableExecutionHook的JavaDoc解释是

Abstract ExecutionHook with invocations at different lifecycle points of Observable execution with a default no-op implementation.

也就是说在Observable执行期间的某些时间点会调用这个ExecutionHook的回调函数,你可以在这些回调函数里做一些事情。默认的实现是什么都不做。因此hook.onCreate(f)直接返回f.

再看一下Observable的构造函数:

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

也很简单,将成员变量onSubscribe指向给定的OnSubscribe实现。

接着看一下subscribe函数做了什么事,这里将相关代码进行了简化,把最关键的几处提取了出来:

subscriber.onStart();
        
if (!(subscriber instanceof SafeSubscriber)) {
    subscriber = new SafeSubscriber<T>(subscriber);
}

try {
    // allow the hook to intercept and/or decorate
    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
    return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {/*...*/}

首先调用SubscriberonStart方法,你可以在这里做一些准备操作。然后将给定的Subscriber封装成SafeSubscriber以保证在整个订阅过程都能遵守Rx的规范。下面的hook.onSubscribeStart在默认情况下仍然是什么都不做而直接返回observable.onSubscribe,因此最后observable.onSubscribecall被调用。前面已经看到了在call中可以决定什么时候产生事件并通知Subscriber

subscribeOn

subscribeOn可以决定事件在哪个线程产生。看一下函数的实现:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

函数接收一个参数Scheduler. 这个Scheduler就是实现线程调度的关键类。它里面有一个抽象方法:

public abstract Worker createWorker();

这个方法返回一个Worker类对象。Worker的解释是

Sequential Scheduler for executing actions on a single thread or event loop.

也就是说Scheduler将线程调度交给了具体的Worker。在Worker中有两个关键方法:

public abstract Subscription schedule(Action0 action);
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);

在这两个方法的具体实现中就可以决定在哪个线程去执行action。RxJava提供了一系列接口ActionN(N取值从0到9)。这些接口里都只有一个返回值为void的方法call,而call所接受的参数个数就是由N确定。比如这里的Action0就表示没有参数。与ActionN相对应有一个系列接口叫FuncN<R>,它也只有一个函数call,所不同的是call会返回类型为R的返回值。

RxJava提供了一个工厂类Schedulers,它里面有一些静态方法返回可以满足不同需求的Scheduler。在上面的例子中就用到了Schedulers.io()。这个方法返回的Scheduler维护了一个线程池。Woker提交的Action,有可能是在一个新的线程中执行,也有可能在线程池中某一个空闲线程中执行。

回到subscribeOn的函数体。通常的Observable都不会是ScalarSynchronousObservable,所以跳过这个if语句直接看接下来的返回语句。调用create函数产生一个新的Observable。之前已经看到create接受的参数是OnSubscribe接口的实现,在这里就是OperatorSubscribeOn。看一下它的源码:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                
                source.unsafeSubscribe(s);
            }
        });
    }
}

在构造函数里会记下传进来的Scheduler以及原来的Observable。然后是call方法。所做的事情很简单,通过给定的SchedulerWorker提交一个Action0。在Action0里,把传入的Subscriber封装到一个新的Subscriber里,然后调用原ObservableunsafeSubscribe方法去产生事件并通知Subscriber。从代码中可以看到这个封装的Subscriber做的事情也很简单,就是调用原Subscriber相应的方法。有一个较为复杂的方法是setProducer,这个函数的作用涉及到一个概念叫Backpressure,不在这篇文章里详述。通过代码可以看到重写这个方法也是为了让操作能在指定的线程执行。

subscribeOn所做的事情可以用下面一张图进行概括:

subscribeOn

observeOn

observeOn可以决定在哪个线程消费事件。Observable重载了多个observeOn方法,最终都是调用下面这个:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

直接看return语句,这里调用了lift方法返回一个新的Observablelift在RxJava中是一个比较关键的方法,很多操作符都是通过它实现。所以先看一下lift的实现。

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

lift接受一个参数OperatorOperator的定义为:

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {}

它是一个接口继承自Func1<Subscriber<? super R>, Subscriber<? super T>>,也就是说它里面有一个call方法,参数是一个Subscriber,最后会返回一个Subscriber

回到lift,方法体非常简单,直接返回一个新的Observable。新ObservableonSubscribe是一个OnSubscribeLift对象,并且原ObservableonSubscribe以及lift的参数operator成为了OnSubscribeLift的构造函数的参数。接下来就看一下OnSubscribeLift的实现。

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                revents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            o.onError(e);
        }
    }
}

call方法里,首先调用operator.call将传入的Subscriber对象o转换成一个新的Subscriber对象st。然后调用parent.call(st),这条语句的效果相当于让新的Subscriber订阅原Observable

lift的主要功能可以总结如下:

  1. 生成一个新Observable,记下原ObservableonSubscribe
  2. 当一个Subscriber订阅这个新Observable时,先通过给定的OperatorSubscriber转换成一个新的Subscriber
  3. 用新的Subscriber去订阅原Observable

现在回到observeOn,看一下它的return语句

return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));

这里的OperatorObserveOn就是Operator的具体实现,它的call方法如下:

public Subscriber<? super T> call(Subscriber<? super T> child) {
    if (scheduler instanceof ImmediateScheduler) {
        // avoid overhead, execute directly
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        // avoid overhead, execute directly
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
        parent.init();
        return parent;
    }
}

如果提供的Scheduler既不是ImmediateScheduler也不是TrampolineScheduler的话,则返回ObserveOnSubscriber对象,否则直接返回原Subscriber

再来看一下ObserveOnSubscriber的实现,这里只摘取关键代码。

private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
    final Subscriber<? super T> child;
    final Scheduler.Worker recursiveScheduler;
    ......

    public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
        this.child = child;
        this.recursiveScheduler = scheduler.createWorker();
        ......
    }

    @Override
    public void onNext(final T t) {
        if (isUnsubscribed() || finished) {
            return;
        }
        if (!queue.offer(on.next(t))) {
            onError(new MissingBackpressureException());
            return;
        }
        schedule();
    }

    @Override
    public void onCompleted() {
        if (isUnsubscribed() || finished) {
            return;
        }
        finished = true;
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        if (isUnsubscribed() || finished) {
            RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
            return;
        }
        error = e;
        finished = true;
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            recursiveScheduler.schedule(this);
        }
    }

    ......
}

首先看一下构造函数。成员变量child指向原SubscriberrecursiveScheduler指向给定SchedulerWorker。 再看一下onNextonError以及onComplete这三个函数,注意到最后它们都调用了schedule方法。而schedule也很简单, 通过recursiveScheduler提交一个Action0。由于OperatorObserveOn实现了Action0,所以这里就是OperatorObserveOn对象本身。 在它的call方法里会调用原SubscriberonNextonError或者onComplete,具体代码在这就不作详述。 这个时候原Subscriber相应的那些方法已经是在Worker指定的线程里执行了,因此实现了线程调度的效果。

总结

这篇文章通过分析subscribeOnobserveOn两个主要函数的源码讲解了RxJava的线程调度。其实RxJava还是很博大精深的,可讲解的内容有很多,例如各种操作符的使用、backpressure策略、并发处理等等。我也只是刚刚入门,还需要花时间继续研究。希望在后续的文章中可以带来更多RxJava的分析。