1. DUBBO消費異步化實例與原理

        共 57556字,需瀏覽 116分鐘

         ·

        2021-05-12 16:01


        JAVA前線 


        歡迎大家關(guān)注公眾號「JAVA前線」查看更多精彩分享,主要包括源碼分析、實際應(yīng)用、架構(gòu)思維、職場分享、產(chǎn)品思考等等,同時也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)



        1 文章概述

        我們在服務(wù)端開發(fā)時如果需要實現(xiàn)異步調(diào)用,首先聲明一個線程池,并將調(diào)用業(yè)務(wù)方法封裝成一個任務(wù)提交至線程池,如果不需要獲取返回值則封裝為Runnable,需要獲取返回值則封裝為Callable并通過Future對象接受結(jié)果。

        class CalcTask1 implements Callable<Integer{

            @Override
            public Integer call() throws Exception {
                System.out.println("task1耗時計算");
                Thread.sleep(1000L);
                return 100;
            }
        }

        class CalcTask2 implements Callable<Integer{

            @Override
            public Integer call() throws Exception {
                System.out.println("task2耗時計算");
                Thread.sleep(3000L);
                return 200;
            }
        }

        public class CallableTest {

            public static void test1() throws Exception {
                ExecutorService executor = Executors.newCachedThreadPool();
                CalcTask1 task1 = new CalcTask1();
                Future<Integer> f1 = executor.submit(task1);
                CalcTask2 task2 = new CalcTask2();
                Future<Integer> f2 = executor.submit(task2);
                Integer result1 = f1.get();
                Integer result2 = f2.get();
                System.out.println("final result=" + (result1 + result2));
                executor.shutdown();
            }

            public static void test2() throws Exception {
                ExecutorService executor = Executors.newCachedThreadPool();
                List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
                CalcTask1 task1 = new CalcTask1();
                CalcTask2 task2 = new CalcTask2();
                tasks.add(task1);
                tasks.add(task2);
                for (int i = 0; i < tasks.size(); i++) {
                    Future<Integer> future = executor.submit(tasks.get(i));
                    System.out.println("result=" + future.get());
                }
                executor.shutdown();
            }
        }

        1.1 什么是消費異步化

        在使用DUBBO進(jìn)行異步化調(diào)用時不需要這么麻煩,DUBBO基于NIO非阻塞能力使得服務(wù)消費者無需啟用多線程就可以實現(xiàn)并行調(diào)用多個服務(wù),在此我們給出基于2.7.0版本調(diào)用實例。

        1.1.1 生產(chǎn)者

        (1) 服務(wù)聲明

        public interface CalcSumService {
            public Integer sum(int a, int b);
        }

        public class CalcSumServiceImpl implements CalcSumService {

            @Override
            public Integer sum(int a, int b) {
                return a + b;
            }
        }

        public interface CalcSubtractionService {
            public Integer subtraction(int a, int b);
        }

        public class CalcSubtractionServiceImpl implements CalcSubtractionService {

            @Override
            public Integer subtraction(int a, int b) {
                return a - b;
            }
        }

        (2) 配置文件

        <beans>
          <dubbo:application name="java-front-provider" />
          <dubbo:registry address="zookeeper://127.0.0.1:2181" />
          <dubbo:protocol name="dubbo" port="9999" />
          <bean id="calcSumService" class="com.java.front.dubbo.demo.provider.service.CalcSumServiceImpl" />
          <bean id="calcSubtractionService" class="com.java.front.dubbo.demo.provider.service.CalcSubtractionServiceImpl" />
          <dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSumService" ref="calcSumService" />
          <dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" ref="calcSubtractionService" />
        </beans>

        (3) 服務(wù)發(fā)布

        public class Provider {
            public static void main(String[] args) throws Exception {
                ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath*:META-INF/spring/dubbo-provider.xml");
                context.start();
                System.out.println(context);
                System.in.read();
            }
        }

        1.1.2 消費者

        (1) 配置文件

        <beans>
          <dubbo:application name="java-front-consumer" />
          <dubbo:registry address="zookeeper://127.0.0.1:2181" />
          <dubbo:reference id="calcSumService" interface="com.java.front.dubbo.demo.provider.service.CalcSumService" timeout="10000">
            <dubbo:method name="sum" async="true" />
          </dubbo:reference>
          <dubbo:reference id="calcSubtractionService" interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" timeout="10000">
            <dubbo:method name="subtraction" async="true" />
          </dubbo:reference>
        </beans>

        (2) 服務(wù)消費

        public class Consumer {

            public static void main(String[] args) throws Exception {
                testAsync();
                System.in.read();
            }

            public static void testAsync() {
                try {
                    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
                    System.out.println(context);
                    context.start();

                    /** 加法運算 **/
                    CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                    calcSumService.sum(32);
                    CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();

                    /** 減法運算 **/
                    CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                    calcSubtractionService.subtraction(32);
                    CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();

                    /** 輸出結(jié)果 **/
                    int sumResult = futureSum.get();
                    int subtractionResult = futureSubtraction.get();
                    System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        1.2 為什么消費異步化

        異步化可以將原本串行的調(diào)用并行化,減少執(zhí)行時間從而提升性能。假設(shè)上述實例加法服務(wù)需要100ms,減法服務(wù)需要200ms,那么串行化執(zhí)行時間為二者之和300ms:



        如果消費異步化那么執(zhí)行時間減少為二者最大值200ms,異步化所帶來的性能提升不言而喻:



        2 保護(hù)性暫停模式

        分析DUBBO源碼之前我們首先介紹一種多線程設(shè)計模式:保護(hù)性暫停模式。我們設(shè)想這樣一種場景:線程A生產(chǎn)數(shù)據(jù),線程B讀取這個數(shù)據(jù)。我們必須面對一種情況:線程B準(zhǔn)備讀取數(shù)據(jù)時,此時線程A還沒有生產(chǎn)出數(shù)據(jù)。在這種情況下線程B不能一直空轉(zhuǎn),也不能立即退出,線程B要等到生產(chǎn)數(shù)據(jù)完成并拿到數(shù)據(jù)之后才退出。

        那么在數(shù)據(jù)沒有生產(chǎn)出這段時間,線程B需要執(zhí)行一種等待機(jī)制,這樣可以達(dá)到對系統(tǒng)保護(hù)目的,這就是保護(hù)性暫停。

        public class MyData implements Serializable {
            private static final long serialVersionUID = 1L;
            private String message;

            public MyData(String message) {
                this.message = message;
            }
        }

        class Resource {
            private MyData data;
            private Object lock = new Object();

            public MyData getData() {
                synchronized (lock) {
                    while (data == null) {
                        try {
                            // 沒有數(shù)據(jù)則釋放鎖并暫停等待被喚醒
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return data;
                }
            }

            public void sendData(MyData data) {
                synchronized (lock) {
                    // 生產(chǎn)數(shù)據(jù)后喚醒消費線程
                    this.data = data;
                    lock.notifyAll();
                }
            }
        }

        public class ProtectDesignTest {
            public static void main(String[] args) {
                Resource resource = new Resource();
                new Thread(() -> {
                    try {
                        MyData data = new MyData("hello");
                        System.out.println(Thread.currentThread().getName() + "生產(chǎn)數(shù)據(jù)=" + data);
                        // 模擬發(fā)送耗時
                        TimeUnit.SECONDS.sleep(3);
                        resource.sendData(data);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }, "t1").start();

                new Thread(() -> {
                    MyData data = resource.getData();
                    System.out.println(Thread.currentThread().getName() + "接收到數(shù)據(jù)=" + data);
                }, "t2").start();
            }
        }

        在上述代碼實例中線程1生產(chǎn)數(shù)據(jù),線程2消費數(shù)據(jù),Resource類通過wait/notify實現(xiàn)了保護(hù)性暫停模式,關(guān)于保護(hù)性暫停模式請參看我之前《保護(hù)性暫停模式詳解以及其在DUBBO應(yīng)用源碼分析》這篇文章。


        3 源碼分析

        本章節(jié)我們分析對比2.6.9和2.7.0兩個版本源碼,之所以選取這兩個版本是因為2.7.0是一個里程碑版本,異步化能力得到了明顯增強(qiáng)。


        3.1 version_2.6.9

        3.1.1 異步調(diào)用

        我們首先看看這個版本異步調(diào)用使用方式,生產(chǎn)者內(nèi)容和消費者配置文件同第一章節(jié)不再贅述,我們重點分析服務(wù)消費代碼。

        public class AsyncConsumer {

            public static void main(String[] args) throws Exception {
                test1();
                System.in.read();
            }

            public static void test1() throws Exception {
                ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
                System.out.println(context);
                context.start();

                /** 加法運算 **/
                CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                calcSumService.sum(32);
                Future<Integer> futureSum = RpcContext.getContext().getFuture();

                /** 減法運算 **/
                CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                calcSubtractionService.subtraction(32);
                Future<Integer> futureSubtraction = RpcContext.getContext().getFuture();

                /** 輸出結(jié)果 **/
                int sumResult = futureSum.get();
                int subtractionResult = futureSubtraction.get();
                System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
            }
        }

        消費者最終執(zhí)行DubboInvoker.doInvoke,這個方法包含異步調(diào)用核心:

        public class DubboInvoker<Textends AbstractInvoker<T{

            @Override
            protected Result doInvoke(final Invocation invocation) throws Throwable {
                RpcInvocation inv = (RpcInvocation) invocation;
                final String methodName = RpcUtils.getMethodName(invocation);
                inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
                inv.setAttachment(Constants.VERSION_KEY, version);

                ExchangeClient currentClient;
                if (clients.length == 1) {
                    currentClient = clients[0];
                } else {
                    currentClient = clients[index.getAndIncrement() % clients.length];
                }
                try {
                    boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                    int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                    // 單向調(diào)用
                    if (isOneway) {
                        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                        currentClient.send(inv, isSent);
                        RpcContext.getContext().setFuture(null);
                        return new RpcResult();
                    }
                    // 異步調(diào)用
                    else if (isAsync) {
                        // 發(fā)起請求給生產(chǎn)者
                        ResponseFuture future = currentClient.request(inv, timeout);
                        // 設(shè)置future對象至上下文
                        RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                        // 返回空結(jié)果
                        return new RpcResult();
                    }
                    // 同步調(diào)用
                    else {
                        RpcContext.getContext().setFuture(null);
                        return (Result) currentClient.request(inv, timeout).get();
                    }
                } catch (TimeoutException e) {
                    throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                } catch (RemotingException e) {
                    throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                }
            }
        }

        如果包含async屬性則表示異步調(diào)用,第一步發(fā)送調(diào)用請求給生產(chǎn)者,第二步設(shè)置Future對象至上下文,第三步立即返回空結(jié)果。那么在服務(wù)消費時關(guān)鍵一步就是獲取Future對象,所以我們在調(diào)用時要從上下文獲取Future對象:

        CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
        calcSumService.sum(32);
        Future<Integer> futureSum = RpcContext.getContext().getFuture();

        使用Future對象獲取結(jié)果:

        int sumResult = futureSum.get();

        進(jìn)入FutureAdapter.get()方法:

        public class FutureAdapter<Vimplements Future<V{
            private final ResponseFuture future;

            public V get() throws InterruptedException, ExecutionException {
                try {
                    return (V) (((Result) future.get()).recreate());
                } catch (RemotingException e) {
                    throw new ExecutionException(e.getMessage(), e);
                } catch (Throwable e) {
                    throw new RpcException(e);
                }
            }
        }

        進(jìn)入ResponseFuture.get()方法,我們可以看到保護(hù)性暫停模式應(yīng)用,當(dāng)生產(chǎn)者線程沒有返回數(shù)據(jù)則阻塞并等待被喚醒:

        public class DefaultFuture implements ResponseFuture {
            private final Lock lock = new ReentrantLock();
            private final Condition done = lock.newCondition();

            @Override
            public Object get() throws RemotingException {
                return get(timeout);
            }

            @Override
            public Object get(int timeout) throws RemotingException {
                if (timeout <= 0) {
                    timeout = Constants.DEFAULT_TIMEOUT;
                }
                if (!isDone()) {
                    long start = System.currentTimeMillis();
                    lock.lock();
                    try {
                        while (!isDone()) {
                            // 遠(yuǎn)程調(diào)用未完成則等待被喚醒
                            done.await(timeout, TimeUnit.MILLISECONDS);
                            // 超時時間未完成則退出
                            if (isDone() || System.currentTimeMillis() - start > timeout) {
                                break;
                            }
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    } finally {
                        lock.unlock();
                    }
                    // 拋出超時異常
                    if (!isDone()) {
                        throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                    }
                }
                return returnFromResponse();
            }
        }

        當(dāng)消費者接收到生產(chǎn)者響應(yīng)時會調(diào)用received方法喚醒相關(guān)阻塞線程,這時阻塞在get方法中的線程即可獲取到數(shù)據(jù):

        public class DefaultFuture implements ResponseFuture {
            private final Lock lock = new ReentrantLock();
            private final Condition done = lock.newCondition();

            public static void received(Channel channel, Response response) {
                try {
                    // 根據(jù)唯一請求號獲取Future
                    DefaultFuture future = FUTURES.remove(response.getId());
                    if (future != null) {
                        future.doReceived(response);
                    } else {
                        logger.warn("The timeout response finally returned at "
                                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                                    + ", response " + response
                                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                                       + " -> " + channel.getRemoteAddress()));
                    }
                } finally {
                    CHANNELS.remove(response.getId());
                }
            }

            private void doReceived(Response res) {
                lock.lock();
                try {
                    response = res;
                    if (done != null) {
                        // 喚醒相關(guān)阻塞線程
                        done.signal();
                    }
                } finally {
                    lock.unlock();
                }
                if (callback != null) {
                    invokeCallback(callback);
                }
            }
        }

        3.1.2 設(shè)置回調(diào)函數(shù)

        我們現(xiàn)在調(diào)用get方法會阻塞在那里等到結(jié)果,那么有沒有一種方式當(dāng)結(jié)果返回時就立即調(diào)用我們設(shè)置的回調(diào)函數(shù)?答案是有。

        public class AsyncConsumer {

            public static void main(String[] args) throws Exception {
                test2();
                System.in.read();
            }

            public static void test2() throws Exception {
                ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
                System.out.println(context);
                context.start();

                /** 加法運算 **/
                CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                calcSumService.sum(32);

                /** 執(zhí)行回調(diào)函數(shù) **/
                ((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {
                    @Override
                    public void done(Object response) {
                        System.out.println("sumResult=" + response);
                    }

                    @Override
                    public void caught(Throwable exception) {
                        exception.printStackTrace();
                    }
                });

                /** 減法運算 **/
                CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                calcSubtractionService.subtraction(32);

                /** 執(zhí)行回調(diào)函數(shù) **/
                ((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {
                    @Override
                    public void done(Object response) {
                        System.out.println("subtractionResult=" + response);
                    }

                    @Override
                    public void caught(Throwable exception) {
                        exception.printStackTrace();
                    }
                });
            }
        }

        DefaultFuture可以設(shè)置callback回調(diào)函數(shù),當(dāng)結(jié)果返回時如果回調(diào)函數(shù)不為空則執(zhí)行:

        public class DefaultFuture implements ResponseFuture {
            private volatile ResponseCallback callback;

            private void doReceived(Response res) {
                lock.lock();
                try {
                    response = res;
                    if (done != null) {
                        done.signal();
                    }
                } finally {
                    lock.unlock();
                }
                if (callback != null) {
                    // 執(zhí)行回調(diào)函數(shù)
                    invokeCallback(callback);
                }
            }

            private void invokeCallback(ResponseCallback c) {
                ResponseCallback callbackCopy = c;
                if (callbackCopy == null) {
                    throw new NullPointerException("callback cannot be null.");
                }
                c = null;
                Response res = response;
                if (res == null) {
                    throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());
                }
                if (res.getStatus() == Response.OK) {
                    try {
                        // 執(zhí)行成功回調(diào)
                        callbackCopy.done(res.getResult());
                    } catch (Exception e) {
                        logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e);
                    }
                } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                    try {
                        TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
                        // 發(fā)生超時回調(diào)
                        callbackCopy.caught(te);
                    } catch (Exception e) {
                        logger.error("callback invoke error ,url:" + channel.getUrl(), e);
                    }
                } else {
                    try {
                        RuntimeException re = new RuntimeException(res.getErrorMessage());
                        callbackCopy.caught(re);
                    } catch (Exception e) {
                        logger.error("callback invoke error ,url:" + channel.getUrl(), e);
                    }
                }
            }
        }

        3.2 version_2.7.0

        CompletableFuture在這個版本中被引入實現(xiàn)異步調(diào)用,可以使用此類強(qiáng)大的異步編程API增強(qiáng)異步能力,我們首先回顧1.1.2章節(jié)實例:

        public class Consumer {

            public static void testAsync() {
                try {
                    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
                    System.out.println(context);
                    context.start();

                    /** 加法運算 **/
                    CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                    calcSumService.sum(32);
                    CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();

                    /** 減法運算 **/
                    CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                    calcSubtractionService.subtraction(32);
                    CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();

                    /** 輸出結(jié)果 **/
                    int sumResult = futureSum.get();
                    int subtractionResult = futureSubtraction.get();
                    System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        在上述消費者代碼的實例中我們只是應(yīng)用了CompletableFuture.get()方法,并沒有發(fā)揮其強(qiáng)大功能。我們對上述實例稍加改造,兩個CompletionStage任務(wù)都執(zhí)行完成后,兩個任務(wù)結(jié)果會一起交給thenCombine進(jìn)行處理:

        public class Consumer {

            public static void testAsync() {
                try {
                    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer2.xml" });
                    System.out.println(context);
                    context.start();

                    /** 加法運算 **/
                    CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                    calcSumService.sum(32);
                    CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();

                    /** 減法運算 **/
                    CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                    calcSubtractionService.subtraction(32);
                    CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();

                    /** 乘法運算 **/
                    CompletableFuture<Integer> multiplyResult = futureSum.thenCombine(futureSubtraction, new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(Integer t, Integer u) {
                            return (t * u);
                        }
                    });
                    System.out.println("multiplyResult=" + multiplyResult.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        DubboInvoker代碼有所變化:

        public class DubboInvoker<Textends AbstractInvoker<T{

            @Override
            protected Result doInvoke(final Invocation invocation) throws Throwable {
                RpcInvocation inv = (RpcInvocation) invocation;
                final String methodName = RpcUtils.getMethodName(invocation);
                inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
                inv.setAttachment(Constants.VERSION_KEY, version);
                ExchangeClient currentClient;
                if (clients.length == 1) {
                    currentClient = clients[0];
                } else {
                    currentClient = clients[index.getAndIncrement() % clients.length];
                }
                try {
                    // 是否為異步調(diào)用
                    boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);

                    // 是否為future異步方式
                    boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);

                    // 是否需要響應(yīng)結(jié)果
                    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

                    // 超時時間
                    int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

                    // 單向調(diào)用
                    if (isOneway) {
                        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                        currentClient.send(inv, isSent);
                        RpcContext.getContext().setFuture(null);
                        return new RpcResult();
                    }
                    // 異步請求
                    else if (isAsync) {
                        ResponseFuture future = currentClient.request(inv, timeout);
                        FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                        RpcContext.getContext().setFuture(futureAdapter);
                        Result result;
                        if (isAsyncFuture) {
                            result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                        } else {
                            result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                        }
                        return result;
                    }
                    // 同步請求
                    else {
                        RpcContext.getContext().setFuture(null);
                        Result result = (Result) currentClient.request(inv, timeout).get();
                        return result;
                    }
                } catch (TimeoutException e) {
                    throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                } catch (RemotingException e) {
                    throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                }
            }
        }

        我們看到與2.6.9版本相同的是FutureAdapter同樣會被設(shè)置到上下文,但是FutureAdapter本身已經(jīng)發(fā)生了變化:

        public class FutureAdapter<Vextends CompletableFuture<V{
            private final ResponseFuture future;
            private CompletableFuture<Result> resultFuture;

            public FutureAdapter(ResponseFuture future) {
                this.future = future;
                this.resultFuture = new CompletableFuture<>();

                // 設(shè)置回調(diào)函數(shù)至DefaultFuture
                future.setCallback(new ResponseCallback() {

                    // 設(shè)置響應(yīng)結(jié)果至CompletableFuture
                    @Override
                    public void done(Object response) {
                        Result result = (Result) response;
                        FutureAdapter.this.resultFuture.complete(result);
                        V value = null;
                        try {
                            value = (V) result.recreate();
                        } catch (Throwable t) {
                            FutureAdapter.this.completeExceptionally(t);
                        }
                        FutureAdapter.this.complete(value);
                    }

                    // 設(shè)置異常結(jié)果至FutureAdapter
                    @Override
                    public void caught(Throwable exception) {
                        FutureAdapter.this.completeExceptionally(exception);
                    }
                });
            }

            public ResponseFuture getFuture() {
                return future;
            }

            public CompletableFuture<Result> getResultFuture() {
                return resultFuture;
            }
        }

        我們在服務(wù)消費時通過getResultFuture方法獲取CompletableFuture,這個對象值在回調(diào)時被設(shè)置,回調(diào)時機(jī)同樣在DefaultFuture.doReceived方法里面:

        public class DefaultFuture implements ResponseFuture {
            private volatile ResponseCallback callback;

            private void doReceived(Response res) {
                lock.lock();
                try {
                    response = res;
                    if (done != null) {
                        done.signal();
                    }
                } finally {
                    lock.unlock();
                }
                if (callback != null) {
                    // 執(zhí)行回調(diào)函數(shù)代碼同version_2.6.9
                    invokeCallback(callback);
                }
            }
        }

        4 文章總結(jié)

        本文第一介紹了DUBBO消費異步化是什么,以及異步化為什么會帶來性能提升。第二介紹了保護(hù)性暫停模式,這是實現(xiàn)異步化的基礎(chǔ)。最后我們閱讀了兩個不同版本異步化源碼,了解了DUBBO異步化演進(jìn)過程,希望本文對大家有所幫助。




        JAVA前線 


        歡迎大家關(guān)注公眾號「JAVA前線」查看更多精彩分享,主要包括源碼分析、實際應(yīng)用、架構(gòu)思維、職場分享、產(chǎn)品思考等等,同時也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)


        瀏覽 27
        點贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
          
          

            1. 一级特黄大片欧美 | 偷尝禁果做爰av 97性潮久久久久久久久动漫 | 国产精品人妻人伦a 6 2v久动漫 骚虎导航 | 男女羞羞无遮挡 | 国产日韩在线看 |