Debezium Server Databend Supports Auto Schema Evolution
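Background
Debezium Server Databend is a lightweight CDC project built in-house on top of Debezium Engine. It captures database changes in real time, delivers them as an event stream, and finally writes the data to the target database, Databend. It provides a simple way to monitor and capture changes in relational databases and to turn those changes into consumable events. Running CDC with Debezium Server Databend does not require heavyweight data infrastructure such as Flink, Kafka, or Spark; a single startup script is enough to begin real-time data synchronization.
Table schema changes in the upstream database are a very common scenario during CDC, and also one of the harder parts of implementing a data synchronization framework. To address this scenario, Debezium-server-databend 0.3.0 introduces Auto Schema Evolution, which coordinates and controls the handling of schema change events in the job topology within each batch of data.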
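Implementation
Debezium Server Databend implements Auto Schema Evolution roughly as follows:
First, a new option, debezium.sink.databend.schema.evolution, is added to the configuration file to control whether table structure changes are synchronized automatically. It defaults to false, which leaves the feature off.
When the schema of the upstream data source changes, all data already read into the pipeline is flushed first, so that the batch entering the data stream has a consistent schema.
The schema change events (schemachangekey events) are then held temporarily in an ArrayList named schemaEvolutionEvents.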
List<DatabendChangeEvent> schemaEvolutionEvents = new ArrayList<>();
for (DatabendChangeEvent event : events) {
    if (DatabendUtil.isSchemaChanged(event.schema()) && isSchemaEvolutionEnabled) {
        schemaEvolutionEvents.add(event);
    }
}
The flushed data is written first. Between the end of that write and the parsing of the schema change events, no new data enters. Once the data has been processed, the schema change events are parsed: if an event is a DDL event and its statement is an ALTER TABLE, that DDL is executed against the target database.table.
// handle schema evolution
try {
    schemaEvolution(table, schemaEvolutionEvents);
} catch (Exception e) {
    throw new RuntimeException(e.getMessage());
}
public void schemaEvolution(RelationalTable table, List<DatabendChangeEvent> events) {
    for (DatabendChangeEvent event : events) {
        Map<String, Object> values = event.valueAsMap();
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            // Only DDL fields that carry an ALTER TABLE statement are replayed on the target.
            if (entry.getKey().contains("ddl") && entry.getValue().toString().toLowerCase().contains("alter table")) {
                String tableName = getFirstWordAfterAlterTable(entry.getValue().toString());
                // Qualify the table with the target database name before executing.
                String ddlSql = replaceFirstWordAfterTable(entry.getValue().toString(), table.databaseName + "." + tableName);
                try (PreparedStatement statement = connection.prepareStatement(ddlSql)) {
                    System.out.println(ddlSql);
                    // The SQL is already bound to the PreparedStatement, so call the no-argument execute().
                    statement.execute();
                } catch (SQLException e) {
                    // Keep the original exception as the cause so the stack trace is not lost.
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        }
    }
}
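The method above relies on two helpers, getFirstWordAfterAlterTable and replaceFirstWordAfterTable, whose bodies are not shown in this post. The sketch below is one plausible way to write them with a regular expression. The class name DdlRewriteSketch, the pattern, and the handling of backticks and database-qualified names are all assumptions made for illustration, not the project's actual code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DdlRewriteSketch {
    // Assumed pattern: "ALTER TABLE <identifier>", case-insensitive.
    // Group 1 is the identifier, possibly qualified (db.table) or backtick-quoted.
    private static final Pattern ALTER_TABLE =
            Pattern.compile("(?i)\\balter\\s+table\\s+([`\\w.]+)");

    // Returns the bare table name that follows ALTER TABLE, or null if none is found.
    public static String getFirstWordAfterAlterTable(String ddl) {
        Matcher m = ALTER_TABLE.matcher(ddl);
        if (!m.find()) {
            return null;
        }
        String name = m.group(1).replace("`", "");
        int dot = name.lastIndexOf('.');
        return dot >= 0 ? name.substring(dot + 1) : name;
    }

    // Replaces the identifier that follows ALTER TABLE with the given qualified name.
    // The DDL is returned unchanged when it contains no ALTER TABLE clause.
    public static String replaceFirstWordAfterTable(String ddl, String qualifiedName) {
        Matcher m = ALTER_TABLE.matcher(ddl);
        if (!m.find()) {
            return ddl;
        }
        return ddl.substring(0, m.start(1)) + qualifiedName + ddl.substring(m.end(1));
    }

    public static void main(String[] args) {
        String ddl = "ALTER TABLE products ADD COLUMN a INT";
        String table = getFirstWordAfterAlterTable(ddl);
        // Rewrites the statement so that it targets the "debezium" database.
        System.out.println(replaceFirstWordAfterTable(ddl, "debezium." + table));
    }
}
```

Splicing by match position rather than using String.replace keeps the rest of the statement untouched, which matters when a column happens to share its name with the table.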
After the schema change event has been handled successfully, the normal data synchronization flow resumes.
The basic processing flow is shown in the figure below:
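To make the ordering in this section concrete, here is a toy, self-contained model of one batch: row changes are written first, and buffered schema change events are applied afterwards. The Event class and processBatch method are hypothetical stand-ins rather than the project's types. The sketch also assumes that schema change events are kept out of the row write and are simply skipped when the feature is off, which the post does not spell out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchOrderingSketch {
    // Hypothetical stand-in for DatabendChangeEvent: a row change or a DDL statement.
    static final class Event {
        final boolean schemaChange;
        final String payload;

        Event(boolean schemaChange, String payload) {
            this.schemaChange = schemaChange;
            this.payload = payload;
        }
    }

    // Returns the actions a sink would perform for one batch, in execution order.
    static List<String> processBatch(List<Event> batch, boolean schemaEvolutionEnabled) {
        List<Event> rows = new ArrayList<>();
        List<Event> schemaEvolutionEvents = new ArrayList<>();
        for (Event event : batch) {
            if (!event.schemaChange) {
                rows.add(event);
            } else if (schemaEvolutionEnabled) {
                // Buffer the schema change until the row data has been written.
                schemaEvolutionEvents.add(event);
            }
        }
        List<String> actions = new ArrayList<>();
        for (Event row : rows) {
            actions.add("WRITE " + row.payload);
        }
        for (Event ddl : schemaEvolutionEvents) {
            actions.add("DDL " + ddl.payload);
        }
        return actions;
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
                new Event(true, "ALTER TABLE products ADD COLUMN a INT"),
                new Event(false, "row id=10"),
                new Event(false, "row id=11"));
        // Even though the DDL arrives first in the batch, it is applied after the row writes.
        for (String action : processBatch(batch, true)) {
            System.out.println(action);
        }
    }
}
```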
Practice & Demo
Debezium Server Databend
- Clone the project:
git clone https://github.com/databendcloud/debezium-server-databend.git
- From the project root:
- Build and package Debezium Server:
mvn -Passembly -Dmaven.test.skip package
- When the build finishes, unzip the server distribution:
unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
- Enter the unzipped folder:
cd databendDist
- Create the application.properties file and edit it: nano conf/application.properties. Copy the application.properties below into it and adjust the settings to your environment.
- Run the service with the provided script:
bash run.sh
- Debezium Server with Databend will start.
debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true
# Enable Auto Schema Evolution
debezium.sink.databend.schema.evolution=true
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
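One detail worth checking in a configuration file like the one above: in the .properties format a comment must sit on its own line, starting with # or !. Anything after the = up to the end of the line is part of the value, including text that looks like a trailing // comment. The sketch below illustrates this for the debezium.sink.databend.schema.evolution key using only java.util.Properties. It does not reproduce how Debezium Server itself loads configuration, and the class and method names are made up for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class SchemaEvolutionFlagSketch {
    static final String KEY = "debezium.sink.databend.schema.evolution";

    // Parses properties text and reads the flag, falling back to false when it is absent.
    static boolean isSchemaEvolutionEnabled(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Boolean.parseBoolean(props.getProperty(KEY, "false").trim());
    }

    public static void main(String[] args) {
        // Comment on its own line: the value is exactly "true".
        String ownLine = "# Enable Auto Schema Evolution\n" + KEY + "=true\n";
        // Trailing text on the same line becomes part of the value.
        String trailing = KEY + "=true // Enable Auto Schema Evolution\n";

        System.out.println(isSchemaEvolutionEnabled(ownLine));   // true
        System.out.println(isSchemaEvolutionEnabled(trailing));  // false
        System.out.println(isSchemaEvolutionEnabled(""));        // false (default)
    }
}
```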
Prepare the table and data in MySQL
Create the database mydb and the table products, then insert data:
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");
Create the database in Databend Cloud
create database debezium;
NOTE: You do not need to create the table in Databend beforehand; the system creates it automatically once it detects the incoming table.
Start Debezium Server Databend
bash run.sh
On first start, the server enters init snapshot mode and copies all existing MySQL data to Databend in batches of the configured Batch Size, so the MySQL data can already be seen in Databend:
Change the MySQL table structure
alter table products add column a int;
This adds a new column, a int, to the products table. Because automatic table structure synchronization was enabled with debezium.sink.databend.schema.evolution=true in the configuration file, the structure of the target table in Databend Cloud changes accordingly:
Data inserted into MySQL from this point on is written to the target table with the new schema:
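Conclusion
With Auto Schema Evolution, users of Debezium Server Databend no longer need to step in manually when the schema of a data source changes, which greatly reduces operational cost. A simple configuration of the sync task is enough to synchronize multiple tables and multiple databases downstream, which improves synchronization efficiency and lowers development effort.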
