高并发之——两种异步模型与深度解析Future接口

发布时间:2021-08-02 22:46 来源:网络整理 阅读:121 作者:mb5fe18fed96438 栏目: Mysql 欢迎投稿:712375056

 

        }

            }
    } finally {
                    t.interrupt();
    if (!(state == NEW &&
            try {
2.有返回结果的异步模型

尽管使用回调接口能够获取异步任务的结果,但是这种方式使用起来略显复杂。在JDK中提供了可以直接返回异步结果的处理方案。最常用的就是使用Future接口或者其实现类FutureTask来接收任务的返回结果。

(7)run()方法与runAndReset()方法

接下来,就是run()方法了,run()方法的源代码如下所示。

        else if (q == null)
 * @version 1.0.0

    LockSupport.parkNanos(this, nanos);

任务的执行类是具体执行任务的类,实现Runnable接口,在此类中定义一个回调接口类型的成员变量和一个String类型的任务参数(模拟任务的参数),并在构造方法中注入回调接口和任务参数。在run方法中执行任务,任务完成后将任务的结果数据封装成TaskResult对象,调用回调接口的方法将TaskResult对象传递到回调方法中。

        executorService.execute(futureTask);


        }
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                setException(ex);
            set(result);

看来,这里还要看下report()方法啊,点进去看下report()方法的实现,如下所示。

接下来,定义了其他几个成员变量,如下所示.

        // runner must be non-null until state is settled to


        while (state == INTERRUPTING)
        }

可以看到,handlePossibleCancellationInterrupt()方法的实现比较简单,当任务的状态为INTERRUPTING时,使用while()循环,条件为当前任务状态为INTERRUPTING,将当前线程占用的CPU资源释放,也就是说,当任务运行完成后,释放线程所占用的资源。

        return;

        // state must be re-read after nulling runner to prevent
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
    V get(long timeout, TimeUnit unit)
    }
/**
public boolean isDone() {

                if (next == null)

waiters:WaitNode类型的变量,表示等待线程的堆栈,在FutureTask的实现中,会通过CAS结合此堆栈交换任务的运行状态。

        node.thread = null;
     * 任务结果数据
          mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
else if (timed) {
if (state != NEW ||
    if (q != null)
    public static void main(String[] args) throws ExecutionException, InterruptedException {
NEW -> CANCELLED
    this.callable = callable;
/**
        new Thread(taskExecutor).start();

可以看到,report()方法的实现比较简单,首先,将outcome数据赋值给x变量,接下来,主要是判断接收到的任务状态,如果状态为NORMAL,则将x强转为泛型类型返回;当任务的状态大于或者等于CANCELLED,也就是任务已经取消,则抛出CancellationException异常,其他情况则抛出ExecutionException异常。

        if (s > COMPLETING) {
        }
        System.out.println(future.get());
        });
 * @description 测试回调
     * 任务状态
            } catch (Throwable ex) {
                                     null, Thread.currentThread()))

本文有点长,但是满满的干货,以实际案例的形式分析了两种异步模型,并从源码角度深度解析Future接口和FutureTask类,希望大家踏下心来,打开你的IDE,跟着文章看源码,相信你一定收获不小!

            public String call() throws Exception {
        return taskResult;
        // leaked interrupts

    try {    // in case call to interrupt throws exception

        V result;
protected void done() { }
这里需要注意一个细节:只有任务未启动,或者在完成之前被取消,才会返回true,表示任务已经被成功取消。其他情况都会返回false。

            setException(ex);
}
    return state >= CANCELLED;
            for (;;) {
protected void setException(Throwable t) {
 * @author binghe
    Thread.yield();

这里,发现变更任务状态使用的是UNSAFE.putOrderedInt()方法,这个方法是个什么鬼呢?点进去看一下,如下所示。

                else if (pred != null) {

}
    runner = null;
private static final long stateOffset;
 * @description 测试Future获取异步结果

运行结果如下所示。

private void removeWaiter(WaitNode node) {

当任务完成时,直接返回任务的结果数据;当任务未完成时,等待任务完成并返回任务的结果数据。

finally {
    int s = state;
    WaitNode q = null;

接下来,程序会进入finally代码块中,如下所示。

    int s = state;
 * @version 1.0.0
        Callable<V> c = callable;
        throw new CancellationException();
    }
    boolean isDone();

没参数的get()方法为当任务未运行完成时,会阻塞,直到返回任务结果。有参数的get()方法为当任务未运行完成,并且等待时间超出了超时时间,会TimeoutException异常。

一、两种异步模型

在Java的并发编程中,大体上会分为两种异步编程模型,一类是直接以异步的形式来并行运行其他的任务,不需要返回任务的结果数据。一类是以异步的形式运行其他任务,需要返回结果。

    try {

     */
        else if (!queued)
            throw new InterruptedException();
                Thread t = runner;
                                 null, Thread.currentThread()))
接下来,cancel(boolean)方法会进入finally代码块,如下所示。

    for (WaitNode q; (q = waiters) != null;) {

runner:运行Callable的线程,运行期间会使用CAS保证线程安全,这里大家只需要知道CAS是Java保证线程安全的一种方式,后续文章中会深度分析CAS如何保证线程安全。

    }
            try {
        } finally { // final state
return report(s);

接下来,拆解awaitDone()方法。在awaitDone()方法中,最重要的就是for自旋循环,在循环中首先判断当前线程是否被中断,如果已经被中断,则调用removeWaiter()将当前线程从堆栈中移除,并且抛出InterruptedException异常,如下所示。

        Callable<V> c = callable;
        ExecutorService executorService = Executors.newSingleThreadExecutor();

可以看到,done()方法是一个空的方法体,交由子类来实现具体的业务逻辑。
}
这个接口比较简单,run()方法就是运行任务时调用的方法。

            return s;

结合线程池的使用示例如下。

            Thread.yield();
        if (mayInterruptIfRunning) {

        s = state;

当任务完成时,直接返回任务的结果数据;当任务未完成时,等待任务完成,并设置了超时等待时间。在超时时间内任务完成,则返回结果;否则,抛出TimeoutException异常。

        if (s >= INTERRUPTING)
    @Override
 * @author binghe
        int s = state;
 * @author binghe
        q.thread = null;
            public String call() throws Exception {

        runner = null;
 * @author binghe

runAndReset()方法的逻辑与run()差不多,只是runAndReset()方法会在finally代码块中将任务状态重置为NEW。runAndReset()方法的源代码如下所示,就不重复说明了。

}
public void run() {
            }
    boolean queued = false;
// Unsafe mechanics

定义任务结果数据的封装类

public class TaskHandler implements TaskCallable<TaskResult> {

至于finishCompletion()方法,前面已经分析过。

}
                result = null;
else if (s == COMPLETING)
}

            (k.getDeclaredField("state"));
}
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
        if (c != null && state == NEW) {
        // runner must be non-null until state is settled to

get(long, TimeUnit)

                    if (pred.thread == null) // check for race
public class FutureTest {
                Thread t = q.thread;
    }

判断任务在完成之前是否被取消,如果在任务完成之前被取消,则返回true;否则,返回false。
    return true;
            }
else if (q == null)
                WaitNode next = q.next;

运行结果如下所示。

两个get()方法的主要逻辑差不多,一个没有超时设置,一个有超时设置,这里说一下主要逻辑。判断任务的当前状态是否小于或者等于COMPLETING,也就是说,任务是NEW状态或者COMPLETING,调用awaitDone()方法,看下awaitDone()方法的实现,如下所示。

 * @version 1.0.0
}
        runner = null;
import java.util.concurrent.*;
            result = null;
1.无返回结果的异步模型

无返回结果的异步任务,可以直接将任务丢进线程或线程池中运行,此时,无法直接获得任务的执行结果数据,一种方式是可以使用回调方法来获取任务的运行结果。

}
TaskResult{taskStatus=1, taskMessage='测试回调任务', taskResult='异步回调成功'}
    if (s == NORMAL)
 */
//TODO 拿到结果数据后进一步处理
    try {
 * @description 定义回调接口
        TaskResult result = new TaskResult();
    if (callable == null)
private void finishCompletion() {

到这里,整个大的框架算是完成了,接下来,就是测试看能否获取到异步任务的结果了。

        this.taskCallable = taskCallable;
                return "测试FutureTask获取异步结果";
            handlePossibleCancellationInterrupt(s);
            result = c.call();
                if (t != null)
}

    throw new ExecutionException((Throwable)x);
    Object x = outcome;
    // assert state > COMPLETING;
}
    }
    } finally {

使用Future接口获取异步结果

                ", taskMessage='" + taskMessage + '\'' +
    }
 */
    private static final long serialVersionUID = 8678277072402730062L;
(2)构造方法

接下来,是FutureTask的两个构造方法,比较简单,如下所示。

    V get() throws InterruptedException, ExecutionException;
二、深度解析Future接口 1.Future接口

免责声明:本站发布的内容(图片、视频和文字)以原创、来自本网站内容采集于网络互联网转载等其它媒体和分享为主,内容观点不代表本网站立场,如侵犯了原作者的版权,请告知一经查实,将立刻删除涉嫌侵权内容,联系我们QQ:712375056,同时欢迎投稿传递力量。