1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        支付寶:多線程事務(wù)怎么回滾?說用 @Transactional 可以回去等通知了!

        共 20021字,需瀏覽 41分鐘

         ·

        2023-02-04 18:29

        點(diǎn)擊藍(lán)色“程序員黃小斜”關(guān)注我喲

        加個(gè)“星標(biāo)”,每天和你一起多進(jìn)步一點(diǎn)點(diǎn)


        來源:blog.csdn.net/weixin_43225491/article/details/117705686

        • 背景介紹
        • 公用的類和方法
        • 示例事務(wù)不成功操作

        背景介紹

        1,最近有一個(gè)大數(shù)據(jù)量插入的操作入庫的業(yè)務(wù)場(chǎng)景,需要先做一些其他修改操作,然后在執(zhí)行插入操作,由于插入數(shù)據(jù)可能會(huì)很多,用到多線程去拆分?jǐn)?shù)據(jù)并行處理來提高響應(yīng)時(shí)間,如果有一個(gè)線程執(zhí)行失敗,則全部回滾。

        2,在spring中可以使用@Transactional注解去控制事務(wù),使出現(xiàn)異常時(shí)會(huì)進(jìn)行回滾,在多線程中,這個(gè)注解則不會(huì)生效,如果主線程需要先執(zhí)行一些修改數(shù)據(jù)庫的操作,當(dāng)子線程在進(jìn)行處理出現(xiàn)異常時(shí),主線程修改的數(shù)據(jù)則不會(huì)回滾,導(dǎo)致數(shù)據(jù)錯(cuò)誤。

        3,下面用一個(gè)簡(jiǎn)單示例演示多線程事務(wù)。

        公用的類和方法

        /**
         * 平均拆分list方法.
         * @param source
         * @param n
         * @param <T>
         * @return
         */
        public static <T> List<List<T>> averageAssign(List<T> source,int n){
            List<List<T>> result=new ArrayList<List<T>>();
            int remaider=source.size()%n; 
            int number=source.size()/n; 
            int offset=0;//偏移量
            for(int i=0;i<n;i++){
                List<T> value=null;
                if(remaider>0){
                    value=source.subList(i*number+offset, (i+1)*number+offset+1);
                    remaider--;
                    offset++;
                }else{
                    value=source.subList(i*number+offset, (i+1)*number+offset);
                }
                result.add(value);
            }
            return result;
        }
        /**  線程池配置
         * @version V1.0
         */
        public class ExecutorConfig {
            private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
            private volatile static ExecutorService executorService;
            public static ExecutorService getThreadPool() {
                if (executorService == null){
                    synchronized (ExecutorConfig.class){
                        if (executorService == null){
                            executorService =  newThreadPool();
                        }
                    }
                }
                return executorService;
            }

            private static  ExecutorService newThreadPool(){
                int queueSize = 500;
                int corePool = Math.min(5, maxPoolSize);
                return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
            }
            private ExecutorConfig(){}
        }
        /** 獲取sqlSession
         * @author 86182
         * @version V1.0
         */
        @Component
        public class SqlContext {
            @Resource
            private SqlSessionTemplate sqlSessionTemplate;

            public SqlSession getSqlSession(){
                SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
                return sqlSessionFactory.openSession();
            }
        }

        示例事務(wù)不成功操作

          /**
         * 測(cè)試多線程事務(wù).
         * @param employeeDOList
         */
        @Override
        @Transactional
        public void saveThread(List<EmployeeDO> employeeDOList) {
            try {
                //先做刪除操作,如果子線程出現(xiàn)異常,此操作不會(huì)回滾
                this.getBaseMapper().delete(null);
                //獲取線程池
                ExecutorService service = ExecutorConfig.getThreadPool();
                //拆分?jǐn)?shù)據(jù),拆分5份
                List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
                //執(zhí)行的線程
                Thread []threadArray = new Thread[lists.size()];
                //監(jiān)控子線程執(zhí)行完畢,再執(zhí)行主線程,要不然會(huì)導(dǎo)致主線程關(guān)閉,子線程也會(huì)隨著關(guān)閉
                CountDownLatch countDownLatch = new CountDownLatch(lists.size());
                AtomicBoolean atomicBoolean = new AtomicBoolean(true);
                for (int i =0;i<lists.size();i++){
                    if (i==lists.size()-1){
                        atomicBoolean.set(false);
                    }
                    List<EmployeeDO> list  = lists.get(i);
                    threadArray[i] =  new Thread(() -> {
                        try {
                         //最后一個(gè)線程拋出異常
                            if (!atomicBoolean.get()){
                                throw new ServiceException("001","出現(xiàn)異常");
                            }
                            //批量添加,mybatisPlus中自帶的batch方法
                            this.saveBatch(list);
                        }finally {
                            countDownLatch.countDown();
                        }

                    });
                }
                for (int i = 0; i <lists.size(); i++){
                    service.execute(threadArray[i]);
                }
                //當(dāng)子線程執(zhí)行完畢時(shí),主線程再往下執(zhí)行
                countDownLatch.await();
                System.out.println("添加完畢");
            }catch (Exception e){
                log.info("error",e);
                throw new ServiceException("002","出現(xiàn)異常");
            }finally {
                 connection.close();
             }
        }

        數(shù)據(jù)庫中存在一條數(shù)據(jù):

        圖片
        //測(cè)試用例
        @RunWith(SpringRunner.class)
        @SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
        public class ThreadTest01 {

            @Resource
            private EmployeeBO employeeBO;

            /**
             *   測(cè)試多線程事務(wù).
             * @throws InterruptedException
             */
            @Test
            public  void MoreThreadTest2() throws InterruptedException {
                int size = 10;
                List<EmployeeDO> employeeDOList = new ArrayList<>(size);
                for (int i = 0; i<size;i++){
                    EmployeeDO employeeDO = new EmployeeDO();
                    employeeDO.setEmployeeName("lol"+i);
                    employeeDO.setAge(18);
                    employeeDO.setGender(1);
                    employeeDO.setIdNumber(i+"XX");
                    employeeDO.setCreatTime(Calendar.getInstance().getTime());
                    employeeDOList.add(employeeDO);
                }
                try {
                    employeeBO.saveThread(employeeDOList);
                    System.out.println("添加成功");
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

        測(cè)試結(jié)果:

        圖片

        圖片

        可以發(fā)現(xiàn)子線程組執(zhí)行時(shí),有一個(gè)線程執(zhí)行失敗,其他線程也會(huì)拋出異常,但是主線程中執(zhí)行的刪除操作,沒有回滾,@Transactional注解沒有生效。

        使用sqlSession控制手動(dòng)提交事務(wù)

         @Resource
          SqlContext sqlContext;
         /**
         * 測(cè)試多線程事務(wù).
         * @param employeeDOList
         */
        @Override
        public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
            // 獲取數(shù)據(jù)庫連接,獲取會(huì)話(內(nèi)部自有事務(wù))
            SqlSession sqlSession = sqlContext.getSqlSession();
            Connection connection = sqlSession.getConnection();
            try {
                // 設(shè)置手動(dòng)提交
                connection.setAutoCommit(false);
                //獲取mapper
                EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
                //先做刪除操作
                employeeMapper.delete(null);
                //獲取執(zhí)行器
                ExecutorService service = ExecutorConfig.getThreadPool();
                List<Callable<Integer>> callableList  = new ArrayList<>();
                //拆分list
                List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
                AtomicBoolean atomicBoolean = new AtomicBoolean(true);
                for (int i =0;i<lists.size();i++){
                    if (i==lists.size()-1){
                        atomicBoolean.set(false);
                    }
                    List<EmployeeDO> list  = lists.get(i);
                    //使用返回結(jié)果的callable去執(zhí)行,
                    Callable<Integer> callable = () -> {
                        //讓最后一個(gè)線程拋出異常
                        if (!atomicBoolean.get()){
                            throw new ServiceException("001","出現(xiàn)異常");
                        }
                      return employeeMapper.saveBatch(list);
                    };
                    callableList.add(callable);
                }
                //執(zhí)行子線程
               List<Future<Integer>> futures = service.invokeAll(callableList);
                for (Future<Integer> future:futures) {
                //如果有一個(gè)執(zhí)行不成功,則全部回滾
                    if (future.get()<=0){
                        connection.rollback();
                         return;
                    }
                }
                connection.commit();
                System.out.println("添加完畢");
            }catch (Exception e){
                connection.rollback();
                log.info("error",e);
                throw new ServiceException("002","出現(xiàn)異常");
            }finally {
                 connection.close();
             }
        }
        // sql
        <insert id="saveBatch" parameterType="List">
         INSERT INTO
         employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
         values
             <foreach collection="list" item="item" index="index" separator=",">
             (
             #{item.employeeId},
             #{item.age},
             #{item.employeeName},
             #{item.birthDate},
             #{item.gender},
             #{item.idNumber},
             #{item.creatTime},
             #{item.updateTime},
             #{item.status}
                 )
             </foreach>
         </insert>

        數(shù)據(jù)庫中一條數(shù)據(jù):

        圖片

        測(cè)試結(jié)果:拋出異常,

        圖片

        刪除操作的數(shù)據(jù)回滾了,數(shù)據(jù)庫中的數(shù)據(jù)依舊存在,說明事務(wù)成功了。

        圖片

        成功操作示例:

         @Resource
        SqlContext sqlContext;
        /**
         * 測(cè)試多線程事務(wù).
         * @param employeeDOList
         */
        @Override
        public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
            // 獲取數(shù)據(jù)庫連接,獲取會(huì)話(內(nèi)部自有事務(wù))
            SqlSession sqlSession = sqlContext.getSqlSession();
            Connection connection = sqlSession.getConnection();
            try {
                // 設(shè)置手動(dòng)提交
                connection.setAutoCommit(false);
                EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
                //先做刪除操作
                employeeMapper.delete(null);
                ExecutorService service = ExecutorConfig.getThreadPool();
                List<Callable<Integer>> callableList  = new ArrayList<>();
                List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
                for (int i =0;i<lists.size();i++){
                    List<EmployeeDO> list  = lists.get(i);
                    Callable<Integer> callable = () -> employeeMapper.saveBatch(list);
                    callableList.add(callable);
                }
                //執(zhí)行子線程
               List<Future<Integer>> futures = service.invokeAll(callableList);
                for (Future<Integer> future:futures) {
                    if (future.get()<=0){
                        connection.rollback();
                         return;
                    }
                }
                connection.commit();
                System.out.println("添加完畢");
            }catch (Exception e){
                connection.rollback();
                log.info("error",e);
                throw new ServiceException("002","出現(xiàn)異常");
               // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
            }
        }

        測(cè)試結(jié)果:

        圖片

        數(shù)據(jù)庫中數(shù)據(jù):

        刪除的刪除了,添加的添加成功了,測(cè)試成功。

        圖片

        清華讀研三年再拿字節(jié)offer,沒想到月薪比三年前更低


        這樣寫代碼,同事樂開花


        程序員面試,能不能不考“八股文”?


        — 【 THE END 】—
        公眾號(hào)[程序員黃小斜]全部博文已整理成一個(gè)目錄,請(qǐng)?jiān)诠娞?hào)里回復(fù)「m」獲?。?/span>

        最近面試BAT,整理一份面試資料Java面試BATJ通關(guān)手冊(cè),覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。

        獲取方式:點(diǎn)“在看”,關(guān)注公眾號(hào)并回復(fù) PDF 領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

        文章有幫助的話,在看,轉(zhuǎn)發(fā)吧。

        謝謝支持喲 (*^__^*)


        瀏覽 64
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            看黄片的网页 | 好湿好紧太硬了我太爽了软件 | 91色噜噜狠狠色婷婷 | AV黄色小说 | 一本一道A片无码视频下载 | 久久久日韩影视噜噜 | 人妻专区 | 中文字幕avav | 国产91对白在线播放九色 | 中年艳妇乱小说 |