0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Java CompletableFuture 异步超时实现探索

京东云 来源:jf_75140285 作者:jf_75140285 2024-07-25 14:06 次阅读

简介

JDK 8 中 CompletableFuture 没有超时中断任务的能力。现有做法强依赖任务自身的超时实现。本文提出一种异步超时实现方案,解决上述问题。

前言

JDK 8 是一次重大的版本升级,新增了非常多的特性,其中之一便是 CompletableFuture。自此从 JDK 层面真正意义上的支持了基于事件的异步编程范式,弥补了 Future 的缺陷。

在我们的日常优化中,最常用手段便是多线程并行执行。这时候就会涉及到 CompletableFuture 的使用。

常见使用方式

下面举例一个常见场景。

假如我们有两个 RPC 远程调用服务,我们需要获取两个 RPC 的结果后,再进行后续逻辑处理。

public static void main(String[] args) {
    // 任务 A,耗时 2 秒
    int resultA = compute(1);
    // 任务 B,耗时 2 秒
    int resultB = compute(2);

    // 后续业务逻辑处理
    System.out.println(resultA + resultB);
}

可以预估到,串行执行最少耗时 4 秒,并且 B 任务并不依赖 A 任务结果。

对于这种场景,我们通常会选择并行的方式优化,Demo 代码如下:

public static void main(String[] args) {
    // 仅简单举例,在生产代码中可别这么写!

    // 统计耗时的函数
    time(() -> {
        CompletableFuture< Integer > result = Stream.of(1, 2)
                                                  // 创建异步任务
                                                  .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                  // 聚合
                                                  .reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));

        // 等待结果
        try {
            System.out.println("结果:" + result.get());
        } catch (ExecutionException | InterruptedException e) {
            System.err.println("任务执行异常");
        }
    });
}

输出:
[async-1]: 任务执行开始:1
[async-2]: 任务执行开始:2
[async-1]: 任务执行完成:1
[async-2]: 任务执行完成:2
结果:3
耗时:2 秒

可以看到耗时变成了 2 秒。

存在的问题

分析

看上去 CompletableFuture 现有功能可以满足我们诉求。但当我们引入一些现实常见情况时,一些潜在的不足便暴露出来了。

compute(x) 如果是一个根据入参查询用户某类型优惠券列表的任务,我们需要查询两种优惠券并组合在一起返回给上游。假如上游要求我们 2 秒内处理完毕并返回结果,但 compute(x) 耗时却在 0.5 秒 ~ 无穷大波动。这时候我们就需要把耗时过长的 compute(x) 任务结果放弃,仅处理在指定时间内完成的任务,尽可能保证服务可用。

那么以上代码的耗时由耗时最长的服务决定,无法满足现有诉求。通常我们会使用 get(long timeout, TimeUnit unit) 来指定获取结果的超时时间,并且我们会给 compute(x) 设置一个超时时间,达到后自动抛异常来中断任务。

public static void main(String[] args) {
    // 仅简单举例,在生产代码中可别这么写!

    // 统计耗时的函数
    time(() -> {
        List< CompletableFuture< Integer >> result = Stream.of(1, 2)
                                                        // 创建异步任务,compute(x) 超时抛出异常
                                                        .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                        .toList();

        // 等待结果
        int res = 0;
        for (CompletableFuture< Integer > future : result) {
            try {
                res += future.get(2, SECONDS);
            } catch (ExecutionException | InterruptedException | TimeoutException e) {
                System.err.println("任务执行异常或超时");
            }
        }

        System.out.println("结果:" + res);
    });
}

输出:
[async-2]: 任务执行开始:2
[async-1]: 任务执行开始:1
[async-1]: 任务执行完成:1
任务执行异常或超时
结果:1
耗时:2 秒

可以看到,只要我们能够给 compute(x) 设置一个超时时间将任务中断,结合 get、getNow 等获取结果的方式,就可以很好地管理整体耗时。

那么问题也就转变成了,如何给任务设置异步超时时间呢

现有做法

当异步任务是一个 RPC 请求时,我们可以设置一个 JSF 超时,以达到异步超时效果。

当请求是一个 R2M 请求时,我们也可以控制 R2M 连接的最大超时时间来达到效果。

这么看好像我们都是在依赖三方中间件的能力来管理任务超时时间?那么就存在一个问题,中间件超时控制能力有限,如果异步任务是中间件 IO 操作 + 本地计算操作怎么办?

用 JSF 超时举一个具体的例子,反编译 JSF 的获取结果代码如下:

public V get(long timeout, TimeUnit unit) throws InterruptedException {
    // 配置的超时时间
    timeout = unit.toMillis(timeout);
    // 剩余等待时间
    long remaintime = timeout - (this.sentTime - this.genTime);
    if (remaintime <= 0L) {
        if (this.isDone()) {
            // 反序列化获取结果
            return this.getNow();
        }
    } else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {
        // 等待时间内任务完成,反序列化获取结果
        return this.getNow();
    }

    this.setDoneTime();
    // 超时抛出异常
    throw this.clientTimeoutException(false);
}

当这个任务刚好卡在超时边缘完成时,这个任务的耗时时间就变成了超时时间 + 获取结果时间。而获取结果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。

某些 CPU 使用率高的情况下,就会出现异步任务没能触发抛出异常中断,导致我们无法准确控制超时时间。对上游来说,本次请求全部失败。

解决方式

JDK 9

这类问题非常常见,如大促场景,服务器 CPU 瞬间升高就会出现以上问题。

那么如何解决呢?其实 JDK 的开发大佬们早有研究。在 JDK 9,CompletableFuture 正式提供了 orTimeout、completeTimeout 方法,来准确实现异步超时控制。

public CompletableFuture< T > orTimeout(long timeout, TimeUnit unit) {
    if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));
    return this;
}

JDK 9 orTimeout 其实现原理是通过一个定时任务,在给定时间之后抛出异常。如果任务在指定时间内完成,则取消抛异常的操作。

以上代码我们按执行顺序来看下:

首先执行 new Timeout(this)。

static final class Timeout implements Runnable {
    final CompletableFuture< ? > f;
    Timeout(CompletableFuture< ? > f) { this.f = f; }
    public void run() {
        if (f != null && !f.isDone())
            // 抛出超时异常
            f.completeExceptionally(new TimeoutException());
    }
}

通过源码可以看到,Timeout 是一个实现 Runnable 的类,run() 方法负责给传入的异步任务通过 completeExceptionally CAS 赋值异常,将任务标记为异常完成。

那么谁来触发这个 run() 方法呢?我们看下 Delayer 的实现。

static final class Delayer {
    static ScheduledFuture< ? > delay(Runnable command, long delay,
                                    TimeUnit unit) {
        // 到时间触发 command 任务
        return delayer.schedule(command, delay, unit);
    }

    static final class DaemonThreadFactory implements ThreadFactory {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("CompletableFutureDelayScheduler");
            return t;
        }
    }

    static final ScheduledThreadPoolExecutor delayer;
    static {
        (delayer = new ScheduledThreadPoolExecutor(
            1, new DaemonThreadFactory())).
            setRemoveOnCancelPolicy(true);
    }
}

Delayer 其实就是一个单例定时调度器,Delayer.delay(new Timeout(this), timeout, unit) 通过 ScheduledThreadPoolExecutor 实现指定时间后触发 Timeout 的 run() 方法。

到这里就已经实现了超时抛出异常的操作。但当任务完成时,就没必要触发 Timeout 了。因此我们还需要实现一个取消逻辑。

static final class Canceller implements BiConsumer< Object, Throwable > {
    final Future< ? > f;
    Canceller(Future< ? > f) { this.f = f; }
    public void accept(Object ignore, Throwable ex) {
        if (ex == null && f != null && !f.isDone())
        // 3 未触发抛异常任务则取消
            f.cancel(false);
    }
}

当任务执行完成,或者任务执行异常时,我们也就没必要抛出超时异常了。因此我们可以把 delayer.schedule(command, delay, unit) 返回的定时超时任务取消,不再触发 Timeout。 当我们的异步任务完成,并且定时超时任务未完成的时候,就是我们取消的时机。因此我们可以通过 whenComplete(BiConsumer action) 来完成。

Canceller 就是一个 BiConsumer 的实现。其持有了 delayer.schedule(command, delay, unit) 返回的定时超时任务,accept(Object ignore, Throwable ex) 实现了定时超时任务未完成后,执行 cancel(boolean mayInterruptIfRunning) 取消任务的操作。

JDK 8

如果我们使用的是 JDK 9 或以上,我们可以直接用 JDK 的实现来完成异步超时操作。那么 JDK 8 怎么办呢?

其实我们也可以根据上述逻辑简单实现一个工具类来辅助。

以下是我们营销自己的工具类以及用法,贴出来给大家作为参考,大家也可以自己写的更优雅一些~

调用方式:

CompletableFutureExpandUtils.orTimeout(异步任务, 超时时间, 时间单位);

工具类源码:

package com.jd.jr.market.reduction.util;

import com.jdpay.market.common.exception.UncheckedException;

import java.util.concurrent.*;
import java.util.function.BiConsumer;

/**
 * CompletableFuture 扩展工具
 *
 * @author zhangtianci7
 */
public class CompletableFutureExpandUtils {

    /**
     * 如果在给定超时之前未完成,则异常完成此 CompletableFuture 并抛出 {@link TimeoutException} 。
     *
     * @param timeout 在出现 TimeoutException 异常完成之前等待多长时间,以 {@code unit} 为单位
     * @param unit    一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时间
     * @return 入参的 CompletableFuture
     */
    public static < T > CompletableFuture< T > orTimeout(CompletableFuture< T > future, long timeout, TimeUnit unit) {
        if (null == unit) {
            throw new UncheckedException("时间的给定粒度不能为空");
        }
        if (null == future) {
            throw new UncheckedException("异步任务不能为空");
        }
        if (future.isDone()) {
            return future;
        }

        return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));
    }

    /**
     * 超时时异常完成的操作
     */
    static final class Timeout implements Runnable {
        final CompletableFuture< ? > future;

        Timeout(CompletableFuture< ? > future) {
            this.future = future;
        }

        public void run() {
            if (null != future && !future.isDone()) {
                future.completeExceptionally(new TimeoutException());
            }
        }
    }

    /**
     * 取消不需要的超时的操作
     */
    static final class Canceller implements BiConsumer< Object, Throwable > {
        final Future< ? > future;

        Canceller(Future< ? > future) {
            this.future = future;
        }

        public void accept(Object ignore, Throwable ex) {
            if (null == ex && null != future && !future.isDone()) {
                future.cancel(false);
            }
        }
    }

    /**
     * 单例延迟调度器,仅用于启动和取消任务,一个线程就足够
     */
    static final class Delayer {
        static ScheduledFuture< ? > delay(Runnable command, long delay, TimeUnit unit) {
            return delayer.schedule(command, delay, unit);
        }

        static final class DaemonThreadFactory implements ThreadFactory {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureExpandUtilsDelayScheduler");
                return t;
            }
        }

        static final ScheduledThreadPoolExecutor delayer;

        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }
    }
}

总结

在 JDK 8 场景下,现有超时中断的做法依赖于任务本身的超时实现,当任务本身的超时失效,或者不够精确时,并没有很好的手段来中断任务。因此本文给出一种让 CompletableFuture 支持异步超时的实现方案实现思路,仅供大家参考。

参考资料

JEP 266: JDK 9 并发包更新提案

审核编辑 黄宇

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • JAVA
    +关注

    关注

    19

    文章

    2967

    浏览量

    104745
  • RPC
    RPC
    +关注

    关注

    0

    文章

    111

    浏览量

    11534
  • 异步
    +关注

    关注

    0

    文章

    62

    浏览量

    18048
收藏 人收藏

    评论

    相关推荐

    Java 23功能介绍

    Java 23 包含全新和更新的 Java 语言功能、核心 API 以及 JVM,同时适合新的 Java 开发者和高级开发者。从 IntelliJ IDEA 2024.2 开始已支持 Java
    的头像 发表于 12-04 10:02 205次阅读
    <b class='flag-5'>Java</b> 23功能介绍

    socket 连接超时处理技巧

    在网络编程中,Socket连接超时是一个常见的问题。处理超时的关键在于确保程序能够优雅地处理这些情况,避免程序崩溃或者无响应。以下是一些处理Socket连接超时的技巧。 1. 设置合理的超时
    的头像 发表于 11-12 14:13 414次阅读

    Java中时间戳的使用

    Java中时间戳的使用
    的头像 发表于 11-06 16:04 201次阅读
    <b class='flag-5'>Java</b>中时间戳的使用

    socket连接超时如何处理

    实现以及网络环境。 1. 理解Socket连接超时 在TCP/IP协议中,socket连接超时通常指的是在建立连接、发送数据或接收数据的过程中,由于没有在预期时间内完成操作,系统自动终止连接。这个预期时间就是
    的头像 发表于 11-01 16:48 631次阅读

    记一次JSF异步调用引起的接口可用率降低

    前言 本文记录了由于JSF异步调用超时引起的接口可用率降低问题的排查过程,主要介绍了排查思路和JSF异步调用的流程,希望可以帮助大家了解JSF的异步调用原理以及提供一些问题排查思路。本
    的头像 发表于 08-05 13:40 250次阅读
    记一次JSF<b class='flag-5'>异步</b>调用引起的接口可用率降低

    华纳云:java web和java有什么区别java web和java有什么区别

    Java Web和Java是两个不同的概念,它们在功能、用途和实现方式上存在一些区别,下面将详细介绍它们之间的区别。 1. 功能和用途: – Java是一种编程语言,它提供了一种用于开
    的头像 发表于 07-16 13:35 803次阅读
    华纳云:<b class='flag-5'>java</b> web和<b class='flag-5'>java</b>有什么区别<b class='flag-5'>java</b> web和<b class='flag-5'>java</b>有什么区别

    中伟视界:智能监控和预警,静止超时AI算法如何提升非煤矿山安全?

    本文详细介绍了静止超时AI算法在非煤矿山的工作原理、技术实现细节和应用场景,并分析了其在安全管理中的实际效果。通过智能监控和预警,静止超时AI算法能够提高矿山的安全防控水平,提升管理效率,降低运营
    的头像 发表于 07-14 11:29 1071次阅读
    中伟视界:智能监控和预警,静止<b class='flag-5'>超时</b>AI算法如何提升非煤矿山安全?

    从多线程设计模式到对 CompletableFuture 的应用

    最近在开发 延保服务 频道页时,为了提高查询效率,使用到了多线程技术。为了对多线程方案设计有更加充分的了解,在业余时间读完了《图解 Java 多线程设计模式》这本书,觉得收获良多。本篇文章将介绍其中
    的头像 发表于 06-26 14:18 362次阅读
    从多线程设计模式到对 <b class='flag-5'>CompletableFuture</b> 的应用

    探索虚拟线程:原理与实现

    虚拟线程的引入与优势 在Loom项目之前,Java虚拟机(JVM)中的线程是通过java.lang.Thread类型来实现的,这些线程被称为平台线程。 然而,平台线程的创建和维护在资源使用上存在显著
    的头像 发表于 06-24 11:35 304次阅读
    <b class='flag-5'>探索</b>虚拟线程:原理与<b class='flag-5'>实现</b>

    已经安装了Java,且依然提示安装Java是为什么?

    我已经在机器上安装了最新版的 Java 10,打开 Cube 却得到要求安装 Java 1.7.0_45 的提示。何解?Eclipse CDT 依赖 Java,不可卸载重装。
    发表于 04-26 06:23

    飞凌ElfBoard ELF 1板卡-如何在ELF 1开发板上实现java的支持

    /mergesort# java MergeDemo其他测试例程方法类似。 完成上述一系列步骤,即可在ELF 1开发板上成功实现Java环境的搭建与运行,我们衷心希望这篇教程指南能成为各位小伙伴在
    发表于 03-20 09:51

    java实现多线程的几种方式

    Java实现多线程的几种方式 多线程是指程序中包含了两个或以上的线程,每个线程都可以并行执行不同的任务或操作。Java中的多线程可以提高程序的效率和性能,使得程序可以同时处理多个任务。 Jav
    的头像 发表于 03-14 16:55 707次阅读

    有办法实现TC367 QSPI 48位异步传输吗?

    有办法实现 QSPI 48 位异步传输吗?
    发表于 03-05 07:22

    M467的串口DMA超时判断,MCU无法实现TIMEOUT响应是为什么?

    请问关于M467的串口DMA超时判断问题,现在遇到的一个问题是: MCU通过RS422连接了一个对方设备,23400bps,100HZz帧率,使用DMA超时判断,发现重新上电对MCU和对方设备
    发表于 01-16 08:04

    常见的HTTP接口超时问题出现原因及解决办法

    HTTP接口超时问题是指在HTTP请求发送到服务器后,由于等待服务器响应的时间超过了预设的超时时间,导致请求被中断。以下是可能导致HTTP接口超时问题的原因和解决方法: 网络延迟或不稳定:网络延迟或
    的头像 发表于 01-12 13:42 2572次阅读