【Android】异步任务调度库 AsyncTask 源码解析

AsyncTask 是 Android SDK 中提供的一个用于执行异步任务的框架,在 Android 兴起的早期被广泛使用,但如今已经被 RxJava、协程等新兴框架所取代。虽然它存在着一些不足,但我们还是可以尝试了解一下它的实现原理以及存在的不足。

功能概述

首先,让我们来简单地了解一下它的设计初衷:

AsyncTask 的设计初衷是能够帮助用户方便地完成异步任务的线程调度,它对用户提供了如下的几个接口:

public abstract class AsyncTask<Params, Progress, Result> {
        @WorkerThread
    protected abstract Result doInBackground(Params... params);

    @MainThread
    protected void onPreExecute() {
    }

    @MainThread
    protected void onPostExecute(Result result) {
    }

    @MainThread
    protected void onProgressUpdate(Progress... values) {
    }

    @MainThread
    protected void onCancelled(Result result) {
        onCancelled();
    }    
}

首先看到它的三个范型参数:Params 代表了传递给它的参数类型,Progress 代表了用于反馈进度的数据类型,而 Result 则代表了异步执行的结果类型。

用户可以通过实现 doInBackground 方法来编写在异步线程所要进行的处理,并且通过 onPostExecute 方法的重写实现对异步请求结果的获取。并且,用户可以通过 onPreExecute 实现了在 doInBackground 之前进行一些预处理,并且可以通过 onProgressUpdate 实现对进度的监听。以及通过 onCancelled 实现对任务中断的监听。

当我们编写一个 AsyncTask 后,只需要调用它的 execute 方法即可,而背后的线程调度的过程都会由它替我们完成,看上去是十分美好的。

同时从上面的代码中可以看到,每个方法都有被类似 @MainThread 的注解标注,这些注解主要的作用是用来标注这个方法运行所处的线程。可以看出,只有 doInBackground 是在异步线程进行执行。

创建

接着,让我们看看它的创建过程,我们来到它的构造函数:

public AsyncTask() {
        this((Looper) null);
}

它的无参构造函数转调到了它的有参构造函数,这个构造函数需要以 Looper 作为一个参数:

public AsyncTask(@Nullable Looper callbackLooper) {
    mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
        ? getMainHandler()
        : new Handler(callbackLooper);
    mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            mTaskInvoked.set(true);
            Result result = null;
            try {
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                result = doInBackground(mParams);
                Binder.flushPendingCommands();
            } catch (Throwable tr) {
                mCancelled.set(true);
                throw tr;
            } finally {
                postResult(result);
            }
            return result;
        }
    };
    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                postResultIfNotInvoked(get());
            } catch (InterruptedException e) {
                android.util.Log.w(LOG_TAG, e);
            } catch (ExecutionException e) {
                throw new RuntimeException("An error occurred while executing doInBackground()",
                        e.getCause());
            } catch (CancellationException e) {
                postResultIfNotInvoked(null);
            }
        }
    };
}

首先它根据传递进来的 Looper 来构建了一个 Handler,若没有指定 Looper 或指定的 Looper 是主线程的 Looper ,则指定内置的 InternalHandler 对消息进行处理,否则构造一个对应的 Handler。可以看出,AsyncTask 不是一定回到主线程的

接着它构建了一个 WorkerRunnableWorkerRunnable 实际上就是对 Callable 的简单包装,区别仅仅在于可以传入参数。当 mWorker 被执行时,它首先将当前 Task 设置为了被执行,然后进行了线程优先级的设置,并调用了 doInBackground 方法并获取了返回值。看来这个 WorkerRunnable 是在异步线程中执行的。不论成功与否,它最后都会调用 postResult 进行结果的交付,并返回结果。

之后它基于前面的 mWorker 构建了一个 FutureTask,当执行完成或被取消时,会调用 postResultIfNotInvoked 方法传入 mWorker 的执行结果。

这里有两个类似的方法:postResultpostResultIfNotInvoked,至于为什么要这样设计我们后面会提到。

总的来说,其构造过程主要是对 HandlerWorkerRunnable 以及 FutureTask 进行了构建。

执行

接着让我们看看当我们调用 execute 之后它做了什么:

@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);
}

可以看到,它转调到了 executeOnExecutor 方法,并且传入了一个 Executor 对象 sDefaultExecutor,我们先不去关注这个 Executor 的设计,让我们先看看 executeOnExecutor 方法:

@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec
        Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }
    mStatus = Status.RUNNING;
    onPreExecute();
    mWorker.mParams = params;
    exec.execute(mFuture);
    return this;
}

首先,它对当前 Task 的状态进行了检查,AsyncTask 共有三种状态:PENDINGRUNNINGFINISHED,分别代表了待执行、正在执行以及已执行。这里只有 PENDING 的 Task 才能被 execute

之后它首先改变了当前 Task 的状态并调用了 onPreExecute 方法进行了用户实现的预处理,之后将用户的参数交给 mWorker 后,将 mFuture 交给了传入的线程池进行处理。

首先,FutureTask 执行后,会使得 mWorker 被执行,它执行后会将 mTaskInvoked 置为 true 并调用 postResult 进行结果的交付:

private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}

这里实际上就是把结果包装成了 AsyncTaskResult 类并放入了消息队列,这样就实现了线程的切换,将消息通过 Handler 由子线程发送给了主线程(指 AsyncTask 被指定的线程),当 Handler 收到消息后,就会对消息进行处理。

我们可以看到在构造时没有指定 Looper的情况下,默认的 InternalHandler 是如何进行处理的:

private static class InternalHandler extends Handler {
    public InternalHandler(Looper looper) {
        super(looper);
    }
    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
    @Override
    public void handleMessage(Message msg) {
        AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
        switch (msg.what) {
            case MESSAGE_POST_RESULT:
                // There is only one result
                result.mTask.finish(result.mData[0]);
                break;
            case MESSAGE_POST_PROGRESS:
                result.mTask.onProgressUpdate(result.mData);
                break;
        }
    }
}

可以看到,在 MESSAGE_POST_RESULT 消息下,它在收到 AsyncTaskResult 后会调用 finish 方法进行整个任务的完成处理:

private void finish(Result result) {
    if (isCancelled()) {
        onCancelled(result);
    } else {
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}

finish 方法中,若任务已经取消,它会调用 onCancelled 方法进行回调,若任务正常完成,则会调用 onPostExecute 方法,并最后设置它的状态为 FINISHED

而当 FutureTask 执行完成后,会调用 postResultIfNotInvoked 方法,并传入 mWorker 的执行结果:

private void postResultIfNotInvoked(Result result) {
    final boolean wasTaskInvoked = mTaskInvoked.get();
    if (!wasTaskInvoked) {
        postResult(result);
    }
}

可以看到,这里实际上是在 Task 没有被 Invoke 的情况下才会调用到,由于 mWorker 在被执行时首先就会将 wasTaskInvoked 置为 true,因此实际上这里是很少能被调用到的。

我们再看看 onProgressUpdated 何时会被调用到,我们可以找到 MESSAGE_POST_PROGRESS 的消息是何时被发出的:

protected final void publishProgress(Progress... values) {
    if (!isCancelled()) {
        getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                new AsyncTaskResult<Progress>(this, values)).sendToTarget();
    }
}

它是在 publishProgress 方法中被调用的,这个方法是提供给用户的,因此进度的更新需要用户自行在 doInBackground 中调用 publishProgress 从而实现。

取消

接着让我们看看任务的取消,我们看到 cancel 方法:

public final boolean cancel(boolean mayInterruptIfRunning) {
    mCancelled.set(true);
    return mFuture.cancel(mayInterruptIfRunning);
}

可以看到,cancel 方法最后会调用到 mFuturecancel,并且它需要传递一个参数 mayInterruptIfRunning,当该参数为 true 时,即使任务在运行也会被打断,而如果这个参数是 false,则 doInBackground 仍然会一直执行到结束。

这个设计非常奇怪,按道理来说取消的场景只有任务正在执行时需要取消,完全可以提供一个这个参数默认为 true 的方法供外部调用。并且这个方法只能尽量让任务尽快结束,如果执行的过程中有一些不可打断的操作,则这个方法调用后仍然不会使得任务停止。

线程池

对于 AsyncTask,我们还有最后一个问题没有研究,那就是它执行任务的sDefaultExecutor

private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

可以看到它的默认值为 SERIAL_EXECUTOR,而 SERIAL_EXECUTOR 则是 SerialExecutor 的实例:

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;
    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }
    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

可以看到,这个 Executor 内部维护了一个 ArrayDeque 队列,这个队列是一个任务缓存队列,里面存放了对真正要执行的 Runnable 进行包装的 Runnable

执行时,当 mActive 为 null,它会调用下面的 scheduleNext 方法,在该方法中会从 mTasks 队首取出任务赋值给 mActive,之后通过 THREAD_POOL_EXECUTOR 这个线程池对该任务进行执行。当任务执行完成后,会继续调用 scheduleNext 对队列中的下一个任务进行执行。通过这样的设计,Runnable 的执行变成了一种按序执行,只有前一个执行结束,才会进行下一个任务的执行,因此 AsyncTask 的执行顺序实际上是一种串行执行

实际上在 Android 1.6 之前,AsyncTask 都是通过一个单独的 background 线程对任务进行串行执行,但在 Android 1.6 之后,设计人员认为串行执行效率太低,因此引入了一个线程池从而支持对它进行并行执行。

而由于这个改动使得很多应用出现了并发问题,因此 Android 3.0 之后又把它改回了串行执行(引入了 SerialExecutor),同时对并行执行进行了支持。但它默认仍然是以串行的方式对任务进行执行,如果需要并行执行可以调用 executeOnExecutor 方法并将 THREAD_POOL_EXECUTOR 传入。

那么 THREAD_POOL_EXECUTOR 又是一个怎样的线程池呢?让我们看看它的声明:

// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128);

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;

可以看到,THREAD_OOL_EXECUTOR 的配置如下:

  • corePoolSize:CPU 核心数 - 1,但保持在 2-4 个。
  • maxPoolSize:2 * CPU 核心数 + 1
  • keepAliveSeconds:30 秒
  • workQueue:容量为 128 的阻塞队列。

设置核心线程数为 CPU 核心数 - 1 的主要原因是除开主线程外这个线程数已经足够了,CPU 最多也就能同时进行 CPU 核心数个线程的真正并行执行,超出了也只是将任务按时间分片执行,带不来太大的效率提高。

不足之处

AsyncTask 看似十分美好,但实际上存在着非常多的不足,这些不足使得它逐渐退出了历史舞台:

  • 生命周期:AsyncTask 没有与 Activity、Fragment 的生命周期绑定,即使 Activity 被销毁,它的 doInBackground 任务仍然会继续执行。
  • 取消任务:AsyncTaskcancel 方法的参数 mayInterruptIfRunning 存在的意义不大,并且它无法保证任务一定能取消,只能尽快让任务取消(比如如果正在进行一些无法打断的操作时,任务就仍然会运行)
  • 内存泄漏:由于它没有与 Activity 等生命周期进行绑定,因此它的生命周期仍然可能比 Activity 长,如果将它作为 Activity 的非 static 内部类,则它会持有 Activity 的引用,导致 Activity 的内存无法释放。(与 Handler 的内存泄漏问题类似)
  • 并行/串行:由于 AsyncTask 的串行和并行执行在多个版本上都进行了修改,所以当多个 AsyncTask 依次执行时,它究竟是串行还是并行执行取决于用户手机的版本。具体修改如下:
    • Android 1.6 之前:各个 AsyncTask 按串行的顺序进行执行。
    • Android 3.0 之前:由于设计者认为串行执行效率太低,因此改为了并行执行,最多五个 AsyncTask 同时执行。
    • Android 3.0 之后:由于之前的改动,很多应用出现了并发问题,因此引入 SerialExecutor 改回了串行执行,但对并行执行进行了支持。

总结

到了这里,我们基本上能对 AsyncTask 的实现原理有一个大致的了解了,它的原理实际上还是挺简单的:通过 ExecutorFutureTask 配合,从而实现任务的异步执行,最后在任务结束后通过 Handler 进行线程的切换从而实现了整个线程调度的功能。并且在 Android 3.0 之后的版本中,默认情况下 AsyncTask 的执行顺序是串行进行的

参考资料

Android 中糟糕的 AsyncTask

点赞

发表评论

电子邮件地址不会被公开。必填项已用 * 标注

%d 博主赞过: