1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Debezium Server Databend support Auto Schema Evolution

        共 9205字,需瀏覽 19分鐘

         ·

        2024-04-10 16:23

        背景

        Debezium Server Databend 是一個(gè)基于 Debezium Engine 自研的輕量級(jí) CDC 項(xiàng)目,用于實(shí)時(shí)捕獲數(shù)據(jù)庫更改并將其作為事件流傳遞最終將數(shù)據(jù)寫入目標(biāo)數(shù)據(jù)庫 Databend。它提供了一種簡(jiǎn)單的方式來監(jiān)視和捕獲關(guān)系型數(shù)據(jù)庫的變化,并支持將這些變化轉(zhuǎn)換為可消費(fèi)事件。使用 Debezium server databend 實(shí)現(xiàn) CDC 無須依賴大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一個(gè)啟動(dòng)腳本即可開啟實(shí)時(shí)數(shù)據(jù)同步。

        CDC 過程中的表 Schema 變更處理是上游數(shù)據(jù)庫中十分常見的用戶場(chǎng)景,也是數(shù)據(jù)同步框架實(shí)現(xiàn)的難點(diǎn)。針對(duì)該場(chǎng)景,Debezium-server-databend 0.3.0 引入了 Auto Schema Evolution 的能力,在每一批次的數(shù)據(jù)中協(xié)調(diào)并控制作業(yè)拓?fù)渲械?schema 變更事件處理。

        實(shí)現(xiàn)過程

        Debezium Server Databend 實(shí)現(xiàn) Auto Schema Evolution 功能的原理大致是:

        首先在配置文件中新增 debezium.sink.databend.schema.evolution 的配置來控制是否開啟自動(dòng)同步表結(jié)構(gòu)變更的功能,默認(rèn)為 false 不開啟該功能。

        當(dāng)上游數(shù)據(jù)源發(fā)生 schema 變更時(shí),先將流水線中已經(jīng)讀出的的數(shù)據(jù)全部刷出以保證進(jìn)入數(shù)據(jù)流的這一批 schema 的一致性。

        然后先將該類 schemachangekey 事件暫存到 schemaEvolutionEvents 的 ArrayList 中。

              
                List<DatabendChangeEvent> schemaEvolutionEvents = new ArrayList<>();
            for (DatabendChangeEvent event : events) {
             if (DatabendUtil.isSchemaChanged(event.schema()) && isSchemaEvolutionEnabled) {
                       schemaEvolutionEvents.add(event);
                   }
              }

        先將上面刷出的數(shù)據(jù)執(zhí)行寫入操作,寫入這批數(shù)據(jù)之后且解析 schema 變更的事件之前的時(shí)間里不會(huì)有新的數(shù)據(jù)進(jìn)來。數(shù)據(jù)處理完后再去解析 schema change events,如果事件類型屬于 DDL 并且為 alter table 語句,就對(duì)目標(biāo) database.table 執(zhí)行該 DDL。

              
                // handle schema evolution
                try {
                    schemaEvolution(table, schemaEvolutionEvents);
                } catch (Exception e) {
                    throw new RuntimeException(e.getMessage());
                }
              
                 public void schemaEvolution(RelationalTable table, List<DatabendChangeEvent> events) {
                for (DatabendChangeEvent event : events) {
                    Map<String, Object> values = event.valueAsMap();
                    for (Map.Entry<String, Object> entry : values.entrySet()) {
                        if (entry.getKey().contains("ddl") && entry.getValue().toString().toLowerCase().contains("alter table")) {
                            String tableName = getFirstWordAfterAlterTable(entry.getValue().toString());
                            String ddlSql = replaceFirstWordAfterTable(entry.getValue().toString(), table.databaseName + "." + tableName);
                            try (PreparedStatement statement = connection.prepareStatement(ddlSql)) {
                                System.out.println(ddlSql);
                                statement.execute(ddlSql);
                            } catch (SQLException e) {
                                throw new RuntimeException(e.getMessage());
                            }
                        }
                    }
                }
            }

        當(dāng) schema 變更事件處理成功后,會(huì)繼續(xù)新的數(shù)據(jù)同步流程。

        基本的處理流程如下圖所示:

        e4fa793d29b9fdf635b9365cecc78b16.webp

        實(shí)踐&演示

        Debezium Server Databend

        • Clone 項(xiàng)目: git clone ``https://github.com/databendcloud/debezium-server-databend.git

        • 從項(xiàng)目根目錄開始:

                      
                      debezium.sink.type=databend
          debezium.sink.databend.upsert=true
          debezium.sink.databend.upsert-keep-deletes=false
          debezium.sink.databend.database.databaseName=debezium
          debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
          debezium.sink.databend.database.username=cloudapp
          debezium.sink.databend.database.password=password
          debezium.sink.databend.database.primaryKey=id
          debezium.sink.databend.database.tableName=products
          debezium.sink.databend.database.param.ssl=true
          debezium.sink.databend.schema.evolution=true // Enable Auto Schema Evolution

          # enable event schemas
          debezium.format.value.schemas.enable=true
          debezium.format.key.schemas.enable=true
          debezium.format.value=json
          debezium.format.key=json

          # mysql source
          debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
          debezium.source.offset.storage.file.filename=data/offsets.dat
          debezium.source.offset.flush.interval.ms=60000

          debezium.source.database.hostname=127.0.0.1
          debezium.source.database.port=3306
          debezium.source.database.user=root
          debezium.source.database.password=123456
          debezium.source.database.dbname=mydb
          debezium.source.database.server.name=from_mysql
          debezium.source.include.schema.changes=false
          debezium.source.table.include.list=mydb.products
          # debezium.source.database.ssl.mode=required
          # Run without Kafka, use local file to store checkpoints
          debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
          debezium.source.database.history.file.filename=data/status.dat
          do event flattening. unwrap message!
          debezium.transforms=unwrap
          debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
          debezium.transforms.unwrap.delete.handling.mode=rewrite
          debezium.transforms.unwrap.drop.tombstones=true

          # ############ SET LOG LEVELS ############
          quarkus.log.level=INFO
          # Ignore messages below warning level from Jetty, because it's a bit verbose
          quarkus.log.category."org.eclipse.jetty".level=WARN
          • 使用提供的腳本運(yùn)行服務(wù): bash run.sh
          • Debezium Server with Databend 將會(huì)啟動(dòng)
          • 構(gòu)建和打包 debezium server: mvn -Passembly -Dmaven.test.skip package
          • 構(gòu)建完成后,解壓服務(wù)器分發(fā)包: unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
          • 進(jìn)入解壓后的文件夾: cd databendDist
          • 創(chuàng)建 application.properties 文件并修改: nano conf/application.properties,將下面的 application.properties 拷貝進(jìn)去,根據(jù)用戶實(shí)際情況修改相應(yīng)的配置。

        Mysql 準(zhǔn)備表和數(shù)據(jù)

        創(chuàng)建數(shù)據(jù)庫 mydb 和表 products,并插入數(shù)據(jù):

              
              CREATE DATABASE mydb;
        USE mydb;

        CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255NOT NULL,description VARCHAR(512));
        ALTER TABLE products AUTO_INCREMENT = 10;

        INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
        (default,"car battery","12V car battery"),
        (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
        (default,"hammer","12oz carpenter's hammer"),
        (default,"hammer","14oz carpenter's hammer"),
        (default,"hammer","16oz carpenter's hammer"),
        (default,"rocks","box of assorted rocks"),
        (default,"jacket","water resistent black wind breaker"),
        (default,"cloud","test for databend"),
        (default,"spare tire","24 inch spare tire");

        Databend Cloud 中創(chuàng)建 Database

              
              create database debezium

        NOTE: 用戶可以不必先在 Databend 中創(chuàng)建表,系統(tǒng)檢測(cè)到后會(huì)自動(dòng)為用戶建表。

        啟動(dòng) Debezium Server Databend

              
              bash run.sh

        首次啟動(dòng)會(huì)進(jìn)入 init snapshot 模式,通過配置的 Batch Size 全量將 MySQL 中的數(shù)據(jù)同步到 Databend,所以在 Databend 中可以看到 MySQL 中的數(shù)據(jù)已經(jīng)同步過來了:

        改變 Mysql 表結(jié)構(gòu)

              
              alter table products add columm a int;

        在 products 表中新增一列 a int,由于我們已經(jīng)在配置文件中使用 debezium.sink.databend.schema.evolution=true 開啟了表結(jié)構(gòu)自動(dòng)同步所以在 Databend Cloud 中也可以看到目標(biāo)表的結(jié)構(gòu)也隨之變更了:

        09115a790f4b09b55c0ffcafaf0316e3.webp

        此時(shí)在 mysql 中插入數(shù)據(jù),新的數(shù)據(jù)就會(huì)以新的 Schema 形式寫入目標(biāo)表:

        3272105ea13e82dbfc2ac2058c1db8dc.webp

        結(jié)論

        Debezium Server Databend 在支持 Auto Schema Evolution 之后,用戶無需在數(shù)據(jù)源發(fā)生 schema 變更時(shí)手動(dòng)介入,大大降低用戶的運(yùn)維成本;只需對(duì)同步任務(wù)進(jìn)行簡(jiǎn)單配置即可將多表、多個(gè)數(shù)據(jù)庫同步至下游,提高了數(shù)據(jù)同步的效率并且降低了用戶的開發(fā)難度。


        ?

        瀏覽 37
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            91社区成人影院 | 国产h在线 | japanese女王淫脚脚交 | 免费 无码 精品 国产76在线 | 国內毛片 | metart精品白嫩的asspics | 新天堂在线 | 中文字幕 - 【水蜜桃】免费高清视频 | AAAAAAA免费看片看最后的 | 日本欧美在线 |