1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        SpringBoot @Async異步注解上下文透傳

        共 21160字,需瀏覽 43分鐘

         ·

        2021-07-09 14:55

        公眾號關注 “GitHub今日熱榜
        設為 “星標”,帶你挖掘更多開發(fā)神器!






        上一篇文章說到,之前使用了@Async注解,子線程無法獲取到上下文信息,導致流量無法打到灰度,然后改成 線程池的方式,每次調(diào)用異步調(diào)用的時候都手動透傳 上下文(硬編碼)解決了問題。


        后面查閱了資料,找到了方案不用每次硬編碼,來上下文透傳數(shù)據(jù)了。


        方案一:


        繼承線程池,重寫相應的方法,透傳上下文。


        方案二:(推薦)


        線程池ThreadPoolTaskExecutor,有一個TaskDecorator裝飾器,實現(xiàn)這個接口,透傳上下文。


        方案一:繼承線程池,重寫相應的方法,透傳上下文。


        1、ThreadPoolTaskExecutor  spring封裝的線程池


        @Bean(ExecutorConstant.simpleExecutor_3)
            public Executor asyncExecutor3() {
                MyThreadPoolTaskExecutor taskExecutor = new MyThreadPoolTaskExecutor();
                taskExecutor.setCorePoolSize(corePoolSize);
                taskExecutor.setMaxPoolSize(maxPoolSize);
                taskExecutor.setQueueCapacity(queueCapacity);
                taskExecutor.setThreadNamePrefix(threadNamePrefix_3);
                taskExecutor.initialize();
                return taskExecutor;
            }

            //------- 繼承父類 重寫對應的方法 start
            class MyCallable<T> implements Callable<T> {
                private Callable<T> task;
                private RequestAttributes context;

                public MyCallable(Callable<T> task, RequestAttributes context) {
                    this.task = task;
                    this.context = context;
                }

                @Override
                public T call() throws Exception {
                    if (context != null) {
                        RequestContextHolder.setRequestAttributes(context);
                    }

                    try {
                        return task.call();
                    } finally {
                        RequestContextHolder.resetRequestAttributes();
                    }
                }
            }
            class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor{

                @Override
                public <T> Future<T> submit(Callable<T> task) {
                    return super.submit(new MyCallable(task, RequestContextHolder.currentRequestAttributes()));
                }

                @Override
                public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
                    return super.submitListenable(new MyCallable(task, RequestContextHolder.currentRequestAttributes()));
                }
            }
            //------- 繼承父類 重寫對應的方法 end


        1、MyCallable是繼承Callable,創(chuàng)建MyCallable對象的時候已經(jīng)把Attributes對象賦值給屬性context了(創(chuàng)建MyCallable對象的時候因為實在當前主線程創(chuàng)建的,所以是能獲取到請求的Attributes),在執(zhí)行call方法前,先執(zhí)行了RequestContextHolder.setRequestAttributes(context); 【把這個MyCallable對象的屬性context 設置到setRequestAttributes中】 所以在執(zhí)行具體業(yè)務時,當前線程(子線程)就能取得主線程的Attributes


        2、MyThreadPoolTaskExecutor類是繼承了ThreadPoolTaskExecutor 重寫了submit和submitListenable方法


        為什么是重寫submit和submitListenable這兩個方法?


        @Async AOP源碼的方法位置是在:AsyncExecutionInterceptor.invoke

        doSubmit方法能看出來


        無返回值調(diào)用的是線程池方法:submit()

        有返回值,根據(jù)不同的返回類型也知道:


        1. 返回值類型是:Future.class 調(diào)用的是方法:submit()

        2. 返回值類型是:ListenableFuture.class 調(diào)用的方法是:submitListenable(task)

        3. 返回值類型是:CompletableFuture.class調(diào)用的是CompletableFuture.supplyAsync這個在異步注解中暫時用不上的,就不考慮重寫了。


        public Object invoke(final MethodInvocation invocation) throws Throwable {
            Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
            Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
            final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

            AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
            if (executor == null) {
              throw new IllegalStateException(
                  "No executor specified and no default executor set on AsyncExecutionInterceptor either");
            }

            Callable<Object> task = () -> {
              try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                  return ((Future<?>) result).get();
                }
              }
              catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
              }
              catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
              }
              return null;
            };

            return doSubmit(task, executor, invocation.getMethod().getReturnType());
          }

          @Nullable
          protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
            if (CompletableFuture.class.isAssignableFrom(returnType)) {
              return CompletableFuture.supplyAsync(() -> {
                try {
                  return task.call();
                }
                catch (Throwable ex) {
                  throw new CompletionException(ex);
                }
              }, executor);
            }
            else if (ListenableFuture.class.isAssignableFrom(returnType)) {
              return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
            }
            else if (Future.class.isAssignableFrom(returnType)) {
              return executor.submit(task);
            }
            else {
              executor.submit(task);
              return null;
            }
          }


        2、ThreadPoolExecutor 原生線程池


        ThreadPoolExecutor線程池代碼如下:


        //------- ThreadPoolExecutor 繼承父類 重寫對應的方法 start
            class MyRunnable implements Runnable {
                private Runnable runnable;
                private RequestAttributes context;

                public MyRunnable(Runnable runnable, RequestAttributes context) {
                    this.runnable = runnable;
                    this.context = context;
                }

                @Override
                public void run() {
                    if (context != null) {
                        RequestContextHolder.setRequestAttributes(context);
                    }
                    try {
                        runnable.run();
                    } finally {
                        RequestContextHolder.resetRequestAttributes();
                    }
                }
            }

            class MyThreadPoolExecutor extends ThreadPoolExecutor{
                @Override
                public void execute(Runnable command) {
                    if(!(command instanceof MyRunnable)){
                        command = new MyRunnable(command,RequestContextHolder.currentRequestAttributes())
                    }
                    super.execute(command);
                }

                public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
                }

                public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
                }

                public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
                }

                public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
                }
            }
            //------- ThreadPoolExecutor 繼承父類 重寫對應的方法 end


        像ThreadPoolExecutor主要重寫execute方法,在啟動新線程的時候先把Attributes取到放到MyRunnable對象的一個屬性中,MyRunnable在具體執(zhí)行run方法的時候,把屬性Attributes賦值到子線程中,當run方法執(zhí)行完了在把Attributes清空掉。


        為什么只要重寫了execute方法就可以了?


        ThreadPoolExecutor大家都知道主要是由submit和execute方法來執(zhí)行的。


        看ThreadPoolExecutor類的submit具體執(zhí)行方法是由父類AbstractExecutorService#submit來實現(xiàn)。


        具體代碼在下面貼出來了,可以看到submit實際上最后調(diào)用的還是execute方法,所以我們重寫execute方法就好了。


        submit方法路徑及源碼:


        public Future<?> submit(Runnable task) {
                if (task == null) throw new NullPointerException();
                RunnableFuture<Void> ftask = newTaskFor(task, null);
                execute(ftask);
                return ftask;
            }

            /**
             * @throws RejectedExecutionException {@inheritDoc}
             * @throws NullPointerException {@inheritDoc}
             */

            public <T> Future<T> submit(Runnable task, T result) {
                if (task == null) throw new NullPointerException();
                RunnableFuture<T> ftask = newTaskFor(task, result);
                execute(ftask);
                return ftask;
            }

            /**
             * @throws RejectedExecutionException {@inheritDoc}
             * @throws NullPointerException {@inheritDoc}
             */

            public <T> Future<T> submit(Callable<T> task) {
                if (task == null) throw new NullPointerException();
                RunnableFuture<T> ftask = newTaskFor(task);
                execute(ftask);
                return ftask;
            }


        方案二:(推薦)ThreadPoolTaskExecutor線程池


        實現(xiàn)TaskDecorator接口,把實現(xiàn)類設置到taskExecutor.setTaskDecorator(new MyTaskDecorator());


        //------- 實現(xiàn)TaskDecorator 接口 start

            @Bean(ExecutorConstant.simpleExecutor_4)
            public Executor asyncExecutor4() {
                MyThreadPoolTaskExecutor taskExecutor = new MyThreadPoolTaskExecutor();
                taskExecutor.setCorePoolSize(corePoolSize);
                taskExecutor.setMaxPoolSize(maxPoolSize);
                taskExecutor.setQueueCapacity(queueCapacity);
                taskExecutor.setThreadNamePrefix(threadNamePrefix_4);
                taskExecutor.setTaskDecorator(new MyTaskDecorator());
                taskExecutor.initialize();
                return taskExecutor;
            }

            class MyTaskDecorator implements TaskDecorator{

                @Override
                public Runnable decorate(Runnable runnable) {
                    try {
                        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
                        return () -> {
                            try {
                                RequestContextHolder.setRequestAttributes(attributes);
                                runnable.run();
                            } finally {
                                RequestContextHolder.resetRequestAttributes();
                            }
                        };
                    } catch (IllegalStateException e) {
                        return runnable;
                    }
                }
            }
            //------- 實現(xiàn)TaskDecorator 接口 end


        為什么設置了setTaskDecorator就能實現(xiàn)透傳數(shù)據(jù)了?


        主要還是看taskExecutor.initialize()方法,主要是重寫了ThreadPoolExecutor的execute方法,用裝飾器模式 增強了Runnable接口,源代碼如下:


        @Nullable
          private ThreadPoolExecutor threadPoolExecutor;

          //初始化方法
          public void initialize() {
            if (logger.isDebugEnabled()) {
              logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
            }
            if (!this.threadNamePrefixSet && this.beanName != null) {
              setThreadNamePrefix(this.beanName + "-");
            }
            this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
          }

          @Override
          protected ExecutorService initializeExecutor(
              ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler)
         
        {

            BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

            ThreadPoolExecutor executor;

            //判斷是否設置了,taskDecorator裝飾器
            if (this.taskDecorator != null) {
              executor = new ThreadPoolExecutor(
                  this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                  queue, threadFactory, rejectedExecutionHandler) {
                @Override
                public void execute(Runnable command) {
                  //執(zhí)行裝飾器方法包裝Runnable接口
                  Runnable decorated = taskDecorator.decorate(command);
                  if (decorated != command) {
                    decoratedTaskMap.put(decorated, command);
                  }
                  super.execute(decorated);
                }
              };
            }
            else {
              executor = new ThreadPoolExecutor(
                  this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                  queue, threadFactory, rejectedExecutionHandler);

            }

            if (this.allowCoreThreadTimeOut) {
              executor.allowCoreThreadTimeOut(true);
            }
            //把初始化好的ThreadPoolExecutor線程池賦值給 當前類屬性threadPoolExecutor
            this.threadPoolExecutor = executor;
            return executor;
          }


        總結


        無論是方案1還是方案2,原理都是先在當前線程獲取到Attributes,然后把Attributes賦值到Runnable的一個屬性中,在起一個子線程后,具體執(zhí)行run方法的時候,把Attributes設置給當子線程,當run方法執(zhí)行完了,在清空Attributes。


        方案2實現(xiàn)比較優(yōu)雅,所以推薦使用它。



        出處:cnblogs.com/x-kq/p/14911497.html










        關注GitHub今日熱榜,專注挖掘好用的開發(fā)工具,致力于分享優(yōu)質高效的工具、資源、插件等,助力開發(fā)者成長!








        點個在看,你最好看



        瀏覽 143
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            伊人精品A片一区二区三区| 高潮流水视频| 亚洲天堂无码视频| 成人性在线| 国产探花在线观看| 国产久久在线| A视频免费在线观看| 黄色电影视频网站| 久草福利在线| 中文字幕无码免费|