1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        SpringBoot + SpringBatch + Quartz 整合定时批量任务

        共 4864 字,需浏览 10 分钟

         ·

        2021-12-28 18:42

        来源:blog.csdn.net/zxd1435513775/article/details/99677223

        一、引言

        最近一周,被借调到其他部门,赶一个紧急需求,需求内容如下:

        PC 网页触发一条设备升级记录(下图),后台要定时批量更新设备。这里定时要用到 Quartz,批量数据处理要用到 SpringBatch,二者结合,可以完成该需求。

        由于之前没有用过 SpringBatch,于是上网查了下资料,发现可参考的不是很多,只能去慢慢地翻看官方文档。

        https://docs.spring.io/spring-batch/4.1.x/reference/html/

        遇到不少问题,就记录一下吧。

        二、代码具体实现

        1、pom文件

        <dependencies>
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
          <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
          </dependency>
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
          </dependency>
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
          </dependency>
          <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
          </dependency>
          <!-- The original listing declared spring-boot-starter-batch a second time here;
               the duplicate declaration has been dropped. -->
          <!-- NOTE(review): the configuration class calls QuartzManager, but no Quartz
               dependency appears in this listing; confirm it is declared elsewhere. -->
        </dependencies>

        2、application.yaml文件

        spring:
          datasource:
            username: thinklink
            password: thinklink
            url: jdbc:postgresql://172.16.205.54:5432/thinklink
            driver-class-name: org.postgresql.Driver
          batch:
            job:
              # Do not launch batch jobs automatically at application startup;
              # jobs are launched explicitly through the JobLauncher.
              enabled: false
        server:
          port: 8073

        #upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
        upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/

        # Number of records processed per batch (chunk); defaults to 5000
        batch-size: 5000

        3、Service 实现类,触发批处理任务的入口,执行一个 job

        @Service("batchService")
        public?class?BatchServiceImpl?implements?BatchService?{

        ?//?框架自動(dòng)注入
        ????@Autowired
        ????private?JobLauncher?jobLauncher;
        ????@Autowired
        ????private?Job?updateDeviceJob;
        ????/**
        ?????*?根據(jù)?taskId?創(chuàng)建一個(gè)Job
        ?????*?@param?taskId
        ?????*?@throws?Exception
        ?????*/

        ????@Override
        ????public?void?createBatchJob(String?taskId)?throws?Exception?{
        ????????JobParameters?jobParameters?=?new?JobParametersBuilder()
        ????????????????.addString("taskId",?taskId)
        ????????????????.addString("uuid",?UUID.randomUUID().toString().replace("-",""))
        ????????????????.toJobParameters();
        ????????//?傳入一個(gè)Job任務(wù)和任務(wù)需要的參數(shù)
        ????????jobLauncher.run(updateDeviceJob,?jobParameters);
        ????}
        }

        4、SpringBatch 配置类,此部分最重要(☆☆☆☆☆)

        @Configuration
        public?class?BatchConfiguration?{

        ????private?static?final?Logger?log?=?LoggerFactory.getLogger(BatchConfiguration.class);

        ????@Value("${batch-size:5000}")
        ????private?int?batchSize;

        ?//?框架自動(dòng)注入
        ????@Autowired
        ????public?JobBuilderFactory?jobBuilderFactory;

        ?//?框架自動(dòng)注入
        ????@Autowired
        ????public?StepBuilderFactory?stepBuilderFactory;

        ?//?數(shù)據(jù)過濾器,對(duì)從數(shù)據(jù)庫讀出來的數(shù)據(jù),注意進(jìn)行操作
        ????@Autowired
        ????public?TaskItemProcessor?taskItemProcessor;

        ????//?接收job參數(shù)
        ????public?Map?parameters;

        ????public?Object?taskId;

        ????@Autowired
        ????private?JdbcTemplate?jdbcTemplate;

        ?//?讀取數(shù)據(jù)庫操作
        ????@Bean
        ????@StepScope
        ????public?JdbcCursorItemReader?itemReader(DataSource?dataSource)?{

        ????????String?querySql?=?"?SELECT?"?+
        ????????????????"?e.?ID?AS?taskId,?"?+
        ????????????????"?e.user_id?AS?userId,?"?+
        ????????????????"?e.timing_startup?AS?startTime,?"?+
        ????????????????"?u.device_id?AS?deviceId,?"?+
        ????????????????"?d.app_name?AS?appName,?"?+
        ????????????????"?d.compose_file?AS?composeFile,?"?+
        ????????????????"?e.failure_retry?AS?failureRetry,?"?+
        ????????????????"?e.tetry_times?AS?retryTimes,?"?+
        ????????????????"?e.device_managered?AS?deviceManagered?"?+
        ????????????????"?FROM?"?+
        ????????????????"?eiot_upgrade_task?e?"?+
        ????????????????"?LEFT?JOIN?eiot_upgrade_device?u?ON?e.?ID?=?u.upgrade_task_id?"?+
        ????????????????"?LEFT?JOIN?eiot_app_detail?d?ON?e.app_id?=?d.?ID?"?+
        ????????????????"?WHERE?"?+
        ????????????????"?(?"?+
        ????????????????"?u.device_upgrade_status?=?0?"?+
        ????????????????"?OR?u.device_upgrade_status?=?2"?+
        ????????????????"?)"?+
        ????????????????"?AND?e.tetry_times?>?u.retry_times?"?+
        ????????????????"?AND?e.?ID?=??";

        ????????return?new?JdbcCursorItemReaderBuilder()
        ????????????????.name("itemReader")
        ????????????????.sql(querySql)
        ????????????????.dataSource(dataSource)
        ????????????????.queryArguments(new?Object[]{parameters.get("taskId").getValue()})
        ????????????????.rowMapper(new?DispatchRequest.DispatchRequestRowMapper())
        ????????????????.build();
        ????}

        ?//?將結(jié)果寫回?cái)?shù)據(jù)庫
        ????@Bean
        ????@StepScope
        ????public?ItemWriter?itemWriter()?{
        ????????return?new?ItemWriter()?{

        ????????????private?int?updateTaskStatus(DispatchRequest?dispatchRequest,?int?status)?{
        ????????????????log.info("update?taskId:?{},?deviceId:?{}?to?status?{}",?dispatchRequest.getTaskId(),?dispatchRequest.getDeviceId(),?status);

        ????????????????Integer?retryTimes?=?jdbcTemplate.queryForObject(
        ????????????????????????"select?retry_times?from?eiot_upgrade_device?where?device_id?=???and?upgrade_task_id?=??",
        ????????????????????????new?Object[]{?dispatchRequest.getDeviceId(),?dispatchRequest.getTaskId()},?Integer.class
        ????????????????)
        ;
        ????????????????retryTimes?+=?1;
        ????????????????int?updateCount?=?jdbcTemplate.update("update?eiot_upgrade_device?set?device_upgrade_status?=??,?retry_times?=???"?+
        ????????????????????????"where?device_id?=???and?upgrade_task_id?=??",?status,?retryTimes,?dispatchRequest.getDeviceId(),?dispatchRequest.getTaskId());
        ????????????????if?(updateCount?<=?0)?{
        ????????????????????log.warn("no?task?updated");
        ????????????????}?else?{
        ????????????????????log.info("count?of?{}?task?updated",?updateCount);
        ????????????????}

        ????????????????//?最后一次重試
        ????????????????if?(status?==?STATUS_DISPATCH_FAILED?&&?retryTimes?==?dispatchRequest.getRetryTimes())?{
        ????????????????????log.info("the?last?retry?of?{}?failed,?inc?deviceManagered",?dispatchRequest.getTaskId());
        ????????????????????return?1;
        ????????????????}?else?{
        ????????????????????return?0;
        ????????????????}
        ????????????}

        ????????????@Override
        ????????????@Transactional
        ????????????public?void?write(List?list)?throws?Exception?{
        ????????????????Map?taskMap?=?jdbcTemplate.queryForMap(
        ????????????????????????"select?device_managered,?device_count,?task_status?from?eiot_upgrade_task?where?id?=??",
        ????????????????????????list.get(0).getDispatchRequest().getTaskId()?//?我們認(rèn)定一個(gè)批量里面,taskId都是一樣的
        ????????????????????????);
        ????????????????int?deviceManagered?=?(int)taskMap.get("device_managered");
        ????????????????Integer?deviceCount?=?(Integer)?taskMap.get("device_count");
        ????????????????if?(deviceCount?==?null)?{
        ????????????????????log.warn("deviceCount?of?task?{}?is?null",?list.get(0).getDispatchRequest().getTaskId());
        ????????????????}
        ????????????????int?taskStatus?=?(int)taskMap.get("task_status");
        ????????????????for?(ProcessResult?result:?list)?{
        ????????????????????deviceManagered?+=?updateTaskStatus(result.getDispatchRequest(),?result.getStatus());
        ????????????????}
        ????????????????if?(deviceCount?!=?null?&&?deviceManagered?==?deviceCount)?{
        ????????????????????taskStatus?=?2;?//任務(wù)狀態(tài)?0:待升級(jí),1:升級(jí)中,2:已完成
        ????????????????}
        ????????????????jdbcTemplate.update("update?eiot_upgrade_task??set?device_managered?=??,?task_status?=???"?+
        ????????????????????????"where?id?=??",?deviceManagered,?taskStatus,?list.get(0).getDispatchRequest().getTaskId());
        ????????????}
        ????????};
        ????}

        ????/**
        ?????*?定義一個(gè)下發(fā)更新的?job
        ?????*?@return
        ?????*/

        ????@Bean
        ????public?Job?updateDeviceJob(Step?updateDeviceStep)?{
        ????????return?jobBuilderFactory.get(UUID.randomUUID().toString().replace("-",?""))
        ????????????????.listener(new?JobListener())?//?設(shè)置Job的監(jiān)聽器
        ????????????????.flow(updateDeviceStep)//?執(zhí)行下發(fā)更新的Step
        ????????????????.end()
        ????????????????.build();
        ????}

        ????/**
        ?????*?定義一個(gè)下發(fā)更新的?step
        ?????*?@return
        ?????*/

        ????@Bean
        ????public?Step?updateDeviceStep(JdbcCursorItemReader?itemReader,ItemWriter?itemWriter)?{
        ????????return?stepBuilderFactory.get(UUID.randomUUID().toString().replace("-",?""))
        ????????????????.?chunk(batchSize)
        ????????????????.reader(itemReader)?//根據(jù)taskId從數(shù)據(jù)庫讀取更新設(shè)備信息
        ????????????????.processor(taskItemProcessor)?//?每條更新信息,執(zhí)行下發(fā)更新接口
        ????????????????.writer(itemWriter)
        ????????????????.build();
        ????}

        ????//?job?監(jiān)聽器
        ????public?class?JobListener?implements?JobExecutionListener?{

        ????????@Override
        ????????public?void?beforeJob(JobExecution?jobExecution)?{
        ????????????log.info(jobExecution.getJobInstance().getJobName()?+?"?before...?");
        ????????????parameters?=?jobExecution.getJobParameters().getParameters();
        ????????????taskId?=?parameters.get("taskId").getValue();
        ????????????log.info("job?param?taskId?:?"?+?parameters.get("taskId"));
        ????????}

        ????????@Override
        ????????public?void?afterJob(JobExecution?jobExecution)?{

        ????????????log.info(jobExecution.getJobInstance().getJobName()?+?"?after...?");
        ????????????//?當(dāng)所有job執(zhí)行完之后,查詢?cè)O(shè)備更新狀態(tài),如果有失敗,則要定時(shí)重新執(zhí)行job
        ????????????String?sql?=?"?SELECT?"?+
        ????????????????????"?count(*)?"?+
        ????????????????????"?FROM?"?+
        ????????????????????"?eiot_upgrade_device?d?"?+
        ????????????????????"?LEFT?JOIN?eiot_upgrade_task?u?ON?d.upgrade_task_id?=?u.?ID?"?+
        ????????????????????"?WHERE?"?+
        ????????????????????"?u.?ID?=???"?+
        ????????????????????"?AND?d.retry_times??+
        ????????????????????"?AND?(?"?+
        ????????????????????"?d.device_upgrade_status?=?0?"?+
        ????????????????????"?OR?d.device_upgrade_status?=?2?"?+
        ????????????????????"?)?";

        ????????????//?獲取更新失敗的設(shè)備個(gè)數(shù)
        ????????????Integer?count?=?jdbcTemplate.queryForObject(sql,?new?Object[]{taskId},?Integer.class);

        ????????????log.info("update?device?failure?count?:?"?+?count);

        ????????????//?下面是使用Quartz觸發(fā)定時(shí)任務(wù)
        ????????????//?獲取任務(wù)時(shí)間,單位秒
        //????????????String?time?=?jdbcTemplate.queryForObject(sql,?new?Object[]{taskId},?Integer.class);
        ????????????//?此處方便測(cè)試,應(yīng)該從數(shù)據(jù)庫中取taskId對(duì)應(yīng)的重試間隔,單位秒
        ????????????Integer?millSecond?=?10;

        ????????????if(count?!=?null?&&?count?>?0){
        ????????????????String?jobName?=?"UpgradeTask_"?+?taskId;
        ????????????????String?reTaskId?=?taskId.toString();
        ????????????????Map?params?=?new?HashMap<>();
        ????????????????params.put("jobName",jobName);
        ????????????????params.put("taskId",reTaskId);
        ????????????????if?(QuartzManager.checkNameNotExist(jobName))
        ????????????????{
        ????????????????????QuartzManager.scheduleRunOnceJob(jobName,?RunOnceJobLogic.class,params,millSecond);
        ????????????????}
        ????????????}

        ????????}
        ????}
        }

        5、Processor,处理每条数据,可以在此对数据进行过滤操作

        @Component("taskItemProcessor")
        public?class?TaskItemProcessor?implements?ItemProcessor<DispatchRequest,?ProcessResult>?{

        ????public?static?final?int?STATUS_DISPATCH_FAILED?=?2;
        ????public?static?final?int?STATUS_DISPATCH_SUCC?=?1;

        ????private?static?final?Logger?log?=?LoggerFactory.getLogger(TaskItemProcessor.class);

        ????@Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
        ????private?String?dispatchUrl;

        ????@Autowired
        ????JdbcTemplate?jdbcTemplate;

        ????/**
        ?????*?在這里,執(zhí)行?下發(fā)更新指令?的操作
        ?????*?@param?dispatchRequest
        ?????*?@return
        ?????*?@throws?Exception
        ?????*/

        ????@Override
        ????public?ProcessResult?process(final?DispatchRequest?dispatchRequest)?{
        ????????//?調(diào)用接口,下發(fā)指令
        ????????String?url?=?dispatchUrl?+?dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();

        ????????log.info("request?url:"?+?url);
        ????????RestTemplate?restTemplate?=?new?RestTemplate();
        ????????HttpHeaders?headers?=?new?HttpHeaders();
        ????????headers.setContentType(MediaType.APPLICATION_JSON_UTF8);

        ????????MultiValueMap?params?=?new?LinkedMultiValueMap();

        ????????JSONObject?jsonOuter?=?new?JSONObject();
        ????????JSONObject?jsonInner?=?new?JSONObject();
        ????????try?{
        ????????????jsonInner.put("jobId",dispatchRequest.getTaskId());
        ????????????jsonInner.put("name",dispatchRequest.getName());
        ????????????jsonInner.put("composeFile",?Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
        ????????????jsonInner.put("policy",new?JSONObject().put("startTime",dispatchRequest.getPolicy()));
        ????????????jsonInner.put("timestamp",dispatchRequest.getTimestamp());

        ????????????jsonOuter.put("method","updateApp");
        ????????????jsonOuter.put("params",jsonInner);
        ????????}?catch?(JSONException?e)?{
        ????????????log.info("JSON?convert?Exception?:"?+?e);
        ????????}catch?(IOException?e)?{
        ????????????log.info("Base64Util?bytesToBase64Str?:"?+?e);
        ????????}

        ????????log.info("request?body?json?:"?+?jsonOuter);
        ????????HttpEntity?requestEntity?=?new?HttpEntity(jsonOuter.toString(),headers);
        ????????int?status;
        ????????try?{
        ????????????ResponseEntity?response?=?restTemplate.postForEntity(url,requestEntity,String.class);
        ????????????log.info("response?:"?+?response);
        ????????????if?(response.getStatusCode()?==?HttpStatus.OK)?{
        ????????????????status?=?STATUS_DISPATCH_SUCC;
        ????????????}?else?{
        ????????????????status?=?STATUS_DISPATCH_FAILED;
        ????????????}

        ????????}catch?(Exception?e){
        ????????????status?=?STATUS_DISPATCH_FAILED;
        ????????}

        ????????return?new?ProcessResult(dispatchRequest,?status);
        ????}
        }

        6、封装数据库返回数据的实体 Bean,注意静态内部类


        public?class?DispatchRequest?{

        ????private?String?taskId;
        ????private?String?deviceId;
        ????private?String?userId;
        ????private?String?name;
        ????private?byte[]?composeFile;
        ????private?String?policy;
        ????private?String?timestamp;
        ????private?String?md5;
        ????private?int?failureRetry;
        ????private?int?retryTimes;
        ????private?int?deviceManagered;

        ???//?省略構(gòu)造函數(shù),setter/getter/tostring方法
        ???//......
        ???
        ????public?static?class?DispatchRequestRowMapper?implements?RowMapper<DispatchRequest>?{
        ????????@Override
        ????????public?DispatchRequest?mapRow(ResultSet?resultSet,?int?i)?throws?SQLException?{

        ????????????DispatchRequest?dispatchRequest?=?new?DispatchRequest();
        ????????????dispatchRequest.setTaskId(resultSet.getString("taskId"));
        ????????????dispatchRequest.setUserId(resultSet.getString("userId"));
        ????????????dispatchRequest.setPolicy(resultSet.getString("startTime"));
        ????????????dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
        ????????????dispatchRequest.setName(resultSet.getString("appName"));
        ????????????dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
        ????????????dispatchRequest.setTimestamp(DateUtil.DateToString(new?Date()));
        ????????????dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
        ????????????dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
        ????????????dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
        ????????????return?dispatchRequest;
        ????????}
        ????}
        }

        7、启动类上要加上注解

        @SpringBootApplication
        @EnableBatchProcessing
        public?class?Application?{

        ????public?static?void?main(String[]?args)?{
        ????????SpringApplication.run(Application.class,?args);
        ????}
        }

        三、小结

        其实 SpringBatch 并没有想象中那么好用,当从数据库中每次取 5000 条数据后,进入 processor 中是逐条处理的,这个时候不能并行操作,等 5000 条数据处理完之后,再一次性执行 ItemWriter 方法。

        在使用的过程中,最坑的地方是 ItemReader 和 ItemWriter 这两个地方,如何执行自定义的 Sql,参考文中代码就行。

        至于 Quartz 定时功能,很简单,只要定时创建 SpringBatch 里面的 Job,让这个 job 启动就好了,此处就不再给出了,贴的代码太多了。由于公司一些原因,代码不能放到 GitHub 上。

        END


        推薦閱讀

        一鍵生成Springboot & Vue項(xiàng)目!【畢設(shè)神器】

        Java可視化編程工具系列(一)

        Java可視化編程工具系列(二)


        順便給大家推薦一個(gè)GitHub項(xiàng)目,這個(gè) GitHub 整理了上千本常用技術(shù)PDF,絕大部分核心的技術(shù)書籍都可以在這里找到,

        GitHub地址:https://github.com/javadevbooks/books

        Gitee地址:https://gitee.com/javadevbooks/books

        電子書已經(jīng)更新好了,你們需要的可以自行下載了,記得點(diǎn)一個(gè)star,持續(xù)更新中..


        瀏覽 23
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            大香蕉伊人综合 | 91毛片视频 | 夜夜艹日日艹 | 啊灬啊别停灬用力啊岳一剧情介绍 | 西西人体444WWW无码男男 | 亚洲欧美久久 | 让男人爽到不行的床技 | 自拍 亚洲 女优 欧美 偷拍 | 97人妻人人揉人人躁 原 | 俄罗斯的黄色片 |