1. Spring 官方批處理框架真香!Spring 全家桶永遠(yuǎn)滴神!

        共 26945字,需瀏覽 54分鐘

         ·

        2021-10-08 20:22

        假期余額嚴(yán)重不足,難受呀!今天早上 8 點(diǎn)半出門坐車,晚上 9 點(diǎn)才到家,差不多坐了一天車才到家,屁股都坐疼了......

        推薦一個(gè)很多小伙伴沒注意到的 Spring 官方的批處理框架。

        Spring Batch 是一個(gè)輕量級(jí)但功能又十分全面的批處理框架,主要用于批處理場景比如從數(shù)據(jù)庫、文件或隊(duì)列中讀取大量記錄。不過,需要注意的是:Spring Batch 不是調(diào)度框架。商業(yè)和開源領(lǐng)域都有許多優(yōu)秀的企業(yè)調(diào)度框架比如 Quartz、XXL-JOB、Elastic-Job。它旨在與調(diào)度程序一起工作,而不是取代調(diào)度程序。

        目前,Spring Batch 也已經(jīng)被收錄進(jìn)了開源項(xiàng)目 awesome-java (非常棒的 Java 開源項(xiàng)目集合)。

        項(xiàng)目地址:https://github.com/CodingDocs/awesome-java

        關(guān)于 Spring Batch 的詳細(xì)介紹可以參考 Spring Batch 官方文檔[1],入門教程可以參考下面的內(nèi)容,原文地址:https://mrbird.cc/Spring-Batch 入門.html?。

        項(xiàng)目搭建

        新建一個(gè) Spring Boot 項(xiàng)目,版本為 2.2.4.RELEASE,artifactId 為 spring-batch-start,項(xiàng)目結(jié)構(gòu)如下圖所示:

        然后在 pom.xml 中引入 Spring Batch、MySQL 和 JDBC 依賴,引入后 pom.xml 內(nèi)容如下所示:


        <project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        ?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd">

        ????<modelVersion>4.0.0modelVersion>
        ????<parent>
        ????????<groupId>org.springframework.bootgroupId>
        ????????<artifactId>spring-boot-starter-parentartifactId>
        ????????<version>2.2.5.RELEASEversion>
        ????????<relativePath/>?
        ????parent>
        ????<groupId>cc.mrbirdgroupId>
        ????<artifactId>spring-batch-startartifactId>
        ????<version>0.0.1-SNAPSHOTversion>
        ????<name>spring-batch-startname>
        ????<description>Demo?project?for?Spring?Bootdescription>

        ????<properties>
        ????????<java.version>1.8java.version>
        ????properties>

        ????<dependencies>
        ????????<dependency>
        ????????????<groupId>org.springframework.bootgroupId>
        ????????????<artifactId>spring-boot-starter-batchartifactId>
        ????????dependency>

        ????????<dependency>
        ????????????<groupId>mysqlgroupId>
        ????????????<artifactId>mysql-connector-javaartifactId>
        ????????dependency>
        ????????<dependency>
        ????????????<groupId>org.springframework.bootgroupId>
        ????????????<artifactId>spring-boot-starter-jdbcartifactId>
        ????????dependency>
        ????dependencies>

        ????<build>
        ????????<plugins>
        ????????????<plugin>
        ????????????????<groupId>org.springframework.bootgroupId>
        ????????????????<artifactId>spring-boot-maven-pluginartifactId>
        ????????????plugin>
        ????????plugins>
        ????build>
        project>

        在編寫代碼之前,我們先來簡單了解下 Spring Batch 的組成:


        • Spring Batch 里最基本的單元就是任務(wù) Job,一個(gè) Job 由若干個(gè)步驟 Step 組成。
        • 任務(wù)啟動(dòng)器 Job Launcher 負(fù)責(zé)運(yùn)行 Job。
        • 任務(wù)存儲(chǔ)倉庫 Job Repository 存儲(chǔ)著 Job 的執(zhí)行狀態(tài),參數(shù)和日志等信息。Job 處理任務(wù)又可以分為三大類:
          • 數(shù)據(jù)讀取 Item Reader
          • 數(shù)據(jù)中間處理 Item Processor
          • 數(shù)據(jù)輸出 Item Writer。

        任務(wù)存儲(chǔ)倉庫可以是關(guān)系型數(shù)據(jù)庫 MySQL,非關(guān)系型數(shù)據(jù)庫 MongoDB 或者直接存儲(chǔ)在內(nèi)存中,本篇使用的是 MySQL 作為任務(wù)存儲(chǔ)倉庫。

        新建一個(gè)名稱為 springbatch 的 MySQL 數(shù)據(jù)庫,然后導(dǎo)入 org.springframework.batch.core 目錄下的 schema-mysql.sql 文件:

        導(dǎo)入后,庫表如下圖所示:


        然后在項(xiàng)目的配置文件 application.yml 里添加 MySQL 相關(guān)配置:

        spring:
        ??datasource:
        ????driver-class-name:?com.mysql.cj.jdbc.Driver
        ????url:?jdbc:mysql://127.0.0.1:3306/springbatch
        ????username:?root
        ????password:?123456

        接著在 Spring Boot 的入口類上添加 @EnableBatchProcessing 注解,表示開啟 Spring Batch 批處理功能:

        @SpringBootApplication
        @EnableBatchProcessing
        public?class?SpringBatchStartApplication?{
        ????public?static?void?main(String[]?args)?{
        ????????SpringApplication.run(SpringBatchStartApplication.class,?args);
        ????}
        }

        至此,基本框架搭建好了,下面開始配置一個(gè)簡單的任務(wù)。

        編寫第一個(gè)任務(wù)

        cc.mrbird.batch 目錄下新建 job 包,然后在該包下新建一個(gè) FirstJobDemo 類,代碼如下所示:

        @Component
        public?class?FirstJobDemo?{

        ????@Autowired
        ????private?JobBuilderFactory?jobBuilderFactory;
        ????@Autowired
        ????private?StepBuilderFactory?stepBuilderFactory;

        ????@Bean
        ????public?Job?firstJob()?{
        ????????return?jobBuilderFactory.get("firstJob")
        ????????????????.start(step())
        ????????????????.build();
        ????}

        ????private?Step?step()?{
        ????????return?stepBuilderFactory.get("step")
        ????????????????.tasklet((contribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟....");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}
        }

        上面代碼中,我們注入了JobBuilderFactory任務(wù)創(chuàng)建工廠和StepBuilderFactory步驟創(chuàng)建工廠,分別用于創(chuàng)建任務(wù) Job 和步驟 Step。JobBuilderFactoryget方法用于創(chuàng)建一個(gè)指定名稱的任務(wù),start方法指定任務(wù)的開始步驟,步驟通過StepBuilderFactory構(gòu)建。

        步驟 Step 由若干個(gè)小任務(wù) Tasklet 組成,所以我們通過tasklet方法創(chuàng)建。tasklet方法接收一個(gè)Tasklet類型參數(shù),Tasklet是一個(gè)函數(shù)是接口,源碼如下:

        public?interface?Tasklet?{
        ????@Nullable
        ????RepeatStatus?execute(StepContribution?contribution,?ChunkContext?chunkContext)?throws?Exception;
        }

        所以我們可以使用 lambda 表達(dá)式創(chuàng)建一個(gè)匿名實(shí)現(xiàn):

        (contribution,?chunkContext)?->?{
        ????System.out.println("執(zhí)行步驟....");
        ????return?RepeatStatus.FINISHED;
        }

        該匿名實(shí)現(xiàn)必須返回一個(gè)明確的執(zhí)行狀態(tài),這里返回RepeatStatus.FINISHED表示該小任務(wù)執(zhí)行成功,正常結(jié)束。

        此外,需要注意的是,我們配置的任務(wù) Job 必須注冊到 Spring IOC 容器中,并且任務(wù)的名稱和步驟的名稱組成唯一。比如上面的例子,我們的任務(wù)名稱為 firstJob,步驟的名稱為 step,如果存在別的任務(wù)和步驟組合也叫這個(gè)名稱的話,則會(huì)執(zhí)行失敗。

        啟動(dòng)項(xiàng)目,控制臺(tái)打印日志如下:

        ...
        2020-03-06?11:01:11.785??INFO?17324?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=firstJob]]?launched?with?the?following?parameters:?[{}]
        2020-03-06?11:01:11.846??INFO?17324?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step]
        執(zhí)行步驟....
        2020-03-06?11:01:11.886??INFO?17324?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step]?executed?in?40ms
        2020-03-06?11:01:11.909??INFO?17324?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=firstJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?101ms

        可以看到,任務(wù)成功執(zhí)行了,數(shù)據(jù)庫的庫表也將記錄相關(guān)運(yùn)行日志。

        重新啟動(dòng)項(xiàng)目,控制臺(tái)并不會(huì)再次打印出任務(wù)執(zhí)行日志,因?yàn)?Job 名稱和 Step 名稱組成唯一,執(zhí)行完的不可重復(fù)的任務(wù),不會(huì)再次執(zhí)行。

        多步驟任務(wù)

        一個(gè)復(fù)雜的任務(wù)一般包含多個(gè)步驟,下面舉個(gè)多步驟任務(wù)的例子。在 job 包下新建MultiStepJobDemo類:

        @Component
        public?class?MultiStepJobDemo?{

        ????@Autowired
        ????private?JobBuilderFactory?jobBuilderFactory;
        ????@Autowired
        ????private?StepBuilderFactory?stepBuilderFactory;

        ????@Bean
        ????public?Job?multiStepJob()?{
        ????????return?jobBuilderFactory.get("multiStepJob")
        ????????????????.start(step1())
        ????????????????.next(step2())
        ????????????????.next(step3())
        ????????????????.build();
        ????}

        ????private?Step?step1()?{
        ????????return?stepBuilderFactory.get("step1")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step2()?{
        ????????return?stepBuilderFactory.get("step2")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step3()?{
        ????????return?stepBuilderFactory.get("step3")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}
        }

        上面代碼中,我們通過step1()step2()step3()三個(gè)方法創(chuàng)建了三個(gè)步驟。Job 里要使用這些步驟,只需要通過JobBuilderFactorystart方法指定第一個(gè)步驟,然后通過next方法不斷地指定下一個(gè)步驟即可。

        啟動(dòng)項(xiàng)目,控制臺(tái)打印日志如下:

        2020-03-06?13:52:52.188??INFO?18472?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=multiStepJob]]?launched?with?the?following?parameters:?[{}]
        2020-03-06?13:52:52.222??INFO?18472?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
        執(zhí)行步驟一操作。。。
        2020-03-06?13:52:52.251??INFO?18472?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?29ms
        2020-03-06?13:52:52.292??INFO?18472?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
        執(zhí)行步驟二操作。。。
        2020-03-06?13:52:52.323??INFO?18472?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?30ms
        2020-03-06?13:52:52.375??INFO?18472?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
        執(zhí)行步驟三操作。。。
        2020-03-06?13:52:52.405??INFO?18472?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?29ms
        2020-03-06?13:52:52.428??INFO?18472?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=multiStepJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?231ms

        三個(gè)步驟依次執(zhí)行成功。

        多個(gè)步驟在執(zhí)行過程中也可以通過上一個(gè)步驟的執(zhí)行狀態(tài)來決定是否執(zhí)行下一個(gè)步驟,修改上面的代碼:

        @Component
        public?class?MultiStepJobDemo?{

        ????@Autowired
        ????private?JobBuilderFactory?jobBuilderFactory;
        ????@Autowired
        ????private?StepBuilderFactory?stepBuilderFactory;

        ????@Bean
        ????public?Job?multiStepJob()?{
        ????????return?jobBuilderFactory.get("multiStepJob2")
        ????????????????.start(step1())
        ????????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step2())
        ????????????????.from(step2())
        ????????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step3())
        ????????????????.from(step3()).end()
        ????????????????.build();
        ????}

        ????private?Step?step1()?{
        ????????return?stepBuilderFactory.get("step1")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step2()?{
        ????????return?stepBuilderFactory.get("step2")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step3()?{
        ????????return?stepBuilderFactory.get("step3")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}
        }

        multiStepJob()方法的含義是:multiStepJob2 任務(wù)先執(zhí)行 step1,當(dāng) step1 狀態(tài)為完成時(shí),接著執(zhí)行 step2,當(dāng) step2 的狀態(tài)為完成時(shí),接著執(zhí)行 step3。ExitStatus.COMPLETED常量表示任務(wù)順利執(zhí)行完畢,正常退出,該類還包含以下幾種退出狀態(tài):

        public?class?ExitStatus?implements?Serializable,?Comparable<ExitStatus>?{

        ????/**
        ?????*?Convenient?constant?value?representing?unknown?state?-?assumed?not
        ?????*?continuable.
        ?????*/

        ????public?static?final?ExitStatus?UNKNOWN?=?new?ExitStatus("UNKNOWN");

        ????/**
        ?????*?Convenient?constant?value?representing?continuable?state?where?processing
        ?????*?is?still?taking?place,?so?no?further?action?is?required.?Used?for
        ?????*?asynchronous?execution?scenarios?where?the?processing?is?happening?in
        ?????*?another?thread?or?process?and?the?caller?is?not?required?to?wait?for?the
        ?????*?result.
        ?????*/

        ????public?static?final?ExitStatus?EXECUTING?=?new?ExitStatus("EXECUTING");

        ????/**
        ?????*?Convenient?constant?value?representing?finished?processing.
        ?????*/

        ????public?static?final?ExitStatus?COMPLETED?=?new?ExitStatus("COMPLETED");

        ????/**
        ?????*?Convenient?constant?value?representing?job?that?did?no?processing?(e.g.
        ?????*?because?it?was?already?complete).
        ?????*/

        ????public?static?final?ExitStatus?NOOP?=?new?ExitStatus("NOOP");

        ????/**
        ?????*?Convenient?constant?value?representing?finished?processing?with?an?error.
        ?????*/

        ????public?static?final?ExitStatus?FAILED?=?new?ExitStatus("FAILED");

        ????/**
        ?????*?Convenient?constant?value?representing?finished?processing?with
        ?????*?interrupted?status.
        ?????*/

        ????public?static?final?ExitStatus?STOPPED?=?new?ExitStatus("STOPPED");

        ????...
        }

        啟動(dòng)項(xiàng)目,控制臺(tái)日志打印如下:

        2020-03-06?14:21:49.384??INFO?18745?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=multiStepJob2]]?launched?with?the?following?parameters:?[{}]
        2020-03-06?14:21:49.427??INFO?18745?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
        執(zhí)行步驟一操作。。。
        2020-03-06?14:21:49.456??INFO?18745?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?29ms
        2020-03-06?14:21:49.501??INFO?18745?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
        執(zhí)行步驟二操作。。。
        2020-03-06?14:21:49.527??INFO?18745?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?26ms
        2020-03-06?14:21:49.576??INFO?18745?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
        執(zhí)行步驟三操作。。。
        2020-03-06?14:21:49.604??INFO?18745?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?28ms
        2020-03-06?14:21:49.629??INFO?18745?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=multiStepJob2]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?238ms

        Flow 的用法

        Flow 的作用就是可以將多個(gè)步驟 Step 組合在一起然后再組裝到任務(wù) Job 中。舉個(gè) Flow 的例子,在 job 包下新建FlowJobDemo類:

        @Component
        public?class?FlowJobDemo?{

        ????@Autowired
        ????private?JobBuilderFactory?jobBuilderFactory;
        ????@Autowired
        ????private?StepBuilderFactory?stepBuilderFactory;

        ????@Bean
        ????public?Job?flowJob()?{
        ????????return?jobBuilderFactory.get("flowJob")
        ????????????????.start(flow())
        ????????????????.next(step3())
        ????????????????.end()
        ????????????????.build();
        ????}

        ????private?Step?step1()?{
        ????????return?stepBuilderFactory.get("step1")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step2()?{
        ????????return?stepBuilderFactory.get("step2")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step3()?{
        ????????return?stepBuilderFactory.get("step3")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????//?創(chuàng)建一個(gè)flow對(duì)象,包含若干個(gè)step
        ????private?Flow?flow()?{
        ????????return?new?FlowBuilder("flow")
        ????????????????.start(step1())
        ????????????????.next(step2())
        ????????????????.build();
        ????}
        }

        上面代碼中,我們通過FlowBuilder將 step1 和 step2 組合在一起,創(chuàng)建了一個(gè)名為 flow 的 Flow,然后再將其賦給任務(wù) Job。使用 Flow 和 Step 構(gòu)建 Job 的區(qū)別是,Job 流程中包含 Flow 類型的時(shí)候需要在build()方法前調(diào)用end()方法。

        啟動(dòng)程序,控制臺(tái)日志打印如下:

        2020-03-06?14:36:42.621??INFO?18865?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=flowJob]]?launched?with?the?following?parameters:?[{}]
        2020-03-06?14:36:42.667??INFO?18865?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
        執(zhí)行步驟一操作。。。
        2020-03-06?14:36:42.697??INFO?18865?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?30ms
        2020-03-06?14:36:42.744??INFO?18865?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
        執(zhí)行步驟二操作。。。
        2020-03-06?14:36:42.771??INFO?18865?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?27ms
        2020-03-06?14:36:42.824??INFO?18865?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
        執(zhí)行步驟三操作。。。
        2020-03-06?14:36:42.850??INFO?18865?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?25ms
        2020-03-06?14:36:42.874??INFO?18865?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=flowJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?245ms

        并行執(zhí)行

        任務(wù)中的步驟除了可以串行執(zhí)行(一個(gè)接著一個(gè)執(zhí)行)外,還可以并行執(zhí)行,并行執(zhí)行在特定的業(yè)務(wù)需求下可以提供任務(wù)執(zhí)行效率。

        將任務(wù)并行化只需兩個(gè)簡單步驟:

        1. 將步驟 Step 轉(zhuǎn)換為 Flow;
        2. 任務(wù) Job 中指定并行 Flow。

        舉個(gè)例子,在 job 包下新建SplitJobDemo類:

        @Component
        public?class?SplitJobDemo?{

        ????@Autowired
        ????private?JobBuilderFactory?jobBuilderFactory;
        ????@Autowired
        ????private?StepBuilderFactory?stepBuilderFactory;

        ????@Bean
        ????public?Job?splitJob()?{
        ????????return?jobBuilderFactory.get("splitJob")
        ????????????????.start(flow1())
        ????????????????.split(new?SimpleAsyncTaskExecutor()).add(flow2())
        ????????????????.end()
        ????????????????.build();

        ????}

        ????private?Step?step1()?{
        ????????return?stepBuilderFactory.get("step1")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step2()?{
        ????????return?stepBuilderFactory.get("step2")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step3()?{
        ????????return?stepBuilderFactory.get("step3")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Flow?flow1()?{
        ????????return?new?FlowBuilder("flow1")
        ????????????????.start(step1())
        ????????????????.next(step2())
        ????????????????.build();
        ????}

        ????private?Flow?flow2()?{
        ????????return?new?FlowBuilder("flow2")
        ????????????????.start(step3())
        ????????????????.build();
        ????}
        }

        上面例子中,我們創(chuàng)建了兩個(gè) Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通過JobBuilderFactorysplit方法,指定一個(gè)異步執(zhí)行器,將 flow1 和 flow2 異步執(zhí)行(也就是并行)。

        啟動(dòng)項(xiàng)目,控制臺(tái)日志打印如下:

        2020-03-06?15:25:43.602??INFO?19449?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=splitJob]]?launched?with?the?following?parameters:?[{}]
        2020-03-06?15:25:43.643??INFO?19449?---?[cTaskExecutor-1]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
        2020-03-06?15:25:43.650??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
        執(zhí)行步驟三操作。。。
        執(zhí)行步驟一操作。。。
        2020-03-06?15:25:43.673??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?23ms
        2020-03-06?15:25:43.674??INFO?19449?---?[cTaskExecutor-1]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?31ms
        2020-03-06?15:25:43.714??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
        執(zhí)行步驟二操作。。。
        2020-03-06?15:25:43.738??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?24ms
        2020-03-06?15:25:43.758??INFO?19449?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=splitJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?146ms

        可以看到 step3 并沒有在 step2 后才執(zhí)行,說明步驟已經(jīng)是并行化的(開啟并行化后,并行的步驟執(zhí)行順序并不能 100%確定,因?yàn)榫€程調(diào)度具有不確定性)。

        任務(wù)決策器

        決策器的作用就是可以指定程序在不同的情況下運(yùn)行不同的任務(wù)流程,比如今天是周末,則讓任務(wù)執(zhí)行 step1 和 step2,如果是工作日,則之心 step1 和 step3。

        使用決策器前,我們需要自定義一個(gè)決策器的實(shí)現(xiàn)。在 cc.mrbird.batch 包下新建 decider 包,然后創(chuàng)建MyDecider類,實(shí)現(xiàn)JobExecutionDecider接口:

        @Component
        public?class?MyDecider?implements?JobExecutionDecider?{
        ????@Override
        ????public?FlowExecutionStatus?decide(JobExecution?jobExecution,?StepExecution?stepExecution)?{
        ????????LocalDate?now?=?LocalDate.now();
        ????????DayOfWeek?dayOfWeek?=?now.getDayOfWeek();

        ????????if?(dayOfWeek?==?DayOfWeek.SATURDAY?||?dayOfWeek?==?DayOfWeek.SUNDAY)?{
        ????????????return?new?FlowExecutionStatus("weekend");
        ????????}?else?{
        ????????????return?new?FlowExecutionStatus("workingDay");
        ????????}
        ????}
        }

        MyDecider實(shí)現(xiàn)JobExecutionDecider接口的decide方法,該方法返回FlowExecutionStatus。上面的邏輯是:判斷今天是否是周末,如果是,返回FlowExecutionStatus("weekend")狀態(tài),否則返回FlowExecutionStatus("workingDay")狀態(tài)。

        下面演示如何在任務(wù) Job 里使用決策器。在 job 包下新建DeciderJobDemo

        @Component
        public?class?DeciderJobDemo?{

        ????@Autowired
        ????private?JobBuilderFactory?jobBuilderFactory;
        ????@Autowired
        ????private?StepBuilderFactory?stepBuilderFactory;
        ????@Autowired
        ????private?MyDecider?myDecider;

        ????@Bean
        ????public?Job?deciderJob()?{
        ????????return?jobBuilderFactory.get("deciderJob")
        ????????????????.start(step1())
        ????????????????.next(myDecider)
        ????????????????.from(myDecider).on("weekend").to(step2())
        ????????????????.from(myDecider).on("workingDay").to(step3())
        ????????????????.from(step3()).on("*").to(step4())
        ????????????????.end()
        ????????????????.build();
        ????}

        ????private?Step?step1()?{
        ????????return?stepBuilderFactory.get("step1")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step2()?{
        ????????return?stepBuilderFactory.get("step2")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        ????private?Step?step3()?{
        ????????return?stepBuilderFactory.get("step3")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}


        ????private?Step?step4()?{
        ????????return?stepBuilderFactory.get("step4")
        ????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????System.out.println("執(zhí)行步驟四操作。。。");
        ????????????????????return?RepeatStatus.FINISHED;
        ????????????????}).build();
        ????}

        上面代碼中,我們注入了自定義決策器MyDecider,然后在jobDecider()方法里使用了該決策器:

        @Bean
        public?Job?deciderJob()?{
        ????return?jobBuilderFactory.get("deciderJob")
        ????????????.start(step1())
        ????????????.next(myDecider)
        ????????????.from(myDecider).on("weekend").to(step2())
        ????????????.from(myDecider).on("workingDay").to(step3())
        ????????????.from(step3()).on("*").to(step4())
        ????????????.end()
        ????????????.build();
        }

        這段代碼的含義是:任務(wù) deciderJob 首先執(zhí)行 step1,然后指定自定義決策器,如果決策器返回 weekend,那么執(zhí)行 step2,如果決策器返回 workingDay,那么執(zhí)行 step3。如果執(zhí)行了 step3,那么無論 step3 的結(jié)果是什么,都將執(zhí)行 step4。

        啟動(dòng)項(xiàng)目,控制臺(tái)輸出如下所示:

        2020-03-06?16:09:10.541??INFO?19873?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=deciderJob]]?launched?with?the?following?parameters:?[{}]
        2020-03-06?16:09:10.609??INFO?19873?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
        執(zhí)行步驟一操作。。。
        2020-03-06?16:09:10.641??INFO?19873?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?32ms
        2020-03-06?16:09:10.692??INFO?19873?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
        執(zhí)行步驟三操作。。。
        2020-03-06?16:09:10.723??INFO?19873?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?31ms
        2020-03-06?16:09:10.769??INFO?19873?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step4]
        執(zhí)行步驟四操作。。。
        2020-03-06?16:09:10.797??INFO?19873?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step4]?executed?in?27ms
        2020-03-06?16:09:10.818??INFO?19873?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=deciderJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?256ms

        因?yàn)榻裉焓?2020 年 03 月 06 日星期五,是工作日,所以任務(wù)執(zhí)行了 step1、step3 和 step4。

        任務(wù)嵌套

        任務(wù) Job 除了可以由 Step 或者 Flow 構(gòu)成外,我們還可以將多個(gè)任務(wù) Job 轉(zhuǎn)換為特殊的 Step,然后再賦給另一個(gè)任務(wù) Job,這就是任務(wù)的嵌套。

        舉個(gè)例子,在 job 包下新建NestedJobDemo類:

        @Component
        public?class?NestedJobDemo?{

        ????@Autowired
        ????private?JobBuilderFactory?jobBuilderFactory;
        ????@Autowired
        ????private?StepBuilderFactory?stepBuilderFactory;
        ????@Autowired
        ????private?JobLauncher?jobLauncher;
        ????@Autowired
        ????private?JobRepository?jobRepository;
        ????@Autowired
        ????private?PlatformTransactionManager?platformTransactionManager;

        ????//?父任務(wù)
        ????@Bean
        ????public?Job?parentJob()?{
        ????????return?jobBuilderFactory.get("parentJob")
        ????????????????.start(childJobOneStep())
        ????????????????.next(childJobTwoStep())
        ????????????????.build();
        ????}


        ????//?將任務(wù)轉(zhuǎn)換為特殊的步驟
        ????private?Step?childJobOneStep()?{
        ????????return?new?JobStepBuilder(new?StepBuilder("childJobOneStep"))
        ????????????????.job(childJobOne())
        ????????????????.launcher(jobLauncher)
        ????????????????.repository(jobRepository)
        ????????????????.transactionManager(platformTransactionManager)
        ????????????????.build();
        ????}

        ????//?將任務(wù)轉(zhuǎn)換為特殊的步驟
        ????private?Step?childJobTwoStep()?{
        ????????return?new?JobStepBuilder(new?StepBuilder("childJobTwoStep"))
        ????????????????.job(childJobTwo())
        ????????????????.launcher(jobLauncher)
        ????????????????.repository(jobRepository)
        ????????????????.transactionManager(platformTransactionManager)
        ????????????????.build();
        ????}

        ????//?子任務(wù)一
        ????private?Job?childJobOne()?{
        ????????return?jobBuilderFactory.get("childJobOne")
        ????????????????.start(
        ????????????????????stepBuilderFactory.get("childJobOneStep")
        ????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????????????????System.out.println("子任務(wù)一執(zhí)行步驟。。。");
        ????????????????????????????????return?RepeatStatus.FINISHED;
        ????????????????????????????}).build()
        ????????????????).build();
        ????}

        ????//?子任務(wù)二
        ????private?Job?childJobTwo()?{
        ????????return?jobBuilderFactory.get("childJobTwo")
        ????????????????.start(
        ????????????????????stepBuilderFactory.get("childJobTwoStep")
        ????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
        ????????????????????????????????System.out.println("子任務(wù)二執(zhí)行步驟。。。");
        ????????????????????????????????return?RepeatStatus.FINISHED;
        ????????????????????????????}).build()
        ????????????????).build();
        ????}
        }

        上面代碼中,我們通過childJobOne()childJobTwo()方法創(chuàng)建了兩個(gè)任務(wù) Job,這里沒什么好說的,前面都介紹過。關(guān)鍵在于childJobOneStep()方法和childJobTwoStep()方法。在childJobOneStep()方法中,我們通過JobStepBuilder構(gòu)建了一個(gè)名稱為childJobOneStep的 Step,顧名思義,它是一個(gè)任務(wù)型 Step 的構(gòu)造工廠,可以將任務(wù)轉(zhuǎn)換為“特殊”的步驟。在構(gòu)建過程中,我們還需要傳入任務(wù)執(zhí)行器 JobLauncher、任務(wù)倉庫 JobRepository 和事務(wù)管理器 PlatformTransactionManager。

        將任務(wù)轉(zhuǎn)換為特殊的步驟后,將其賦給父任務(wù) parentJob 即可,流程和前面介紹的一致。

        配置好后,啟動(dòng)項(xiàng)目,控制臺(tái)輸出如下所示:

        2020-03-06?16:58:39.771??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=parentJob]]?launched?with?the?following?parameters:?[{}]
        2020-03-06?16:58:39.812??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobOneStep]
        2020-03-06?16:58:39.866??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobOne]]?launched?with?the?following?parameters:?[{}]
        2020-03-06?16:58:39.908??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobOneStep]
        子任務(wù)一執(zhí)行步驟。。。
        2020-03-06?16:58:39.940??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobOneStep]?executed?in?32ms
        2020-03-06?16:58:39.960??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobOne]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?86ms
        2020-03-06?16:58:39.983??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobOneStep]?executed?in?171ms
        2020-03-06?16:58:40.019??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobTwoStep]
        2020-03-06?16:58:40.067??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobTwo]]?launched?with?the?following?parameters:?[{}]
        2020-03-06?16:58:40.102??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobTwoStep]
        子任務(wù)二執(zhí)行步驟。。。
        2020-03-06?16:58:40.130??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobTwoStep]?executed?in?28ms
        2020-03-06?16:58:40.152??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobTwo]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?75ms
        2020-03-06?16:58:40.157??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobTwoStep]?executed?in?138ms
        2020-03-06?16:58:40.177??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=parentJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?398ms

        本節(jié)源碼鏈接:https://github.com/wuyouzhuguli/SpringAll/tree/master/67.spring-batch-start

        參考資料

        [1]

        Spring Batch 官方文檔: https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/spring-batch-intro.html#spring-batch-intro

        [2]

        https://mrbird.cc/Spring-Batch 入門.html: https://mrbird.cc/Spring-Batch入門.html


        < END >

        也許你還想看
        ? |?我在 B 站淘了 2 個(gè) Java 實(shí)戰(zhàn)項(xiàng)目! 小破站,YYDS!
        ??|?我常用的20+個(gè)學(xué)習(xí)編程的網(wǎng)站!蕪湖起飛!
        ? |?1w+字的 Dubbo 面試題/知識(shí)點(diǎn)總結(jié)!(2021 最新版)
        ? |?7年前,24歲,出版了一本 Redis 神書
        ? |?京東二面:為什么需要分布式ID?你項(xiàng)目中是怎么做的?
        ? |?再見 Spring Task,這個(gè)定時(shí)任務(wù)框架真香!
        ? |?一鍵生成數(shù)據(jù)庫文檔,堪稱數(shù)據(jù)庫界的Swagger
        ? |?來看看這個(gè)超好用的項(xiàng)目腳手架吧!5分鐘搭建一個(gè)Spring Boot 前后端分離系統(tǒng)!
        ? |?Spring 官宣,干掉 Spring 5.3.x!

        我是 Guide哥,一個(gè)工作2年有余,接觸編程已經(jīng)6年有余的程序員。大三開源 JavaGuide,目前已經(jīng) 100k+ Star。未來幾年,希望持續(xù)完善 JavaGuide,爭取能夠幫助更多學(xué)習(xí) Java 的小伙伴!共勉!凎!點(diǎn)擊即可了解我的個(gè)人經(jīng)歷。

        歡迎點(diǎn)贊分享。咱們下期再會(huì)!

        瀏覽 31
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 动漫美女被到爽视频 | 影音先锋毛片 | 一边操一边摸 | 国产男人搡女人免费视频 | 婷婷开心色四房播播免费 |