1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        elastic-job的原理簡介和使用

        共 9932字,需瀏覽 20分鐘

         ·

        2021-04-01 02:38

        點擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

        優(yōu)質(zhì)文章,第一時間送達(dá)

        elastic-job是當(dāng)當(dāng)開源的一款非常好用的作業(yè)框架,在這之前,我們開發(fā)定時任務(wù)一般都是使用quartz或者spring-task(ScheduledExecutorService),無論是使用quartz還是spring-task,我們都會至少遇到兩個痛點: 

        1.不敢輕易跟著應(yīng)用服務(wù)多節(jié)點部署,可能會重復(fù)多次執(zhí)行而引發(fā)系統(tǒng)邏輯的錯誤。 

        2.quartz的集群僅僅只是用來HA,節(jié)點數(shù)量的增加并不能給我們的每次執(zhí)行效率帶來提升,即不能實現(xiàn)水平擴(kuò)展。 

        本篇博文將會自頂向下地介紹elastic-job,讓大家認(rèn)識了解并且快速搭建起環(huán)境。


        elastic-job產(chǎn)品線說明

        elastic-job在2.x之后,出了兩個產(chǎn)品線:Elastic-Job-Lite和Elastic-Job-Cloud。我們一般使用Elastic-Job-Lite就能夠滿足需求,本文也是以Elastic-Job-Lite為主。1.x系列對應(yīng)的就只有Elastic-Job-Lite,并且在2.x里修改了一些核心類名,差別雖大,原理類似,建議使用2.x系列。寫此博文,最新release版本為2.0.5。

         

        elastic-job-lite原理

        舉個典型的job場景,比如余額寶里的昨日收益,系統(tǒng)需要job在每天某個時間點開始,給所有余額寶用戶計算收益。如果用戶數(shù)量不多,我們可以輕易使用quartz來完成,我們讓計息job在某個時間點開始執(zhí)行,循環(huán)遍歷所有用戶計算利息,這沒問題??墒牵绻脩趔w量特別大,我們可能會面臨著在第二天之前處理不完這么多用戶。另外,我們部署job的時候也得注意,我們可能會把job直接放在我們的webapp里,webapp通常是多節(jié)點部署的,這樣,我們的job也就是多節(jié)點,多個job同時執(zhí)行,很容易造成重復(fù)執(zhí)行,比如用戶重復(fù)計息,為了避免這種情況,我們可能會對job的執(zhí)行加鎖,保證始終只有一個節(jié)點能執(zhí)行,或者干脆讓job從webapp里剝離出來,獨自部署一個節(jié)點。 

        elastic-job就可以幫助我們解決上面的問題,elastic底層的任務(wù)調(diào)度還是使用的quartz,通過zookeeper來動態(tài)給job節(jié)點分片。 

        我們來看: 

        很大體量的用戶需要在特定的時間段內(nèi)計息完成 

        我們肯定是希望我們的任務(wù)可以通過集群達(dá)到水平擴(kuò)展,集群里的每個節(jié)點都處理部分用戶,不管用戶數(shù)量有多龐大,我們只要增加機(jī)器就可以了,比如單臺機(jī)器特定時間能處理n個用戶,2臺機(jī)器處理2n個用戶,3臺3n,4臺4n...,再多的用戶也不怕了。 

        使用elastic-job開發(fā)的作業(yè)都是zookeeper的客戶端,比如我希望3臺機(jī)器跑job,我們將任務(wù)分成3片,框架通過zk的協(xié)調(diào),最終會讓3臺機(jī)器分別分配到0,1,2的任務(wù)片,比如server0-->0,server1-->1,server2-->2,當(dāng)server0執(zhí)行時,可以只查詢id%3==0的用戶,server1執(zhí)行時,只查詢id%3==1的用戶,server2執(zhí)行時,只查詢id%3==2的用戶。 

        任務(wù)部署多節(jié)點引發(fā)重復(fù)執(zhí)行 

        在上面的基礎(chǔ)上,我們再增加server3,此時,server3分不到任務(wù)分片,因為只有3片,已經(jīng)分完了。沒有分到任務(wù)分片的作業(yè)程序?qū)⒉粓?zhí)行。 

        如果此時server2掛了,那么server2的分片項會分配給server3,server3有了分片,就會替代server2執(zhí)行。 

        如果此時server3也掛了,只剩下server0和server1了,框架也會自動把server3的分片隨機(jī)分配給server0或者server1,可能會這樣,server0-->0,server1-->1,2。 

        這種特性稱之為彈性擴(kuò)容,即elastic-job名稱的由來。

        代碼演示

        我們搭建環(huán)境通過示例代碼來演示上面的例子,elastic-job是不支持單機(jī)多實例的,通過zk的協(xié)調(diào)分片是以ip為單元的。很多同學(xué)上來可能就是通過單機(jī)多實例來學(xué)習(xí),結(jié)果導(dǎo)致分片和預(yù)期不一致。這里沒辦法,只能通過多機(jī)器或者虛擬機(jī),我們這里使用虛擬機(jī),另外,由于資源有限,我們這里僅僅只模擬兩臺機(jī)器。 


        節(jié)點說明: 

        本地宿主機(jī)器 

        zookeeper、job 

        192.168.241.1 


        虛擬機(jī) 

        job 

        192.168.241.128 


        環(huán)境說明: 

        Java 

        請使用JDK1.7及其以上版本。 

        Zookeeper 

        請使用Zookeeper3.4.6及其以上版本 

        Elastic-Job-Lite 

        2.0.5(2.x系列即可,最好是2.0.4及其以上,因為2.0.4版本有本人提交的少許代碼,(*^__^*) 嘻嘻……) 


        需求說明: 

        通過兩臺機(jī)器演示動態(tài)分片 


        step1. 引入框架的jar包

        <!-- 引入elastic-job-lite核心模塊 -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.0.5</version>
        </dependency>
        <!-- 使用springframework自定義命名空間時引入 -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.0.5</version>
        </dependency>


        step2. 編寫job 

        package com.fanfan.sample001;
         
        import com.dangdang.ddframe.job.api.ShardingContext;
        import com.dangdang.ddframe.job.api.simple.SimpleJob;
         
        import java.util.Date;
         
        /**
         * Created by fanfan on 2016/12/20.
         */
        public class MySimpleJob implements SimpleJob {
            @Override
            public void execute(ShardingContext shardingContext) {
                System.out.println(String.format("------Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項: %s",
                        Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
                /**
                 * 實際開發(fā)中,有了任務(wù)總片數(shù)和當(dāng)前分片項,就可以對任務(wù)進(jìn)行分片執(zhí)行了
                 * 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem
                 */
            }
        }

        Step3. Spring配置

        <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
               xmlns:job="http://www.dangdang.com/schema/ddframe/job"
               xsi:schemaLocation="http://www.springframework.org/schema/beans
                                http://www.springframework.org/schema/beans/spring-beans.xsd
                                http://www.dangdang.com/schema/ddframe/reg
                                http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                                http://www.dangdang.com/schema/ddframe/job
                                http://www.dangdang.com/schema/ddframe/job/job.xsd"
        >
            <!--配置作業(yè)注冊中心 -->
            <reg:zookeeper id="regCenter" server-lists="192.168.241.1:2181" namespace="dd-job"
                           base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
         
            <!-- 配置作業(yè)-->
            <job:simple id="mySimpleJob" class="com.fanfan.sample001.MySimpleJob" registry-center-ref="regCenter"
                        sharding-total-count="2" cron="0/2 * * * * ?" overwrite="true" />
         
        </beans>


        Case1. 單節(jié)點




        Case2. 增加一個節(jié)點

          

         

        Case3. 斷開一個節(jié)點 

         

        作業(yè)類型

        elastic-job提供了三種類型的作業(yè):Simple類型作業(yè)、Dataflow類型作業(yè)、Script類型作業(yè)。這里主要講解前兩者。Script類型作業(yè)意為腳本類型作業(yè),支持shell,python,perl等所有類型腳本,使用不多,可以參見github文檔。 


        SimpleJob需要實現(xiàn)SimpleJob接口,意為簡單實現(xiàn),未經(jīng)過任何封裝,與quartz原生接口相似,比如示例代碼中所使用的job。 


        Dataflow類型用于處理數(shù)據(jù)流,需實現(xiàn)DataflowJob接口。該接口提供2個方法可供覆蓋,分別用于抓取(fetchData)和處理(processData)數(shù)據(jù)。 

        可通過DataflowJobConfiguration配置是否流式處理。 

        流式處理數(shù)據(jù)只有fetchData方法的返回值為null或集合長度為空時,作業(yè)才停止抓取,否則作業(yè)將一直運行下去;非流式處理數(shù)據(jù)則只會在每次作業(yè)執(zhí)行過程中執(zhí)行一次fetchData方法和processData方法,隨即完成本次作業(yè)。 

        實際開發(fā)中,Dataflow類型的job還是很有好用的。 


        比如拿余額寶計息來說: 

        package com.fanfan.sample001;
         
        import com.dangdang.ddframe.job.api.ShardingContext;
        import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
         
        import java.util.ArrayList;
        import java.util.List;
         
        /**
         * Created by fanfan on 2016/12/23.
         */
        public class MyDataFlowJob implements DataflowJob<User> {
         
            /*
                status
                0:待處理
                1:已處理
             */
         
            @Override
            public List<User> fetchData(ShardingContext shardingContext) {
                List<User> users = null;
                /**
                 * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30
                 */
                return users;
            }
         
            @Override
            public void processData(ShardingContext shardingContext, List<User> data) {
                for (User user: data) {
                    System.out.println(String.format("用戶 %s 開始計息", user.getUserId()));
                    user.setStatus(1);
                    /**
                     * update user
                     */
                }
            }
        }
        <job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"
                      sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" />

        其它功能

        上述介紹的是最精簡常用的功能。elastic-job的功能集還不止這些,比如像作業(yè)事件追蹤、任務(wù)監(jiān)聽等,另外,elastic-job-lite-console作為一個獨立的運維平臺還提供了用來查詢和操作任務(wù)的web頁面。 

        這些增強(qiáng)的功能讀者可以在github/elastic-job上自行學(xué)習(xí),相信有了本篇博文的基礎(chǔ),再閱讀那些文檔就特別簡單了。 

        ————————————————

        版權(quán)聲明:本文為CSDN博主「秋名車手」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。

        原文鏈接:

        https://blog.csdn.net/fanfan_v5/article/details/61310045





        粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

        ??????

        ??長按上方微信二維碼 2 秒


        感謝點贊支持下哈 

        瀏覽 57
        點贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            特级西西www大胆无码 | 欧美女人性爱 | 黄片欧美 | 8050午夜一级免费 | 国产一区二区三区www | 黄片小网站| 亚洲深深色噜噜狠狠爱网站 | 三级经典在线视频 | ai换脸刘亦菲国产精品 | 公交车上少妇迎合我摩擦 |