0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

用这4招 优雅的实现Spring Boot异步线程间数据传递

jf_ro2CN3Fa 来源:码猿技术专栏 2023-01-30 10:40 次阅读

Spring Boot 自定义线程池实现异步开发相信看过陈某的文章都了解,但是在实际开发中需要在父子线程之间传递一些数据,比如用户信息,链路信息等等

比如用户登录信息使用ThreadLocal存放保证线程隔离,代码如下:

/**
*@description用户上下文信息
*/
publicclassOauthContext{
privatestaticfinalThreadLocalloginValThreadLocal=newThreadLocal<>();

publicstaticLoginValget(){
returnloginValThreadLocal.get();
}
publicstaticvoidset(LoginValloginVal){
loginValThreadLocal.set(loginVal);
}
publicstaticvoidclear(){
loginValThreadLocal.remove();
}
}

那么子线程想要获取这个LoginVal如何做呢?

今天就来介绍几种优雅的方式实现Spring Boot 内部的父子线程的数据传递。

7bd85a70-9688-11ed-bfe3-dac502259ad0.png

1. 手动设置

每执行一次异步线程都要分为两步:

获取父线程的LoginVal

将LoginVal设置到子线程,达到复用

代码如下:

publicvoidhandlerAsync(){
//1.获取父线程的loginVal
LoginValloginVal=OauthContext.get();
log.info("父线程的值:{}",OauthContext.get());
CompletableFuture.runAsync(()->{
//2.设置子线程的值,复用
OauthContext.set(loginVal);
log.info("子线程的值:{}",OauthContext.get());
});
}

虽然能够实现目的,但是每次开异步线程都需要手动设置,重复代码太多,看了头疼,你认为优雅吗?

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

项目地址:https://github.com/YunaiV/ruoyi-vue-pro

2. 线程池设置TaskDecorator

TaskDecorator是什么?官方api的大致意思:这是一个执行回调方法的装饰器,主要应用于传递上下文,或者提供任务的监控/统计信息。

知道有这么一个东西,如何去使用?

TaskDecorator是一个接口,首先需要去实现它,代码如下:

/**
*@description上下文装饰器
*/
publicclassContextTaskDecoratorimplementsTaskDecorator{
@Override
publicRunnabledecorate(Runnablerunnable){
//获取父线程的loginVal
LoginValloginVal=OauthContext.get();
return()->{
try{
//将主线程的请求信息,设置到子线程中
OauthContext.set(loginVal);
//执行子线程,这一步不要忘了
runnable.run();
}finally{
//线程结束,清空这些信息,否则可能造成内存泄漏
OauthContext.clear();
}
};
}
}

这里我只是设置了LoginVal,实际开发中其他的共享数据,比如SecurityContext,RequestAttributes....

TaskDecorator需要结合线程池使用,实际开发中异步线程建议使用线程池,只需要在对应的线程池配置一下,代码如下:

@Bean("taskExecutor")
publicThreadPoolTaskExecutortaskExecutor(){
ThreadPoolTaskExecutorpoolTaskExecutor=newThreadPoolTaskExecutor();
poolTaskExecutor.setCorePoolSize(xx);
poolTaskExecutor.setMaxPoolSize(xx);
//设置线程活跃时间(秒)
poolTaskExecutor.setKeepAliveSeconds(xx);
//设置队列容量
poolTaskExecutor.setQueueCapacity(xx);
//设置TaskDecorator,用于解决父子线程间的数据复用
poolTaskExecutor.setTaskDecorator(newContextTaskDecorator());
poolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());
//等待所有任务结束后再关闭线程池
poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
returnpoolTaskExecutor;
}

此时业务代码就不需要去设置子线程的值,直接使用即可,代码如下:

publicvoidhandlerAsync(){
log.info("父线程的用户信息:{}",OauthContext.get());
//执行异步任务,需要指定的线程池
CompletableFuture.runAsync(()->log.info("子线程的用户信息:{}",OauthContext.get()),taskExecutor);
}

来看一下结果,如下图:

7bee3aca-9688-11ed-bfe3-dac502259ad0.png

这里使用的是CompletableFuture执行异步任务,使用@Async这个注解同样是可行的。

注意 :无论使用何种方式,都需要指定线程池

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

项目地址:https://github.com/YunaiV/yudao-cloud

3. InheritableThreadLocal

这种方案不建议使用,InheritableThreadLocal虽然能够实现父子线程间的复用,但是在线程池中使用会存在复用的问题。

这种方案使用也是非常简单,直接用InheritableThreadLocal替换ThreadLocal即可,代码如下:

/**
*@description用户上下文信息
*/
publicclassOauthContext{
privatestaticfinalInheritableThreadLocalloginValThreadLocal=newInheritableThreadLocal<>();

publicstaticLoginValget(){
returnloginValThreadLocal.get();
}
publicstaticvoidset(LoginValloginVal){
loginValThreadLocal.set(loginVal);
}
publicstaticvoidclear(){
loginValThreadLocal.remove();
}
}

4. TransmittableThreadLocal

TransmittableThreadLocal是阿里开源的工具,弥补了InheritableThreadLocal的缺陷,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。

使用起来也是非常简单,添加依赖如下:


com.alibaba
transmittable-thread-local
2.14.2

OauthContext改造代码如下:

/**
*@description用户上下文信息
*/
publicclassOauthContext{
privatestaticfinalTransmittableThreadLocalloginValThreadLocal=newTransmittableThreadLocal<>();

publicstaticLoginValget(){
returnloginValThreadLocal.get();
}
publicstaticvoidset(LoginValloginVal){
loginValThreadLocal.set(loginVal);
}
publicstaticvoidclear(){
loginValThreadLocal.remove();
}
}

TransmittableThreadLocal原理

从定义来看,TransimittableThreadLocal继承于InheritableThreadLocal,并实现TtlCopier接口,它里面只有一个copy方法。所以主要是对InheritableThreadLocal的扩展。

publicclassTransmittableThreadLocalextendsInheritableThreadLocalimplementsTtlCopier

在TransimittableThreadLocal中添加holder属性。这个属性的作用就是被标记为具备线程传递资格的对象都会被添加到这个对象中。

要标记一个类,比较容易想到的方式,就是给这个类新增一个Type字段,还有一个方法就是将具备这种类型的的对象都添加到一个静态全局集合中。之后使用时,这个集合里的所有值都具备这个标记。

//1.holder本身是一个InheritableThreadLocal对象
//2.这个holder对象的value是WeakHashMap,?>
//2.1WeekHashMap的value总是null,且不可能被使用。
//2.2WeekHasshMap支持value=null
privatestaticInheritableThreadLocal,?>>holder=newInheritableThreadLocal,?>>(){
@Override
protectedWeakHashMap,?>initialValue(){
returnnewWeakHashMap,Object>();
}

/**
*重写了childValue方法,实现上直接将父线程的属性作为子线程的本地变量对象。
*/
@Override
protectedWeakHashMap,?>childValue(WeakHashMap,?>parentValue){
returnnewWeakHashMap,Object>(parentValue);
}
};

应用代码是通过TtlExecutors工具类对线程池对象进行包装。工具类只是简单的判断,输入的线程池是否已经被包装过、非空校验等,然后返回包装类ExecutorServiceTtlWrapper。根据不同的线程池类型,有不同和的包装类。

@Nullable
publicstaticExecutorServicegetTtlExecutorService(@NullableExecutorServiceexecutorService){
if(TtlAgent.isTtlAgentLoaded()||executorService==null||executorServiceinstanceofTtlEnhanced){
returnexecutorService;
}
returnnewExecutorServiceTtlWrapper(executorService);
}

进入包装类ExecutorServiceTtlWrapper。可以注意到不论是通过ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法,都会将线程对象包装成TtlCallable或者TtlRunnable,用于在真正执行run方法前做一些业务逻辑。

/**
*在ExecutorServiceTtlWrapper实现submit方法
*/
@NonNull
@Override
publicFuturesubmit(@NonNullCallabletask){
returnexecutorService.submit(TtlCallable.get(task));
}

/**
*在ExecutorTtlWrapper实现execute方法
*/
@Override
publicvoidexecute(@NonNullRunnablecommand){
executor.execute(TtlRunnable.get(command));
}

所以,重点的核心逻辑应该是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable为例,TtlRunnable同理类似。在分析call()方法之前,先看一个类Transmitter

publicstaticclassTransmitter{
/**
*捕获当前线程中的是所有TransimittableThreadLocal和注册ThreadLocal的值。
*/
@NonNull
publicstaticObjectcapture(){
returnnewSnapshot(captureTtlValues(),captureThreadLocalValues());
}

/**
*捕获TransimittableThreadLocal的值,将holder中的所有值都添加到HashMap后返回。
*/
privatestaticHashMap,Object>captureTtlValues(){
HashMap,Object>ttl2Value=
newHashMap,Object>();
for(TransmittableThreadLocalthreadLocal:holder.get().keySet()){
ttl2Value.put(threadLocal,threadLocal.copyValue());
}
returnttl2Value;
}

/**
*捕获注册的ThreadLocal的值,也就是原本线程中的ThreadLocal,可以注册到TTL中,在
*进行线程池本地变量传递时也会被传递。
*/
privatestaticHashMap,Object>captureThreadLocalValues(){
finalHashMap,Object>threadLocal2Value=
newHashMap,Object>();
for(Map.Entry,TtlCopier>entry:threadLocalHolder.entrySet()){
finalThreadLocalthreadLocal=entry.getKey();
finalTtlCopiercopier=entry.getValue();
threadLocal2Value.put(threadLocal,copier.copy(threadLocal.get()));
}
returnthreadLocal2Value;
}

/**
*将捕获到的本地变量进行替换子线程的本地变量,并且返回子线程现有的本地变量副本backup。
*用于在执行run/call方法之后,将本地变量副本恢复。
*/
@NonNull
publicstaticObjectreplay(@NonNullObjectcaptured){
finalSnapshotcapturedSnapshot=(Snapshot)captured;
returnnewSnapshot(replayTtlValues(capturedSnapshot.ttl2Value),
replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}

/**
*替换TransmittableThreadLocal
*/
@NonNull
privatestaticHashMap,Object>replayTtlValues(@NonNullHashMap,Object>captured){
//创建副本backup
HashMap,Object>backup=
newHashMap,Object>();

for(finalIterator>iterator=holder.get().keySet().iterator();iterator.hasNext();){
TransmittableThreadLocalthreadLocal=iterator.next();
//对当前线程的本地变量进行副本拷贝
backup.put(threadLocal,threadLocal.get());

//若出现调用线程中不存在某个线程变量,而线程池中线程有,则删除线程池中对应的本地变量
if(!captured.containsKey(threadLocal)){
iterator.remove();
threadLocal.superRemove();
}
}
//将捕获的TTL值打入线程池获取到的线程TTL中。
setTtlValuesTo(captured);
//是一个扩展点,调用TTL的beforeExecute方法。默认实现为空
doExecuteCallback(true);
returnbackup;
}

privatestaticHashMap,Object>replayThreadLocalValues(@NonNullHashMap,Object>captured){
finalHashMap,Object>backup=
newHashMap,Object>();
for(Map.Entry,Object>entry:captured.entrySet()){
finalThreadLocalthreadLocal=entry.getKey();
backup.put(threadLocal,threadLocal.get());
finalObjectvalue=entry.getValue();
if(value==threadLocalClearMark)threadLocal.remove();
elsethreadLocal.set(value);
}
returnbackup;
}

/**
*清除单线线程的所有TTL和TL,并返回清除之气的backup
*/
@NonNull
publicstaticObjectclear(){
finalHashMap,Object>ttl2Value=
newHashMap,Object>();

finalHashMap,Object>threadLocal2Value=
newHashMap,Object>();
for(Map.Entry,TtlCopier>entry:threadLocalHolder.entrySet()){
finalThreadLocalthreadLocal=entry.getKey();
threadLocal2Value.put(threadLocal,threadLocalClearMark);
}
returnreplay(newSnapshot(ttl2Value,threadLocal2Value));
}

/**
*还原
*/
publicstaticvoidrestore(@NonNullObjectbackup){
finalSnapshotbackupSnapshot=(Snapshot)backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}

privatestaticvoidrestoreTtlValues(@NonNullHashMap,Object>backup){
//扩展点,调用TTL的afterExecute
doExecuteCallback(false);

for(finalIterator>iterator=holder.get().keySet().iterator();iterator.hasNext();){
TransmittableThreadLocalthreadLocal=iterator.next();

if(!backup.containsKey(threadLocal)){
iterator.remove();
threadLocal.superRemove();
}
}

//将本地变量恢复成备份版本
setTtlValuesTo(backup);
}

privatestaticvoidsetTtlValuesTo(@NonNullHashMap,Object>ttlValues){
for(Map.Entry,Object>entry:ttlValues.entrySet()){
TransmittableThreadLocalthreadLocal=entry.getKey();
threadLocal.set(entry.getValue());
}
}

privatestaticvoidrestoreThreadLocalValues(@NonNullHashMap,Object>backup){
for(Map.Entry,Object>entry:backup.entrySet()){
finalThreadLocalthreadLocal=entry.getKey();
threadLocal.set(entry.getValue());
}
}

/**
*快照类,保存TTL和TL
*/
privatestaticclassSnapshot{
finalHashMap,Object>ttl2Value;
finalHashMap,Object>threadLocal2Value;

privateSnapshot(HashMap,Object>ttl2Value,
HashMap,Object>threadLocal2Value){
this.ttl2Value=ttl2Value;
this.threadLocal2Value=threadLocal2Value;
}
}

进入TtlCallable#call()方法。

@Override
publicVcall()throwsException{
Objectcaptured=capturedRef.get();
if(captured==null||releaseTtlValueReferenceAfterCall&&
!capturedRef.compareAndSet(captured,null)){
thrownewIllegalStateException("TTLvaluereferenceisreleasedaftercall!");
}
//调用replay方法将捕获到的当前线程的本地变量,传递给线程池线程的本地变量,
//并且获取到线程池线程覆盖之前的本地变量副本。
Objectbackup=replay(captured);
try{
//线程方法调用
returncallable.call();
}finally{
//使用副本进行恢复。
restore(backup);
}
}

到这基本上线程池方式传递本地变量的核心代码已经大概看完了。总的来说在创建TtlCallable对象是,调用capture()方法捕获调用方的本地线程变量,在call()执行时,将捕获到的线程变量,替换到线程池所对应获取到的线程的本地变量中,并且在执行完成之后,将其本地变量恢复到调用之前。

总结

上述列举了4种方案,陈某这里推荐方案2和方案4,其中两种方案的缺点非常明显,实际开发中也是采用的方案2或者方案4。

审核编辑:汤梓红
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 接口
    +关注

    关注

    33

    文章

    8611

    浏览量

    151247
  • spring
    +关注

    关注

    0

    文章

    340

    浏览量

    14353
  • Boot
    +关注

    关注

    0

    文章

    149

    浏览量

    35844
  • 线程
    +关注

    关注

    0

    文章

    505

    浏览量

    19695
  • 数据传递
    +关注

    关注

    1

    文章

    3

    浏览量

    1760

原文标题:用这4招 优雅的实现Spring Boot 异步线程间数据传递

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    Spring Boot如何实现异步任务

    Spring Boot 提供了多种方式来实现异步任务,这里介绍三种主要实现方式。 1、基于注解 @Async @Async 注解是
    的头像 发表于 09-30 10:32 1443次阅读

    Spring Boot虚拟线程和Webflux性能对比

    早上看到一篇关于Spring Boot虚拟线程和Webflux性能对比的文章,觉得还不错。内容较长,抓重点给大家介绍一下这篇文章的核心内容,方便大家快速阅读。
    发表于 09-24 14:54 933次阅读
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>虚拟<b class='flag-5'>线程</b>和Webflux性能对比

    LabVIEW多线程编程数据传递教程

    很多时候在一个VI的不同线程或者不同VI的不同线程中需要有一些交互——这些线程并不能完全独立运行,需要一定的数据通信才能正确执行,这时就需要在编程时使用LabVIEW提供的
    的头像 发表于 11-24 10:05 6972次阅读
    LabVIEW多<b class='flag-5'>线程</b>编程<b class='flag-5'>数据传递</b>教程

    通过队列实现vi之间数据传递

    `各位高手,请教下如何用队列实现vi之间的数据传递,最好能给出个例子,我是初学者,谢谢`
    发表于 09-08 11:01

    请问C6678核间数据传递方式是什么?为什么是这样?

    ,但是给的例子都是SYS/BIOS下面使用的。我想请问一下是否qmss,CPPI只能在操作系统下才能使用,没有操作系统可以吗?还有别的核间数据传递方式吗?谢谢,请指教!
    发表于 06-19 02:42

    启动Spring Boot项目应用的三种方法

    ,从而使开发人员不再需要定义样板化的配置。我的话来理解,就是spring boot其实不是什么新的框架,它默认配置了很多框架的使用方式,就像maven整合了所有的jar包,spring
    发表于 01-14 17:33

    基于Spring Cloud和Euraka的优雅下线以及灰度发布

    该方式借助的是 Spring Boot 应用的 Shutdown hook,应用本身的下线也是优雅的,但如果你的服务发现组件使用的是 Eureka,那么默认最长会有 90 秒的延迟,其他应用才会感知到该服务下线
    的头像 发表于 04-20 09:52 1933次阅读

    Spring Boot Web相关的基础知识

    Boot的第一个接口。接下来将会将会介绍使用Spring Boot开发Web应用的相关内容,其主要包括使用spring-boot-starter-web组件来
    的头像 发表于 03-17 15:03 660次阅读

    简述Spring Boot数据校验

    上一篇文章我们了解了Spring Boot Web相关的知识,初步了解了spring-boot-starter-web,还了解了@Contrler和@RestController的差别,如果
    的头像 发表于 03-17 15:07 782次阅读

    Spring Boot如何优雅实现数据加密存储、模糊匹配和脱敏

    近来我们都在围绕着使用Spring Boot开发业务系统时如何保证数据安全性这个主题展开总结,当下大部分的B/S架构的系统也都是基于Spring B
    的头像 发表于 06-19 14:42 1965次阅读
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>如何<b class='flag-5'>优雅</b><b class='flag-5'>实现</b><b class='flag-5'>数据</b>加密存储、模糊匹配和脱敏

    Spring Boot Actuator快速入门

    不知道大家在写 Spring Boot 项目的过程中,使用过 Spring Boot Actuator 吗?知道 Spring
    的头像 发表于 10-09 17:11 644次阅读

    Spring Boot启动 Eureka流程

    在上篇中已经说过了 Eureka-Server 本质上是一个 web 应用的项目,今天就来看看 Spring Boot 是怎么启动 Eureka 的。 Spring Boot 启动 E
    的头像 发表于 10-10 11:40 895次阅读
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>启动 Eureka流程

    Spring Boot的启动原理

    可能很多初学者会比较困惑,Spring Boot 是如何做到将应用代码和所有的依赖打包成一个独立的 Jar 包,因为传统的 Java 项目打包成 Jar 包之后,需要通过 -classpath 属性
    的头像 发表于 10-13 11:44 653次阅读
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>的启动原理

    Spring Boot 的设计目标

    什么是Spring Boot Spring BootSpring 开源组织下的一个子项目,也是 S
    的头像 发表于 10-13 14:56 589次阅读
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b> 的设计目标

    Spring Boot 3.2支持虚拟线程和原生镜像

    Spring Boot 3.2 前几日发布,让我们 Java 21、GraalVM 和虚拟线程来尝试一下。
    的头像 发表于 11-30 16:22 742次阅读