1. Kafka 怎么順序消費?面試必備!

        共 3834字,需瀏覽 8分鐘

         ·

        2022-03-23 11:12

        點擊關注公眾號,Java干貨及時送達

        前言

        本文針對解決Kafka不同Topic之間存在一定的數(shù)據(jù)關聯(lián)時的順序消費問題。

        如存在Topic-insert和Topic-update分別是對數(shù)據(jù)的插入和更新,當insert和update操作為同一數(shù)據(jù)時,應保證先insert再update。

        1、問題引入

        kafka的順序消費一直是一個難以解決的問題,kafka的消費策略是對于同Topic同Partition的消息可保證順序消費,其余無法保證。

        如果一個Topic只有一個Partition,那么這個Topic對應consumer的消費必然是有序的。不同的Topic的任何情況下都無法保證consumer的消費順序和producer的發(fā)送順序一致。

        如果不同Topic之間存在數(shù)據(jù)關聯(lián)且對消費順序有要求,該如何處理?本文主要解決此問題。

        另外,Kafka 系列面試題和答案全部整理好了,微信搜索Java技術棧,在后臺發(fā)送:面試,可以在線閱讀。

        2、解決思路

        現(xiàn)有Topic-insert和Topic-update,數(shù)據(jù)唯一標識為id,對于id=1的數(shù)據(jù)而言,要保證Topic-insert消費在前,Topic-update消費在后。想成為架構師,這份架構師圖譜建議看看,少走彎路。

        兩個Topic的消費為不同線程處理,所以為了保證在同一時間內(nèi)的同一數(shù)據(jù)標識的消息僅有一個業(yè)務邏輯在處理,需要對業(yè)務添加鎖操作。

        使用synchronized進行加鎖的話,會影響無關聯(lián)的insert和update的數(shù)據(jù)消費能力,如id=1的insert和id=2的update,在synchronized的情況下,無法并發(fā)處理,這是沒有必要的,我們需要的是對于id=1的insert和id=1的update在同一時間只有一個在處理,所以使用細粒度鎖來完成加鎖的操作。

        細粒度鎖實現(xiàn):https://blog.csdn.net/qq_38245668/article/details/105891161

        PS:如果為分布式系統(tǒng),細粒度鎖需要使用分布式鎖的對應實現(xiàn)。

        在對insert和update加鎖之后,其實還是沒有解決消費順序的問題,只是確保了同一時間只有一個業(yè)務在處理。 對于消費順序異常的問題,也就是先消費了update再消費insert的情況。

        處理方式:消費到update數(shù)據(jù),校驗庫中是否存在當前數(shù)據(jù)(也就是是否執(zhí)行insert),如果沒有,就將當前update數(shù)據(jù)存入緩存,key為數(shù)據(jù)標識id,在insert消費時檢查是否存在id對應的update緩存,如果有,就證明當前數(shù)據(jù)的消費順序異常,需執(zhí)行update操作,再將緩存數(shù)據(jù)移除。

        點擊關注公眾號,Java干貨及時送達

        3、實現(xiàn)方案

        ?UPDATE_DATA_MAP?=?new?ConcurrentHashMap<>();????//?數(shù)據(jù)存儲????private?Map?DATA_MAP?=?new?ConcurrentHashMap<>();????private?WeakRefHashLock?weakRefHashLock;????public?KafkaListenerDemo(WeakRefHashLock?weakRefHashLock)?{????????this.weakRefHashLock?=?weakRefHashLock;????}????@KafkaListener(topics?=?"TOPIC_INSERT")????public?void?insert(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????//?模擬順序異常,也就是insert后消費,這里線程sleep????????Thread.sleep(1000);????????String?id?=?record.value();????????log.info("接收到insert ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????log.info("開始處理?{}?的insert",?id);????????????//?模擬?insert?業(yè)務處理????????????Thread.sleep(1000);????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)????????????if?(UPDATE_DATA_MAP.containsKey(id)){????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update????????????????doUpdate(id);????????????}????????????log.info("處理?{}?的insert?結束",?id);????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????@KafkaListener(topics?=?"TOPIC_UPDATE")????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????String?id?=?record.value();????????log.info("接收到update ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????//?測試使用,不做數(shù)據(jù)庫的校驗????????????if?(!DATA_MAP.containsKey(id)){????????????????//?未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);????????????????UPDATE_DATA_MAP.put(id,?id);????????????}else?{????????????????doUpdate(id);????????????}????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????void?doUpdate(String?id)?throws?InterruptedException{????????//?模擬?update????????log.info("開始處理update::{}",?id);????????Thread.sleep(1000);????????log.info("處理update::{}?結束",?id);????}}日志(代碼中已模擬必現(xiàn)消費順序異常的場景):接收到update ::1消費順序異常,將update數(shù)據(jù)?1?加入緩存接收到insert ::1開始處理?1?的insert開始處理update::1處理update::1 結束處理?1?的insert?結束觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">消息發(fā)送:

        ?UPDATE_DATA_MAP?=?new?ConcurrentHashMap<>();????//?數(shù)據(jù)存儲????private?Map?DATA_MAP?=?new?ConcurrentHashMap<>();????private?WeakRefHashLock?weakRefHashLock;????public?KafkaListenerDemo(WeakRefHashLock?weakRefHashLock)?{????????this.weakRefHashLock?=?weakRefHashLock;????}????@KafkaListener(topics?=?"TOPIC_INSERT")????public?void?insert(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????//?模擬順序異常,也就是insert后消費,這里線程sleep????????Thread.sleep(1000);????????String?id?=?record.value();????????log.info("接收到insert ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????log.info("開始處理?{}?的insert",?id);????????????//?模擬?insert?業(yè)務處理????????????Thread.sleep(1000);????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)????????????if?(UPDATE_DATA_MAP.containsKey(id)){????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update????????????????doUpdate(id);????????????}????????????log.info("處理?{}?的insert?結束",?id);????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????@KafkaListener(topics?=?"TOPIC_UPDATE")????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????String?id?=?record.value();????????log.info("接收到update ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????//?測試使用,不做數(shù)據(jù)庫的校驗????????????if?(!DATA_MAP.containsKey(id)){????????????????//?未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);????????????????UPDATE_DATA_MAP.put(id,?id);????????????}else?{????????????????doUpdate(id);????????????}????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????void?doUpdate(String?id)?throws?InterruptedException{????????//?模擬?update????????log.info("開始處理update::{}",?id);????????Thread.sleep(1000);????????log.info("處理update::{}?結束",?id);????}}日志(代碼中已模擬必現(xiàn)消費順序異常的場景):接收到update ::1消費順序異常,將update數(shù)據(jù)?1?加入緩存接收到insert ::1開始處理?1?的insert開始處理update::1處理update::1 結束處理?1?的insert?結束觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">kafkaTemplate.send("TOPIC_INSERT",?"1");
        kafkaTemplate.send("TOPIC_UPDATE",?"1");

        最新 Kafka 面試題整理好了,大家可以在Java面試庫小程序在線刷題。

        監(jiān)聽代碼示例:

        ?UPDATE_DATA_MAP?=?new?ConcurrentHashMap<>();????//?數(shù)據(jù)存儲????private?Map?DATA_MAP?=?new?ConcurrentHashMap<>();????private?WeakRefHashLock?weakRefHashLock;????public?KafkaListenerDemo(WeakRefHashLock?weakRefHashLock)?{????????this.weakRefHashLock?=?weakRefHashLock;????}????@KafkaListener(topics?=?"TOPIC_INSERT")????public?void?insert(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????//?模擬順序異常,也就是insert后消費,這里線程sleep????????Thread.sleep(1000);????????String?id?=?record.value();????????log.info("接收到insert ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????log.info("開始處理?{}?的insert",?id);????????????//?模擬?insert?業(yè)務處理????????????Thread.sleep(1000);????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)????????????if?(UPDATE_DATA_MAP.containsKey(id)){????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update????????????????doUpdate(id);????????????}????????????log.info("處理?{}?的insert?結束",?id);????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????@KafkaListener(topics?=?"TOPIC_UPDATE")????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????String?id?=?record.value();????????log.info("接收到update ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????//?測試使用,不做數(shù)據(jù)庫的校驗????????????if?(!DATA_MAP.containsKey(id)){????????????????//?未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);????????????????UPDATE_DATA_MAP.put(id,?id);????????????}else?{????????????????doUpdate(id);????????????}????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????void?doUpdate(String?id)?throws?InterruptedException{????????//?模擬?update????????log.info("開始處理update::{}",?id);????????Thread.sleep(1000);????????log.info("處理update::{}?結束",?id);????}}日志(代碼中已模擬必現(xiàn)消費順序異常的場景):接收到update ::1消費順序異常,將update數(shù)據(jù)?1?加入緩存接收到insert ::1開始處理?1?的insert開始處理update::1處理update::1 結束處理?1?的insert?結束觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">KafkaListenerDemo.java

        ?UPDATE_DATA_MAP?=?new?ConcurrentHashMap<>();????//?數(shù)據(jù)存儲????private?Map?DATA_MAP?=?new?ConcurrentHashMap<>();????private?WeakRefHashLock?weakRefHashLock;????public?KafkaListenerDemo(WeakRefHashLock?weakRefHashLock)?{????????this.weakRefHashLock?=?weakRefHashLock;????}????@KafkaListener(topics?=?"TOPIC_INSERT")????public?void?insert(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????//?模擬順序異常,也就是insert后消費,這里線程sleep????????Thread.sleep(1000);????????String?id?=?record.value();????????log.info("接收到insert ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????log.info("開始處理?{}?的insert",?id);????????????//?模擬?insert?業(yè)務處理????????????Thread.sleep(1000);????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)????????????if?(UPDATE_DATA_MAP.containsKey(id)){????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update????????????????doUpdate(id);????????????}????????????log.info("處理?{}?的insert?結束",?id);????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????@KafkaListener(topics?=?"TOPIC_UPDATE")????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????String?id?=?record.value();????????log.info("接收到update ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????//?測試使用,不做數(shù)據(jù)庫的校驗????????????if?(!DATA_MAP.containsKey(id)){????????????????//?未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);????????????????UPDATE_DATA_MAP.put(id,?id);????????????}else?{????????????????doUpdate(id);????????????}????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????void?doUpdate(String?id)?throws?InterruptedException{????????//?模擬?update????????log.info("開始處理update::{}",?id);????????Thread.sleep(1000);????????log.info("處理update::{}?結束",?id);????}}日志(代碼中已模擬必現(xiàn)消費順序異常的場景):接收到update ::1消費順序異常,將update數(shù)據(jù)?1?加入緩存接收到insert ::1開始處理?1?的insert開始處理update::1處理update::1 結束處理?1?的insert?結束觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">@Component
        @Slf4j
        public?class?KafkaListenerDemo?{

        ????//?消費到的數(shù)據(jù)緩存
        ????private?Map?UPDATE_DATA_MAP?=?new?ConcurrentHashMap<>();
        ????//?數(shù)據(jù)存儲
        ????private?Map?DATA_MAP?=?new?ConcurrentHashMap<>();
        ????private?WeakRefHashLock?weakRefHashLock;

        ????public?KafkaListenerDemo(WeakRefHashLock?weakRefHashLock)?{
        ????????this.weakRefHashLock?=?weakRefHashLock;
        ????}

        ????@KafkaListener(topics?=?"TOPIC_INSERT")
        ????public?void?insert(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{
        ????????//?模擬順序異常,也就是insert后消費,這里線程sleep
        ????????Thread.sleep(1000);

        ????????String?id?=?record.value();
        ????????log.info("接收到insert ::?{}",?id);
        ????????Lock?lock?=?weakRefHashLock.lock(id);
        ????????lock.lock();
        ????????try?{
        ????????????log.info("開始處理?{}?的insert",?id);
        ????????????//?模擬?insert?業(yè)務處理
        ????????????Thread.sleep(1000);
        ????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)
        ????????????if?(UPDATE_DATA_MAP.containsKey(id)){
        ????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update
        ????????????????doUpdate(id);
        ????????????}
        ????????????log.info("處理?{}?的insert?結束",?id);
        ????????}finally?{
        ????????????lock.unlock();
        ????????}
        ????????acknowledgment.acknowledge();
        ????}

        ????@KafkaListener(topics?=?"TOPIC_UPDATE")
        ????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{

        ????????String?id?=?record.value();
        ????????log.info("接收到update ::?{}",?id);
        ????????Lock?lock?=?weakRefHashLock.lock(id);
        ????????lock.lock();
        ????????try?{
        ????????????//?測試使用,不做數(shù)據(jù)庫的校驗
        ????????????if?(!DATA_MAP.containsKey(id)){
        ????????????????//?未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存
        ????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);
        ????????????????UPDATE_DATA_MAP.put(id,?id);
        ????????????}else?{
        ????????????????doUpdate(id);
        ????????????}
        ????????}finally?{
        ????????????lock.unlock();
        ????????}
        ????????acknowledgment.acknowledge();
        ????}

        ????void?doUpdate(String?id)?throws?InterruptedException{
        ????????//?模擬?update
        ????????log.info("開始處理update::{}",?id);
        ????????Thread.sleep(1000);
        ????????log.info("處理update::{}?結束",?id);
        ????}

        }

        ?UPDATE_DATA_MAP?=?new?ConcurrentHashMap<>();????//?數(shù)據(jù)存儲????private?Map?DATA_MAP?=?new?ConcurrentHashMap<>();????private?WeakRefHashLock?weakRefHashLock;????public?KafkaListenerDemo(WeakRefHashLock?weakRefHashLock)?{????????this.weakRefHashLock?=?weakRefHashLock;????}????@KafkaListener(topics?=?"TOPIC_INSERT")????public?void?insert(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????//?模擬順序異常,也就是insert后消費,這里線程sleep????????Thread.sleep(1000);????????String?id?=?record.value();????????log.info("接收到insert ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????log.info("開始處理?{}?的insert",?id);????????????//?模擬?insert?業(yè)務處理????????????Thread.sleep(1000);????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)????????????if?(UPDATE_DATA_MAP.containsKey(id)){????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update????????????????doUpdate(id);????????????}????????????log.info("處理?{}?的insert?結束",?id);????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????@KafkaListener(topics?=?"TOPIC_UPDATE")????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????String?id?=?record.value();????????log.info("接收到update ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????//?測試使用,不做數(shù)據(jù)庫的校驗????????????if?(!DATA_MAP.containsKey(id)){????????????????//?未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);????????????????UPDATE_DATA_MAP.put(id,?id);????????????}else?{????????????????doUpdate(id);????????????}????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????void?doUpdate(String?id)?throws?InterruptedException{????????//?模擬?update????????log.info("開始處理update::{}",?id);????????Thread.sleep(1000);????????log.info("處理update::{}?結束",?id);????}}日志(代碼中已模擬必現(xiàn)消費順序異常的場景):接收到update ::1消費順序異常,將update數(shù)據(jù)?1?加入緩存接收到insert ::1開始處理?1?的insert開始處理update::1處理update::1 結束處理?1?的insert?結束觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">日志(代碼中已模擬必現(xiàn)消費順序異常的場景):

        ?UPDATE_DATA_MAP?=?new?ConcurrentHashMap<>();????//?數(shù)據(jù)存儲????private?Map?DATA_MAP?=?new?ConcurrentHashMap<>();????private?WeakRefHashLock?weakRefHashLock;????public?KafkaListenerDemo(WeakRefHashLock?weakRefHashLock)?{????????this.weakRefHashLock?=?weakRefHashLock;????}????@KafkaListener(topics?=?"TOPIC_INSERT")????public?void?insert(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????//?模擬順序異常,也就是insert后消費,這里線程sleep????????Thread.sleep(1000);????????String?id?=?record.value();????????log.info("接收到insert ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????log.info("開始處理?{}?的insert",?id);????????????//?模擬?insert?業(yè)務處理????????????Thread.sleep(1000);????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)????????????if?(UPDATE_DATA_MAP.containsKey(id)){????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update????????????????doUpdate(id);????????????}????????????log.info("處理?{}?的insert?結束",?id);????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????@KafkaListener(topics?=?"TOPIC_UPDATE")????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????String?id?=?record.value();????????log.info("接收到update ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????//?測試使用,不做數(shù)據(jù)庫的校驗????????????if?(!DATA_MAP.containsKey(id)){????????????????//?未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);????????????????UPDATE_DATA_MAP.put(id,?id);????????????}else?{????????????????doUpdate(id);????????????}????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????void?doUpdate(String?id)?throws?InterruptedException{????????//?模擬?update????????log.info("開始處理update::{}",?id);????????Thread.sleep(1000);????????log.info("處理update::{}?結束",?id);????}}日志(代碼中已模擬必現(xiàn)消費順序異常的場景):接收到update ::1消費順序異常,將update數(shù)據(jù)?1?加入緩存接收到insert ::1開始處理?1?的insert開始處理update::1處理update::1 結束處理?1?的insert?結束觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2">接收到update ::1
        消費順序異常,將update數(shù)據(jù)?1?加入緩存
        接收到insert ::1
        開始處理?1?的insert
        開始處理update::1
        處理update::1 結束
        處理?1?的insert?結束

        ?UPDATE_DATA_MAP?=?new?ConcurrentHashMap<>();????//?數(shù)據(jù)存儲????private?Map?DATA_MAP?=?new?ConcurrentHashMap<>();????private?WeakRefHashLock?weakRefHashLock;????public?KafkaListenerDemo(WeakRefHashLock?weakRefHashLock)?{????????this.weakRefHashLock?=?weakRefHashLock;????}????@KafkaListener(topics?=?"TOPIC_INSERT")????public?void?insert(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????//?模擬順序異常,也就是insert后消費,這里線程sleep????????Thread.sleep(1000);????????String?id?=?record.value();????????log.info("接收到insert ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????log.info("開始處理?{}?的insert",?id);????????????//?模擬?insert?業(yè)務處理????????????Thread.sleep(1000);????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)????????????if?(UPDATE_DATA_MAP.containsKey(id)){????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update????????????????doUpdate(id);????????????}????????????log.info("處理?{}?的insert?結束",?id);????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????@KafkaListener(topics?=?"TOPIC_UPDATE")????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????String?id?=?record.value();????????log.info("接收到update ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????//?測試使用,不做數(shù)據(jù)庫的校驗????????????if?(!DATA_MAP.containsKey(id)){????????????????//?未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);????????????????UPDATE_DATA_MAP.put(id,?id);????????????}else?{????????????????doUpdate(id);????????????}????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????void?doUpdate(String?id)?throws?InterruptedException{????????//?模擬?update????????log.info("開始處理update::{}",?id);????????Thread.sleep(1000);????????log.info("處理update::{}?結束",?id);????}}日志(代碼中已模擬必現(xiàn)消費順序異常的場景):接收到update ::1消費順序異常,將update數(shù)據(jù)?1?加入緩存接收到insert ::1開始處理?1?的insert開始處理update::1處理update::1 結束處理?1?的insert?結束觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" style="color: rgb(58, 58, 58);" data-linktype="2">觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。

        版權聲明:本文為CSDN博主「方片龍」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。原文鏈接:https://blog.csdn.net/qq_38245668/article/details/105900011









        Spring Cloud 爆高危漏洞,趕緊修復!
        2021 年發(fā)生的 10 件技術大事??!
        23 種設計模式實戰(zhàn)(很全)
        Spring Boot 保護敏感配置的 4 種方法!
        再見單身狗!Java 創(chuàng)建對象的 6 種方式
        阿里為什么推薦使用 LongAdder?
        重磅官宣:Redis 對象映射框架來了??!
        別再寫爆爆爆炸類了,試試裝飾器模式!
        程序員精通各種技術體系,45歲求職難!
        Spring Boot 3.0 M1 發(fā)布,正式棄用 Java 8
        Spring Boot 學習筆記,這個太全了!



        關注Java技術??锤喔韶?/strong>



        獲取 Spring Boot 實戰(zhàn)筆記!
        瀏覽 57
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 特级黄色小说 | 欧美午夜片欧美片在线观看 | 精品国产乱码久久久久久牛牛 | 国产人兽在线 | 蜜臀久久99精品久久久酒店 |