嵌入式技术
特别说明CountDownLatch
CountDownLatch的用法
CountDownLatch(num) 简单说明
主线程:mainThreadLatch.await() 和mainThreadLatch.countDown()
子线程:rollBackLatch.await() 和rollBackLatch.countDown()
为什么所有的子线程会在一瞬间就被所有都释放了?
事务的回滚是怎么结合进去的?
主线程类Entry
子线程类WorkThread
代码实际运用踩坑!!!!
特别说明CountDownLatch
CountDownLatch是一个类springboot自带的类,可以直接用 ,变量AtomicBoolean 也是可以直接使用
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/ruoyi-vue-pro
视频教程:https://doc.iocoder.cn/video/
CountDownLatch的用法
CountDownLatch典型用法:
1、某一线程在开始运行前等待n个线程执行完毕。 将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
2、实现多个线程开始执行任务的最大并行性。 注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/yudao-cloud
视频教程:https://doc.iocoder.cn/video/
CountDownLatch(num) 简单说明
new 一个 CountDownLatch(num) 对象
建立对象的时候 num 代表的是需要等待 num 个线程
// 建立对象的时候 num 代表的是需要等待 num 个线程 //主线程 CountDownLatch mainThreadLatch = new CountDownLatch(num); //子线程 CountDownLatch rollBackLatch = new CountDownLatch(1);
主线程:mainThreadLatch.await() 和mainThreadLatch.countDown()
新建对象
CountDownLatch mainThreadLatch = new CountDownLatch(num);
卡住主线程,让其等待子线程,代码mainThreadLatch.await(),放在主线程里
mainThreadLatch.await();
代码mainThreadLatch.countDown(),放在子线程里,每一个子线程运行一到这个代码,意味着CountDownLatch(num),里面的num-1(自动减一)
mainThreadLatch.countDown();
CountDownLatch(num)里面的num减到0,也就是CountDownLatch(0),被卡住的主线程mainThreadLatch.await(),就会往下执行
子线程:rollBackLatch.await() 和rollBackLatch.countDown()
新建对象,特别注意:子线程这个num就是1(关于只能为1的解答在后面)
CountDownLatch rollBackLatch = new CountDownLatch(1);
卡住子线程,阻止每一个子线程的事务提交和回滚
rollBackLatch.await();
代码rollBackLatch.countDown();放在主线程里,而且是放在主线程的等待代码mainThreadLatch.await();后面。
rollBackLatch.countDown();
为什么所有的子线程会在一瞬间就被所有都释放了?
事务的回滚是怎么结合进去的?
假设总共20个子线程,那么其中一个线程报错了怎么实现所有线程回滚。
引入变量
AtomicBoolean rollbackFlag = new AtomicBoolean(false)
和字面意思是一样的:根据 rollbackFlag 的true或者false 判断子线程里面,是否回滚。
首先我们确定的一点:rollbackFlag 是所有的子线程都用着这一个判断
主线程类Entry
package org.apache.dolphinscheduler.api.utils; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.api.controller.WorkThread; import org.apache.dolphinscheduler.common.enums.DbType; import org.springframework.web.bind.annotation.*; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.TimeZone; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @RestController @RequestMapping("importDatabase") public class Entry { /** * @param dbid 数据库的id * @param tablename 表名 * @param sftpFileName 文件名称 * @param head 是否有头文件 * @param splitSign 分隔符 * @param type 数据库类型 */ private static String SFTP_HOST = "192.168.1.92"; private static int SFTP_PORT = 22; private static String SFTP_USERNAME = "root"; private static String SFTP_PASSWORD = "rootroot"; private static String SFTP_BASEPATH = "/opt/testSFTP/"; @PostMapping("/thread") @ResponseBody public static JSONObject importDatabase(@RequestParam("dbid") int dbid ,@RequestParam("tablename") String tablename ,@RequestParam("sftpFileName") String sftpFileName ,@RequestParam("head") String head ,@RequestParam("splitSign") String splitSign ,@RequestParam("type") DbType type ,@RequestParam("heads") String heads ,@RequestParam("scolumns") String scolumns ,@RequestParam("tcolumns") String tcolumns ) throws Exception { JSONObject obForRetrun = new JSONObject(); try { JSONArray jsonArray = JSONArray.parseArray(tcolumns); JSONArray scolumnArray = JSONArray.parseArray(scolumns); JSONArray headsArray = JSONArray.parseArray(heads); ListlistInteger = getRrightDataNum(headsArray,scolumnArray); JSONArray bodys = SFTPUtils.getSftpContent(SFTP_HOST,SFTP_PORT,SFTP_USERNAME,SFTP_PASSWORD,SFTP_BASEPATH,sftpFileName,head,splitSign); int total = bodys.size(); int num = 20; //一个批次的数据有多少 int count = total/num;//周期 int lastNum =total- count*num;//余数 List list = new ArrayList (); SimpleDateFormat sdf = new SimpleDateFormat("HHss:SS"); TimeZone t = sdf.getTimeZone(); t.setRawOffset(0); sdf.setTimeZone(t); Long startTime=System.currentTimeMillis(); int countForCountDownLatch = 0; if(lastNum==0){//整除 countForCountDownLatch= count; }else{ countForCountDownLatch= count + 1; } //子线程 CountDownLatch rollBackLatch = new CountDownLatch(1); //主线程 CountDownLatch mainThreadLatch = new CountDownLatch(countForCountDownLatch); AtomicBoolean rollbackFlag = new AtomicBoolean(false); StringBuffer message = new StringBuffer(); message.append("报错信息:"); //子线程 for(int i=0;i getRrightDataNum(JSONArray headsArray, JSONArray scolumnArray){ List list = new ArrayList (); String arrayA [] = new String[headsArray.size()]; for(int i=0;i 子线程类WorkThread
package org.apache.dolphinscheduler.api.controller; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.api.service.DataSourceService; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.springframework.transaction.PlatformTransactionManager; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.TimeZone; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; /** * 多线程 */ public class WorkThread implements Runnable{ //建立线程的两种方法 1 实现Runnable 接口 2 继承 Thread 类 private DataSourceService dataSourceService; private DataSourceMapper dataSourceMapper; private Integer begin; private Integer end; private String tableName; private JSONArray columnArray; private Integer dbid; private DbType type; private JSONArray bodys; private ListlistInteger; private PlatformTransactionManager transactionManager; private CountDownLatch mainThreadLatch; private CountDownLatch rollBackLatch; private AtomicBoolean rollbackFlag; private StringBuffer message; /** * @param i * @param num * @param tableFrom * @param columnArrayFrom * @param dbidFrom * @param typeFrom */ public WorkThread(int i, int num, String tableFrom, JSONArray columnArrayFrom, int dbidFrom , DbType typeFrom, JSONArray bodysFrom, List listIntegerFrom ,CountDownLatch mainThreadLatch,CountDownLatch rollBackLatch,AtomicBoolean rollbackFlag ,StringBuffer messageFrom) { begin=i*num; end=begin+num; tableName = tableFrom; columnArray = columnArrayFrom; dbid = dbidFrom; type = typeFrom; bodys = bodysFrom; listInteger = listIntegerFrom; this.dataSourceMapper = SpringApplicationContext.getBean(DataSourceMapper.class); this.dataSourceService = SpringApplicationContext.getBean(DataSourceService.class); this.transactionManager = SpringApplicationContext.getBean(PlatformTransactionManager.class); this.mainThreadLatch = mainThreadLatch; this.rollBackLatch = rollBackLatch; this.rollbackFlag = rollbackFlag; this.message = messageFrom; } public void run() { DataSource dataSource = dataSourceMapper.queryDataSourceByID(dbid); String cp = dataSource.getConnectionParams(); Connection con=null; con = dataSourceService.getConnection(type,cp); if(con!=null) { SimpleDateFormat sdf = new SimpleDateFormat("HHss:SS"); TimeZone t = sdf.getTimeZone(); t.setRawOffset(0); sdf.setTimeZone(t); Long startTime = System.currentTimeMillis(); try { con.setAutoCommit(false); //---------------------------- 获取字段和类型 String columnString = null;//活动的字段 int intForType = 0; String type[] = new String[columnArray.size()];//类型集合 for(int i=0;i listInteger,String[] array){ String [] arrayFinal = new String [listInteger.size()]; for(int i=0;i 代码实际运用踩坑!!!!
还记得这里有个一批次处理多少数据么,我这边设置了20,实际到运用中的时候客户给了个20W的数据,我批次设置为20,那就有1W个子线程!!!!
这还不是最糟糕的,最糟糕的是每个子线程都会创建一个数据库连接,数据库直接被我搞炸了
所以这里需要把:
int num = 20; //一个批次的数据有多少
改成:
int num = 20000; //一个批次的数据有多少
编辑:黄飞
全部0条评论
快来发表一下你的评论吧 !