1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        如何監(jiān)控MySQL的binlog?

        共 7839字,需瀏覽 16分鐘

         ·

        2021-08-05 04:21

        最近在工作中,遇到了這樣一個業(yè)務場景,我們需要關注一個業(yè)務系統(tǒng)數(shù)據(jù)庫中某幾張表的數(shù)據(jù),當數(shù)據(jù)發(fā)生新增或修改時,將它同步到另一個業(yè)務系統(tǒng)數(shù)據(jù)庫中的表中。

        一提到數(shù)據(jù)庫的同步,估計大家第一時間想到的就是基于binlog的主從復制了,但是放在我們的場景中,還有幾個問題:

        • 第一,并不是需要復制所有表的數(shù)據(jù),復制對象只有少量的幾張表
        • 第二,也是比較麻煩的,兩個業(yè)務系統(tǒng)數(shù)據(jù)庫表結構可能不一致。例如,要同步數(shù)據(jù)庫1的A表中的某些字段到數(shù)據(jù)庫2的B表中,在這一過程中,A表和B表的字段并不是完全相同

        這樣的話,我們只能通過代碼的方式,首先獲取到數(shù)據(jù)庫1表中數(shù)據(jù)的變動,再通過手動映射的方式,插入到數(shù)據(jù)庫2的表中。但是,獲取變動數(shù)據(jù)的這一過程,還是離不開binlog,因此我們就需要在代碼中對binlog進行一下監(jiān)控。

        先說結論,我們最終使用了一個開源工具mysql-binlog-connector-java,用來監(jiān)控binlog變化并獲取數(shù)據(jù),獲取數(shù)據(jù)后再手動插入到另一個庫的表中,基于它來實現(xiàn)了數(shù)據(jù)的同步。這個工具的git項目地址如下:

        https://github.com/shyiko/mysql-binlog-connector-java

        在正式開始前,還是先簡單介紹一下mysqlbinlog,binlog是一個二進制文件,它保存在磁盤中,是用來記錄數(shù)據(jù)庫表結構變更、表數(shù)據(jù)修改的二進制日志。其實除了數(shù)據(jù)復制外,它還可以實現(xiàn)數(shù)據(jù)恢復、增量備份等功能。

        啟動項目前,首先需要確保mysql服務已經(jīng)啟用了binlog

        show variables like 'log_bin';

        如果為值為OFF,表示沒有啟用,那么需要首先啟用binlog,修改配置文件:

        log_bin=mysql-bin
        binlog-format=ROW
        server-id=1

        對參數(shù)做一個簡要說明:

        • 在配置文件中加入了log_bin配置項后,表示啟用了binlog
        • binlog-formatbinlog的日志格式,支持三種類型,分別是STATEMENTROW、MIXED,我們在這里使用ROW模式
        • server-id用于標識一個sql語句是從哪一個server寫入的,這里一定要進行設置,否則我們在后面的代碼中會無法正常監(jiān)聽到事件

        在更改完配置文件后,重啟mysql服務。再次查看是否啟用binlog,返回為ON,表示已經(jīng)開啟成功。

        在Java項目中,首先引入maven坐標:

        <dependency>
            <groupId>com.github.shyiko</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.21.0</version>
        </dependency>

        寫一段簡單的示例,看看它的具體使用方式:

        public static void main(String[] args) {
            BinaryLogClient client = new BinaryLogClient("127.0.0.1"3306"hydra""123456");
            client.setServerId(2);

            client.registerEventListener(event -> {
                EventData data = event.getData();
                if (data instanceof TableMapEventData) {
                    System.out.println("Table:");
                    TableMapEventData tableMapEventData = (TableMapEventData) data;
                    System.out.println(tableMapEventData.getTableId()+": ["+tableMapEventData.getDatabase() + "-" + tableMapEventData.getTable()+"]");
                }
                if (data instanceof UpdateRowsEventData) {
                    System.out.println("Update:");
                    System.out.println(data.toString());
                } else if (data instanceof WriteRowsEventData) {
                    System.out.println("Insert:");
                    System.out.println(data.toString());
                } else if (data instanceof DeleteRowsEventData) {
                    System.out.println("Delete:");
                    System.out.println(data.toString());
                }
            });

            try {
                client.connect();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        首先,創(chuàng)建一個BinaryLogClient客戶端對象,初始化時需要傳入mysql的連接信息,創(chuàng)建完成后,給客戶端注冊一個監(jiān)聽器,來實現(xiàn)它對binlog的監(jiān)聽和解析。在監(jiān)聽器中,我們暫時只對4種類型的事件數(shù)據(jù)進行了處理,除了WriteRowsEventData、DeleteRowsEventDataUpdateRowsEventData對應增刪改操作類型的事件數(shù)據(jù)外,還有一個TableMapEventData類型的數(shù)據(jù),包含了表的對應關系,在后面的例子中再具體說明。

        在這里,客戶端監(jiān)聽到的是數(shù)據(jù)庫級別的所有事件,并且可以監(jiān)聽到表的DML語句和DDL語句,所以我們只需要處理我們關心的事件數(shù)據(jù)就行,否則會收到大量的冗余數(shù)據(jù)。

        啟動程序,控制臺輸出:

        com.github.shyiko.mysql.binlog.BinaryLogClient openChannelToBinaryLogStream
        信息: Connected to 127.0.0.1:3306 at mysql-bin.000002/1046 (sid:2, cid:10)

        連接mysql的binlog成功,接下來,我們在數(shù)據(jù)庫中插入一條數(shù)據(jù),這里操作的數(shù)據(jù)庫名字是tenant,表是dept

        insert into dept VALUES(8,"人力","","1");

        這時,控制臺就會打印監(jiān)聽到事件的數(shù)據(jù):

        Table:
        108: [tenant-dept]
        Insert:
        WriteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[
        [8, 人力, , 1]
        ]}

        我們監(jiān)聽到的事件類型數(shù)據(jù)有兩類,第一類是TableMapEventData,通過它可以獲取操作的數(shù)據(jù)庫名稱、表名稱以及表的id。之所以我們要監(jiān)聽這個事件,是因為之后監(jiān)聽的實際操作中返回數(shù)據(jù)中包含了表的id,而沒有表名等信息,所以如果我們想知道具體的操作是在哪一張表的話,就要先維護一個id與表的對應關系。

        第二個打印出來的監(jiān)聽事件數(shù)據(jù)是WriteRowsEventData,其中記錄了insert語句作用的表,插入涉及到的列,以及實際插入的數(shù)據(jù)。另外,如果我們只需要對特定的一張或幾張表進行處理的話,也可以提前設置表的名單,在這里根據(jù)表id到表名的映射關系,實現(xiàn)數(shù)據(jù)的過濾,

        接下來,我們再執(zhí)行一條update語句:

        update dept set tenant_id=3 where id=8 or id=9

        控制臺輸出:

        Table:
        108: [tenant-dept]
        Update:
        UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1, 2, 3}, includedColumns={0, 1, 2, 3}, rows=[
        {before=[8, 人力, , 1], after=[8, 人力, , 3]},
        {before=[9, 人力, , 1], after=[9, 人力, , 3]}
        ]}

        在執(zhí)行update語句時,可能會作用于多條數(shù)據(jù),因此在實際修改的數(shù)據(jù)中,可能包含多行記錄,這一點體現(xiàn)在上面的rows中,包含了id為8和9的兩條數(shù)據(jù)。

        最后,再執(zhí)行一條delete語句:

        delete from dept where tenant_id=3

        控制臺打印如下,rows中同樣返回了生效的兩條數(shù)據(jù):

        Table:
        108: [tenant-dept]
        Delete:
        DeleteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[
        [8, 人力, , 3],
        [9, 人力, , 3]
        ]}

        簡單的使用原理介紹完成后,再回到我們原先的需求上,需要將一張表中新增或修改的數(shù)據(jù)同步到另一張表中,問題還有一個,就是如何將返回的數(shù)據(jù)對應到所在的列上。這時應該怎么實現(xiàn)呢?

        update操作為例,我們要對提取的數(shù)據(jù)后進行一下處理,更改上面例子中的方法:

        if (data instanceof UpdateRowsEventData) {
            System.out.println("Update:");
            UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
            for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
                List<Serializable> entries = Arrays.asList(row.getValue());
                System.out.println(entries);
                JSONObject dataObject = getDataObject(entries);
                System.out.println(dataObject);
            }
        }

        在將data強制轉換為UpdateRowsEventData后,可以使用getRows方法獲取到更新的行數(shù)據(jù),并且能夠取到每一列的值。

        之后,調(diào)用了一個自己實現(xiàn)的getDataObject方法,用它來實現(xiàn)數(shù)據(jù)到列的綁定過程:

        private static JSONObject getDataObject(List message) {
            JSONObject resultObject = new JSONObject();
            String format = "{\"id\":\"0\",\"dept_name\":\"1\",\"comment\":\"2\",\"tenant_id\":\"3\"}";
            JSONObject json = JSON.parseObject(format);
            for (String key : json.keySet()) {
                resultObject.put(key, message.get(json.getInteger(key)));
            }
            return resultObject;
        }

        format字符串中,提前維護了一個數(shù)據(jù)庫表的字段順序的字符串,標識了每個字段位于順序中的第幾個位置。通過上面這個函數(shù),能夠實現(xiàn)數(shù)據(jù)到列的填裝過程,我們再執(zhí)行一條update語句來查看一下結果:

        update dept set tenant_id=3,comment="1" where id=8

        控制臺打印結果如下:

        Table:
        108: [tenant-dept]
        Update:
        [8, 人力, 1, 3]
        {"tenant_id":3,"dept_name":"人力","comment":"1","id":8}

        可以看到,將修改后的這一條記錄中的屬性填裝到了它對應的列中,之后我們再根據(jù)具體的業(yè)務邏輯,就可以根據(jù)字段名取出數(shù)據(jù),將數(shù)據(jù)同步到其他的表了。

        < END >

        也許你還想看
          | Java領域的又一神書!周志明老師YYDS!
          | 我常用的20+個學習編程的網(wǎng)站!蕪湖起飛!
          | 想去讀個研究生了
          | 7年前,24歲,出版了一本 Redis 神書
          | 京東二面:為什么需要分布式ID?你項目中是怎么做的?

        我是 Guide哥,一個工作2年有余,接觸編程已經(jīng)6年有余的程序員。大三開源 JavaGuide,目前已經(jīng) 100k+ Star。未來幾年,希望持續(xù)完善 JavaGuide,爭取能夠幫助更多學習 Java 的小伙伴!共勉!凎!點擊即可了解我的個人經(jīng)歷

        歡迎點贊分享。咱們下期再會!

        瀏覽 38
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            丰满大乳伦理少妇 | 特级西西www大胆无码 | 国内特级毛片 | 依依成人网站 | 特黄A级毛片 | 嗯~少妇我弄的你舒服吗 | 亚洲大屄干欧美大屄 | 亚洲国产又黄又爽女人高潮的 | 男女生互操| 成人免费A级毛片免费看无码 |