1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Project Reactor 響應式編程

        共 12519字,需瀏覽 26分鐘

         ·

        2021-06-27 23:44

        一. 什么是響應式編程?

        在計算中,響應式編程或反應式編程是一種面向數(shù)據(jù)流和變化傳播的聲明式編程范式。這意味著可以在編程語言中很方便地表達靜態(tài)或動態(tài)的數(shù)據(jù)流,而相關的計算模型會自動將變化的值通過數(shù)據(jù)流進行傳播。

        上面一段話來自維基百科。

        響應式編程顧名思義就是在于響應二字,我們需要在某個事件發(fā)生時做出響應。

        我們現(xiàn)實生活就是對響應式很好的解釋,我們人類的舉動大多都是基于事件驅動模式,當有人呼喊你的名字,你會根據(jù)這個事件來判斷要不要進行應答,這個過程其實就是產生事件,然后我們作為消費者對事件進行處理,而我們的處理結果也會繼續(xù)向下傳遞。

        在響應式編程中,通常是采用異步回調的方式,回調方法的調用和控制則會由響應式框架來完成,對于應用開發(fā)來說只需要關注回調方法的實現(xiàn)就可以了。

        這里提一個著名的設計原則:好萊塢原則(Hollywood principle)

        Don't call us, we will call you.

        演員提交簡歷之后,回家等著就好,演藝公司會主動打電話給你。

        二. Project Reactor介紹

        Java中最早的Reactor庫RxJava借鑒于.Net的Reactor Extensions,后來Jdk在Java9提供了標準化的響應式庫實現(xiàn)java.util.concurrent.Flow,再后來,Project Reactor作為第四代響應式編程框架出現(xiàn),它是一個完全非阻塞響應式編程的基石,直接集成了Java函數(shù)式API,特別是CompletableFuture,Stream和Duration。Reactor Netty實現(xiàn)了非阻塞跨進程通信,提升了服務間通信效率。

        我們在平常開發(fā)中,異步編程無非是使用JUC包下的工具類或者一些Java同步語義。

        ?阻塞等待:如 Future.get()?不安全的數(shù)據(jù)訪問:如 ReentrantLock.lock()?異常冒泡:如 try…catch…finally?同步阻塞:如 synchronized{ }?Wrapper分配(GC 壓力):如 new Wrapper(event)

        或者自定義線程池,但也會遇到諸如以下的問題。

        ?Callable 分配 -- 可能導致 GC 壓力。?同步過程強制每個線程執(zhí)行停-檢查操作。?消息的消費可能比生產慢。?使用線程池(ThreadPool)將任務傳遞給目標線程 -- 通過 FutureTask 方式肯定會產生 GC 壓力。?阻塞直至IO回調。

        上面等等問題都會造成的系統(tǒng)性能瓶頸或者安全問題,在Future.get時我們無法避免阻塞等待,最差情況下程序運行其實還是同步的,使用Reactor不但可以很有效的解決上述問題,還能讓我們寫出更加簡潔明了的代碼。

        三. Reactor核心概念

        代碼: https://github.com/CasterWx/reactor-ppt

        Flux

        Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結束的消息和序列出錯的消息。當消息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。

        1. just()

        可以指定序列中包含的全部元素。創(chuàng)建出來的 Flux 序列在發(fā)布這些元素之后會自動結束。

        Flux.just("hello", "world")
        .doOnNext((i) -> {
        System.out.println("[doOnNext] " + i);
        })
        .doOnComplete(() -> System.out.println("[doOnComplete]"))
        .subscribe(i -> System.out.println("[subscribe] " + i));

        // 執(zhí)行結果
        [doOnNext] hello
        [subscribe] hello
        [doOnNext] world
        [subscribe] world
        [doOnComplete]

        2. fromArray(),fromIterable()和 fromStream()

        可以從一個數(shù)組、Iterable 對象或 Stream 對象中創(chuàng)建 Flux 對象。

        List<String> arr = Arrays.asList("flux", "mono", "reactor", "core");
        Flux.fromIterable(arr)
        .doOnNext((i) -> {
        System.out.println("[doOnNext] " + i);
        })
        .subscribe((i) -> {
        System.out.println("[subscribe] " + i);
        });
        //執(zhí)行結果
        [doOnNext] flux
        [subscribe] flux
        [doOnNext] mono
        [subscribe] mono
        [doOnNext] reactor
        [subscribe] reactor
        [doOnNext] core
        [subscribe] core

        3. empty()

        創(chuàng)建一個不包含任何元素,只發(fā)布結束消息的序列。

         Flux.empty()
        .doOnNext(i -> {
        System.out.println("[doOnNext] " + i);
        }).doOnComplete(() -> {
        System.out.println("[DoOnComplete] ");
        }).subscribe(i -> {
        System.out.println("[subscribe] " + i);
        });
        //執(zhí)行結果
        [DoOnComplete]

        4. error(Throwable error)

        創(chuàng)建一個只包含錯誤消息的序列。

        try {
        int []arr = new int[5];
        arr[10] = 2;
        } catch (Exception e) {
        Flux.error(e).subscribe(i -> {
        System.out.println("error subscribe");
        });
        }
        //執(zhí)行結果

        5. never()

        創(chuàng)建一個不包含任何消息通知的序列。

        Flux.never()
        .doOnNext(i -> {
        System.out.println("doOnNext " + i);
        }).doOnComplete(() -> {
        System.out.println("doOnComplete");
        }).subscribe((i) -> {
        System.out.println("subscribe " + i);
        });
        //執(zhí)行結果

        6. range(int start, int count)

        創(chuàng)建包含從 start 起始的 count 個數(shù)量的 Integer 對象的序列。

        Flux.range(5, 10)
        .doOnNext(i -> {
        System.out.println("doOnNext " + i);
        }).doOnComplete(() -> {
        System.out.println("doOnComplete");
        }).subscribe((i) -> {
        System.out.println("subscribe " + i);
        });
        //執(zhí)行結果
        doOnNext 5
        subscribe 5
        doOnNext 6
        subscribe 6
        doOnNext 7
        subscribe 7
        doOnNext 8
        subscribe 8
        doOnNext 9
        subscribe 9
        doOnNext 10
        subscribe 10
        doOnNext 11
        subscribe 11
        doOnNext 12
        subscribe 12
        doOnNext 13
        subscribe 13
        doOnNext 14
        subscribe 14
        doOnComplete

        7. interval(Duration period)和 interval(Duration delay, Duration period)

        創(chuàng)建一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來發(fā)布。除了間隔時間之外,還可以指定起始元素發(fā)布之前的延遲時間。

        Flux.interval(Duration.ofSeconds(4), Duration.ofSeconds(2))
        .doOnNext(i -> {
        System.out.println("doOnNext " + i);
        }).doOnComplete(() -> {
        System.out.println("doOnComplete " + new Date());
        }).subscribe((i) -> {
        System.out.println("subscribe " + i + ", date: " + new Date());
        });
        try {
        Thread.sleep(10000);
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        //執(zhí)行結果
        doOnNext 0
        subscribe 0, date: Fri Jun 25 10:17:56 CST 2021
        doOnNext 1
        subscribe 1, date: Fri Jun 25 10:17:58 CST 2021
        doOnNext 2
        subscribe 2, date: Fri Jun 25 10:18:00 CST 2021
        doOnNext 3
        subscribe 3, date: Fri Jun 25 10:18:02 CST 2021

        上面實例為什么沒有輸出doOnComplete, 從第四秒開始,每兩秒生產一個元素,等到最后complete時已經到了sleep的十秒時間,主線程main已經推出。

        8. intervalMillis(long period)和 intervalMillis(long delay, long period)

        與 interval()方法的作用相同,只不過該方法通過毫秒數(shù)來指定時間間隔和延遲時間。

        Mono

        Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。

        1. fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier()

        分別從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中創(chuàng)建 Mono。

         Mono.fromCallable(() -> {
        System.out.println("begin callable");
        return "Hello";
        })
        .subscribeOn(Schedulers.elastic())
        .doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
        .subscribe(System.out::println);
        Thread.sleep(10000);
        //執(zhí)行結果
        begin callable
        doOnNext Hello, thread :elastic-2
        Hello
        Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("begin");
        return "hello";
        }))
        .subscribeOn(Schedulers.elastic())
        .doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
        .subscribe(System.out::println);
        Thread.sleep(10000);
        //執(zhí)行結果
        begin
        doOnNext hello, thread :elastic-2
        hello

        2. delay(Duration duration)和 delayMillis(long duration)

        創(chuàng)建一個 Mono 序列,在指定的延遲時間之后,產生數(shù)字 0 作為唯一值。

        Mono.delay(Duration.ofSeconds(1)).subscribe(System.out::println);
        Thread.sleep(3000);
        //執(zhí)行結果, 延遲一秒后打印
        0

        3. ignoreElements(Publisher source)

        創(chuàng)建一個 Mono 序列,忽略作為源的 Publisher 中的所有元素,只產生結束消息。

        Mono.ignoreElements((i) -> {
        System.out.println("ignoreElements");
        })
        .doOnNext((i) -> System.out.println("doOnNext " + i))
        .subscribe(System.out::println);
        //執(zhí)行結果
        ignoreElements

        4. justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data)

        從一個 Optional 對象或可能為 null 的對象中創(chuàng)建 Mono。只有 Optional 對象中包含值或對象不為 null 時,Mono 序列才產生對應的元素。

        Optional<Integer> optional = Optional.empty();
        Mono.justOrEmpty(optional)
        .doOnNext((i) -> System.out.println("doOnNext " + i))
        .subscribe(System.out::println);

        System.out.println("========");

        optional = Optional.of(100);
        Mono.justOrEmpty(optional)
        .doOnNext((i) -> System.out.println("doOnNext " + i))
        .subscribe(System.out::println);
        //執(zhí)行結果
        ========
        doOnNext 100
        100

        操作符

        1. buffer 和 bufferTimeout

        這兩個操作符的作用是把當前流中的元素收集到集合中,并把集合對象作為流中的新元素。在進行收集時可以指定不同的條件:所包含的元素的最大數(shù)量或收集的時間間隔。方法 buffer()僅使用一個條件,而 bufferTimeout()可以同時指定兩個條件。指定時間間隔時可以使用 Duration 對象或毫秒數(shù),即使用 bufferMillis()或 bufferTimeoutMillis()兩個方法。

        除了元素數(shù)量和時間間隔之外,還可以通過 bufferUntil 和 bufferWhile 操作符來進行收集。這兩個操作符的參數(shù)是表示每個集合中的元素所要滿足的條件的 Predicate 對象。bufferUntil 會一直收集直到 Predicate 返回為 true。使得 Predicate 返回 true 的那個元素可以選擇添加到當前集合或下一個集合中;bufferWhile 則只有當 Predicate 返回 true 時才會收集。一旦值為 false,會立即開始下一次收集。

        Flux.range(1, 100).buffer(20).subscribe(System.out::println);

        //執(zhí)行結果
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
        [21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
        [41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
        [61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
        [81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]

        2. filter

        對流中包含的元素進行過濾,只留下滿足 Predicate 指定條件的元素。

        Flux.range(1, 10)
        .filter(i -> i%2==0)
        .doOnNext(i -> {
        System.out.println("[doOnNext] " + i);
        })
        .subscribe(i -> {
        System.out.println("subscribe " + i);
        });
        //執(zhí)行結果
        [doOnNext] 2
        subscribe 2
        [doOnNext] 4
        subscribe 4
        [doOnNext] 6
        subscribe 6
        [doOnNext] 8
        subscribe 8
        [doOnNext] 10
        subscribe 10

        3. window

        window 操作符的作用類似于 buffer,所不同的是 window 操作符是把當前流中的元素收集到另外的 Flux 序列中,因此返回值類型是 Flux。

        Flux.range(1, 15).window(5)
        .doOnNext((flux -> {}))
        .subscribe(flux -> {
        flux.doOnNext((item) -> {
        System.out.println("[window] flux: " + item);
        })
        .doOnComplete(() -> System.out.println("flux item complete"))
        .subscribe();
        });
        // 執(zhí)行結果
        [window] flux: 1
        [window] flux: 2
        [window] flux: 3
        [window] flux: 4
        [window] flux: 5
        flux item complete
        [window] flux: 6
        [window] flux: 7
        [window] flux: 8
        [window] flux: 9
        [window] flux: 10
        flux item complete
        [window] flux: 11
        [window] flux: 12
        [window] flux: 13
        [window] flux: 14
        [window] flux: 15
        flux item complete

        4. zipWith

        zipWith 操作符把當前流中的元素與另外一個流中的元素按照一對一的方式進行合并。在合并時可以不做任何處理,由此得到的是一個元素類型為 Tuple2 的流;也可以通過一個 BiFunction 函數(shù)對合并的元素進行處理,所得到的流的元素類型為該函數(shù)的返回值。

        Flux.just("Hello", "Project")
        .zipWith(Flux.just("World", "Reactor"))
        .subscribe(System.out::println);

        System.out.println("======");

        Flux.just("Hello", "Project")
        .zipWith(Flux.just("World", "Reactor"), (s1, s2) -> String.format("%s!%s!", s1, s2))
        .subscribe(System.out::println);
        // 執(zhí)行結果
        Hello,World
        Project,Reactor
        ======
        Hello!World!
        Project!Reactor!

        5. take

        take 系列操作符用來從當前流中提取元素。提取的方式可以有很多種。

        take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的數(shù)量或時間間隔來提取。

        Flux.range(1, 10).take(2).subscribe(System.out::println);
        // 執(zhí)行結果
        1
        2

        1.takeLast(long n):提取流中的最后 N 個元素。

        Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
        // 執(zhí)行結果
        9
        10

        1.takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。

        Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);
        // 執(zhí)行結果
        1
        2
        3
        4
        5
        6

        1.takeWhile(Predicate<? super T> continuePredicate):當 Predicate 返回 true 時才進行提取。

        Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);
        // 執(zhí)行結果
        1
        2
        3
        4

        1.takeUntilOther(Publisher<?> other):提取元素直到另外一個流開始產生元素。

        Flux.range(1, 5).takeUntilOther((i) -> {
        try {
        Thread.sleep(1000);
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        }).subscribe(System.out::println);
        // 執(zhí)行結果,暫停1000ms后開始輸出
        1
        2
        3
        4
        5

        6. reduce 和 reduceWith

        reduce 和 reduceWith 操作符對流中包含的所有元素進行累積操作,得到一個包含計算結果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的。在操作時可以指定一個初始值。如果沒有初始值,則序列的第一個元素作為初始值。

        Flux.range(1, 10)
        .reduce((x, y) -> {
        System.out.println("x:" + x + ", y:" + y);
        return x+y;
        })
        .subscribe(System.out::println);
        // 執(zhí)行結果
        x:1, y:2
        x:3, y:3
        x:6, y:4
        x:10, y:5
        x:15, y:6
        x:21, y:7
        x:28, y:8
        x:36, y:9
        x:45, y:10
        55
        Flux.range(1, 10)
        .reduceWith(() -> 100, (x, y) -> {
        System.out.println("x:" + x + ", y:" + y);
        return x+y;
        })
        .subscribe(System.out::println);
        // 執(zhí)行結果
        x:100, y:1
        x:101, y:2
        x:103, y:3
        x:106, y:4
        x:110, y:5
        x:115, y:6
        x:121, y:7
        x:128, y:8
        x:136, y:9
        x:145, y:10
        155

        7. merge 和 mergeSequential

        merge 和 mergeSequential 操作符用來把多個流合并成一個 Flux 序列。不同之處在于 merge 按照所有流中元素的實際產生順序來合并,而 mergeSequential 則按照所有流被訂閱的順序,以流為單位進行合并。

        Flux.merge(Flux.interval(
        Duration.of(0, ChronoUnit.MILLIS),
        Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(
        Duration.of(50, ChronoUnit.MILLIS),
        Duration.of(100, ChronoUnit.MILLIS)).take(2))
        .toStream()
        .forEach(System.out::println);
        System.out.println("==============");
        Flux.mergeSequential(Flux.interval(
        Duration.of(0, ChronoUnit.MILLIS),
        Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(
        Duration.of(50, ChronoUnit.MILLIS),
        Duration.of(100, ChronoUnit.MILLIS)).take(2))
        .toStream()
        .forEach(System.out::println);
        // 執(zhí)行結果
        0
        0
        1
        1
        ==============
        0
        1
        0
        1

        8. flatMap 和 flatMapSequential

        flatMap 和 flatMapSequential 操作符把流中的每個元素轉換成一個流,再把所有流中的元素進行合并。flatMapSequential 和 flatMap 之間的區(qū)別與 mergeSequential 和 merge 之間的區(qū)別是一樣的。

        Flux.just(1, 2)
        .flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS), Duration.of(10, ChronoUnit.MILLIS)).take(x))
        .toStream()
        .forEach(System.out::println);
        // 執(zhí)行結果
        0
        0
        1

        9. concatMap 和 combineLatest

        concatMap 操作符的作用也是把流中的每個元素轉換成一個流,再把所有流進行合并。與 flatMap 不同的是,concatMap 會根據(jù)原始流中的元素順序依次把轉換之后的流進行合并;與 flatMapSequential 不同的是,concatMap 對轉換之后的流的訂閱是動態(tài)進行的,而 flatMapSequential 在合并之前就已經訂閱了所有的流。

        Flux.just(5, 10)
        .concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
        .toStream()
        .forEach(System.out::println);

        Flux.combineLatest(
        Arrays::toString,
        Flux.intervalMillis(100).take(5),
        Flux.intervalMillis(50, 100).take(5)
        ).toStream().forEach(System.out::println);

        四. 結束

        上文已經簡單介紹了Reactor的兩個核心概念Flux和Mono,以及一些常用操作符的使用,剛開始使用響應式編程范式對于部分開發(fā)人員來說可能極度困難,但熟能生巧,長期使用讓思維方式轉變才能領會到響應式編程的優(yōu)點。


        瀏覽 65
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            久久久久久AV无码免费网站动漫 | 中国逼| 日韩欧美在线视频观看 | 亚洲午夜影院在线 | 久久国产露脸精品国产 | 日韩成人一区二区视频 | 少妇呻吟视频 | 天躁夜夜躁2021aa91 | 久久九九女女男女热 | 操老女人骚逼 |