從 0 到 1 為 Databend 設(shè)計(jì)實(shí)現(xiàn)輕量級(jí) CDC
什么是 CDC
CDC(Change Data Capture)是一種數(shù)據(jù)同步技術(shù),用于實(shí)時(shí)捕獲和傳遞數(shù)據(jù)庫中的數(shù)據(jù)更改。通過 CDC,我們可以將數(shù)據(jù)庫中的變更事件捕獲并轉(zhuǎn)換成數(shù)據(jù)流,然后將其傳遞給其他系統(tǒng)或應(yīng)用程序,以實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)同步和分發(fā)。常見的 CDC 格式為:
{
"op": "Update", // "Insert", "Delete",
"event_time": "2023-07-22 12:00:00",
"payload": {
"id": 123,
"author": "Franz Kafka",
"title": "ToDatabend",
"published_at": "2023-07-01"
}
}
一種生產(chǎn)可用的 CDC 系統(tǒng)架構(gòu)可以是下圖:

現(xiàn)階段主流 CDC 方案和架構(gòu)
在目前的技術(shù)發(fā)展中,有幾種主流的CDC方案和架構(gòu):
1. Flink CDC
Flink CDC[1] 是基于 Apache Flink 的 CDC 方案。它可以實(shí)時(shí)捕獲數(shù)據(jù)庫的變更事件,并將其轉(zhuǎn)換成流數(shù)據(jù)。Flink CDC 提供了非常多的連接器組件,可以在異構(gòu)的數(shù)據(jù)庫之間實(shí)現(xiàn)數(shù)據(jù)流動(dòng)。
Databend 也提供了 flink-databend-connector,可以與 MySQL,PG 等 RDBMS 構(gòu)建實(shí)時(shí)數(shù)據(jù)同步。
2. Kafka Connector
Kafka Connector[2] 是基于 Apache Kafka 的 CDC 方案。Kafka 作為分布式消息隊(duì)列,可以用于數(shù)據(jù)傳遞和分發(fā)。Kafka Connector 允許將數(shù)據(jù)從數(shù)據(jù)庫中捕獲,并將其發(fā)布到 Kafka 主題中,供其他系統(tǒng)消費(fèi)。
3. Canal
Canal[3] 是阿里巴巴開源的 CDC 解決方案。它可以捕獲 MySQL 數(shù)據(jù)庫的變更,并將其轉(zhuǎn)換成消息格式輸出,常用于數(shù)據(jù)同步和業(yè)務(wù)解耦。限制是只能基于 MySQL 數(shù)據(jù)庫增量日志解析。
Debezium Server
Debezium Server[4] 是一個(gè)基于 Debezium Engine 的 CDC 項(xiàng)目。Debezium Engine 是 Debezium 項(xiàng)目的核心,用于捕獲數(shù)據(jù)庫的變更事件。Debezium Server 構(gòu)建在該引擎之上,提供了一種輕量級(jí)的 CDC 解決方案,用于實(shí)時(shí)捕獲數(shù)據(jù)庫更改,并將其轉(zhuǎn)換為事件流,最終將數(shù)據(jù)寫入目標(biāo)數(shù)據(jù)庫。
Debezium Server Databend
Debezium Server Databend[5] 是一個(gè)基于 Debezium Engine 自研的輕量級(jí) CDC 項(xiàng)目,用于實(shí)時(shí)捕獲數(shù)據(jù)庫更改并將其作為事件流傳遞最終將數(shù)據(jù)寫入目標(biāo)數(shù)據(jù)庫 Databend。它提供了一種簡(jiǎn)單的方式來監(jiān)視和捕獲關(guān)系型數(shù)據(jù)庫的變化,并支持將這些變化轉(zhuǎn)換為可消費(fèi)事件。使用 Debezium server databend 實(shí)現(xiàn) CDC 無須依賴大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一個(gè)啟動(dòng)腳本即可開啟實(shí)時(shí)數(shù)據(jù)同步。
源碼分析 debezium-server-databend 實(shí)現(xiàn)
Debezium Server Databend 實(shí)現(xiàn)了輕量級(jí)CDC方案,通過 Debezium Engine 捕獲數(shù)據(jù)庫變更事件,將其轉(zhuǎn)換成事件流,然后將事件流傳遞給 Databend 數(shù)據(jù)庫,實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)同步。下面先從代碼層面分析一下該組件的實(shí)現(xiàn)原理。
主要代碼的結(jié)構(gòu)是:
.
├── DatabendChangeConsumer.java
├── DatabendChangeEvent.java
├── DatabendTypes.java
├── DatabendUtil.java
├── DebeziumMetrics.java
├── batchsizewait
│ ├── InterfaceBatchSizeWait.java
│ ├── MaxBatchSizeWait.java
│ └── NoBatchSizeWait.java
└── tablewriter
├── AppendTableWriter.java
├── BaseTableWriter.java
├── RelationalTable.java
├── TableNotFoundException.java
├── TableWriterFactory.java
└── UpsertTableWriter.java
Debezium server 的入口邏輯在 DatabendChangeConsumer 中,繼承 BaseChangeConsumer 并實(shí)現(xiàn)相應(yīng)方法, 作用是加載配置,初始化 server, database 以及處理 batch events :
/**
* Implementation of the consumer that delivers the messages to databend database tables.
*
* @author hantmac
*/
@Named("databend")
@Dependent
public class DatabendChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
...// @ConfigProperty(name = "debezium.sink.databend.xxx", defaultValue = "")
void connect() throws Exception {
...
}
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {...}
}
核心代碼是在 handleBatch 中,在這里接收變更事件并發(fā)送到 tablewriter 中進(jìn)一步處理。
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
Instant start = Instant.now();
//group events by destination
Map<String, List<DatabendChangeEvent>> result =
records.stream()
.map((ChangeEvent<Object, Object> e)
-> {
try {
return new DatabendChangeEvent(e.destination(),
valDeserializer.deserialize(e.destination(), getBytes(e.value())),
e.key() == null ? null : valDeserializer.deserialize(e.destination(), getBytes(e.key())),
mapper.readTree(getBytes(e.value())).get("schema"),
e.key() == null ? null : mapper.readTree(getBytes(e.key())).get("schema")
);
} catch (IOException ex) {
throw new DebeziumException(ex);
}
})
.collect(Collectors.groupingBy(DatabendChangeEvent::destination));
// consume list of events for each destination table
for (Map.Entry<String, List<DatabendChangeEvent>> tableEvents : result.entrySet()) {
RelationalTable tbl = this.getDatabendTable(mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0).schema()); // 獲取 tablewriter 實(shí)例
tableWriter.addToTable(tbl, tableEvents.getValue()); // 將事件推送至 tablewriter
}
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Processed event '{}'", record);
committer.markProcessed(record);
}
committer.markBatchFinished();
this.logConsumerProgress(records.size());
batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
TableWriterFactory中提供了 Append 和 Upsert 兩種模式:
public BaseTableWriter get(final Connection connection) {
if (upsert) {
return new UpsertTableWriter(connection, identifierQuoteCharacter.orElse(""), upsertKeepDeletes);
} else {
return new AppendTableWriter(connection, identifierQuoteCharacter.orElse(""));
}
}
每種模式都實(shí)現(xiàn)了 addTable 方法。addTable 的主要邏輯是解析 Debezium Event 中的字段 Name,Type,Value 然后調(diào)用 databend JDBC 將數(shù)據(jù)寫入到目標(biāo)表。Upsert mode相對(duì)復(fù)雜,這里以 upsert 為例來看下源碼:
public void addToTable(final RelationalTable table, final List<DatabendChangeEvent> events) {final String upsertSql = table.preparedUpsertStatement(this.identifierQuoteCharacter);int inserts = 0;List<DatabendChangeEvent> deleteEvents = new ArrayList<>();try (PreparedStatement statement = connection.prepareStatement(upsertSql)) {connection.setAutoCommit(false);for (DatabendChangeEvent event : events) {System.out.println(event.operation());// NOTE: if upsertKeepDeletes = true, delete event data will insert into target tableif (upsertKeepDeletes || !event.operation().equals("d")) {Map<String, Object> values = event.valueAsMap();addParametersToStatement(statement, values, event.keyAsMap());statement.addBatch();} else if (event.operation().equals("d")) {// here use soft delete// if true delete, we can use this condition event.keyAsMap().containsKey(deleteColumn)deleteEvents.add(event);}}int[] batchResult = statement.executeBatch();inserts = Arrays.stream(batchResult).sum();System.out.println(String.format("insert rows %d", inserts));} catch (SQLException e) {e.printStackTrace();throw new RuntimeException(e.getMessage());}// handle delete eventtry {deleteFromTable(table, deleteEvents);} catch (Exception e) {throw new RuntimeException(e.getMessage());}}
Append mode
在 CDC 中,Append Mode 是一種數(shù)據(jù)寫入模式。當(dāng)數(shù)據(jù)庫的一條記錄發(fā)生變化時(shí),CDC 會(huì)將該變化作為一條新的事件追加到事件流中。
Upsert mode
Upsert Mode 是另一種數(shù)據(jù)寫入模式。當(dāng)數(shù)據(jù)庫的一條記錄發(fā)生變化時(shí),CDC 會(huì)將該變化作為一個(gè)更新操作,如果記錄不存在,則作為插入操作,以實(shí)現(xiàn)數(shù)據(jù)的更新和插入。
Upsert mode 用到了 Databend 的 Replace into[6] 語法,所以需要用戶指定一個(gè) conflict key,這里我們提供一個(gè)配置:
debezium.sink.databend.database.primaryKey=id
如果沒有提供該配置,就會(huì)退化成追加模式。
Delete
Delete操作是指數(shù)據(jù)庫中的記錄被刪除,CDC 會(huì)將該操作作為一個(gè)事件寫入事件流,以通知其他系統(tǒng)該記錄已被刪除。
Debezim Server 對(duì) Delete 的處理比較復(fù)雜,在 DELETE 操作下會(huì)生成兩條事件記錄:
-
一個(gè)包含 "op": "d",其他的行數(shù)據(jù)以及字段; -
一個(gè)tombstones記錄,它具有與被刪除行相同的鍵,但值為null。
這兩條事件會(huì)同時(shí)發(fā)出,在 Debezium Server Databend 中我們選擇對(duì) Delete 數(shù)據(jù)實(shí)行軟刪除,這就要求我們?cè)?target table 中擁有 __deleted 字段,當(dāng) Delete 事件過來的時(shí)候我們將該字段置為 TRUE 后插入到目標(biāo)表。
這樣設(shè)計(jì)的好處是,有些用戶最開始想要保留這些數(shù)據(jù),但可能未來會(huì)想到將其刪除,這樣就為用戶提供了可選的方案,未來想要?jiǎng)h除這些數(shù)據(jù)的時(shí)候,只需要 delete from table where __deleted=true 即可。
關(guān)于 Debezium 對(duì)刪除事件的說明以及處理方式,詳情可參考文檔[6]。
使用輕量級(jí) CDC debezium-server-databend 構(gòu)建 MySQL 到 Databend 的 實(shí)時(shí)數(shù)據(jù)同步
下面用一個(gè)實(shí)際案例展示如何基于 Debezium server databend 快速構(gòu)建 MySQL 到 Databend 的實(shí)時(shí)數(shù)據(jù)同步。
準(zhǔn)備階段
準(zhǔn)備一臺(tái)已經(jīng)安裝了 Docker ,docker-compose 以及 Java 11 環(huán)境 的 Linux 或者 MacOS 。
準(zhǔn)備教程所需要的組件
接下來的教程將以 docker-compose 的方式準(zhǔn)備所需要的組件。
debezium-MySQL
docker-compose.yaml
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
Debezium Server Databend
-
Clone 項(xiàng)目: git clone ``https://github.com/databendcloud/debezium-server-databend.git -
從項(xiàng)目根目錄開始: -
構(gòu)建和打包 debezium server: mvn -Passembly -Dmaven.test.skip package -
構(gòu)建完成后,解壓服務(wù)器分發(fā)包: unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist -
進(jìn)入解壓后的文件夾: cd databendDist -
創(chuàng)建 application.properties文件并修改:nano conf/application.properties,將下面的 application.properties 拷貝進(jìn)去,根據(jù)用戶實(shí)際情況修改相應(yīng)的配置。 -
使用提供的腳本運(yùn)行服務(wù): bash run.sh -
Debezium Server with Databend 將會(huì)啟動(dòng)
同時(shí)我們也提供了相應(yīng)的 Docker image,可以在容器中一鍵啟動(dòng):
version: '2.1'
services:
debezium:
image: ghcr.io/databendcloud/debezium-server-databend:pr-2
ports:
- "8080:8080"
- "8083:8083"
volumes:
- $PWD/conf:/app/conf
- $PWD/data:/app/data
NOTE: 在容器中啟動(dòng)注意所連接數(shù)據(jù)庫的網(wǎng)絡(luò)。
Debezium Server Databend Application Properties
本文章使用下面提供的配置,更多的參數(shù)說明以及配置可以參考文檔[7]。
debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
準(zhǔn)備數(shù)據(jù)
在 MySQL 數(shù)據(jù)庫中準(zhǔn)備數(shù)據(jù)
進(jìn)入 MySQL 容器
docker-compose exec mysql mysql -uroot -p123456
創(chuàng)建數(shù)據(jù)庫 mydb 和表 products,并插入數(shù)據(jù):
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");
在 Databend 中創(chuàng)建 Database
NOTE: 用戶可以不必先在 Databend 中創(chuàng)建表,系統(tǒng)檢測(cè)到后會(huì)自動(dòng)為用戶建表。
啟動(dòng) Debezium Server Databend
bash run.sh
首次啟動(dòng)會(huì)進(jìn)入 init snapshot 模式,通過配置的 Batch Size 全量將 MySQL 中的數(shù)據(jù)同步到 Databend,所以在 Databend 中可以看到 MySQL 中的數(shù)據(jù)已經(jīng)同步過來了:
同步 Insert 數(shù)據(jù)
我們繼續(xù)往 MySQL 中插入 5 條數(shù)據(jù):
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer");
Debezium server databend 日志:
同時(shí)在 Databend 中可以查到 5 條數(shù)據(jù)已經(jīng)同步過來了:
同步 Update 數(shù)據(jù)
配置文件中 debezium.sink.databend.upsert=true ,所以我們也可以處理 Update/Delete 的事件。
在 MySQL 中更新 id=10 的數(shù)據(jù):
update products set name="from debezium" where id=10;
在 Databend 中可以查到 id 為 10 的數(shù)據(jù)已經(jīng)被更新:
同步 Delete 數(shù)據(jù)
在配置文件中,有以下的配置,既可開啟處理 Delete 事件的能力:
debezium.sink.databend.upsert-keep-deletes=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
在 MySQL 中刪除 id=12 的數(shù)據(jù):
delete from products where id=12;
在 Databend 中可以觀察到 id=12 的值的 __deleted 字段已經(jīng)被置為 true。
環(huán)境清理
操作結(jié)束后,在 docker-compose.yml 文件所在的目錄下執(zhí)行如下命令停止所有容器:
docker-compose down
結(jié)論
文章介紹了 databend 的輕量級(jí) CDC 實(shí)現(xiàn)原理,演示了基于輕量級(jí) CDC debezium server databend 構(gòu)建 MySQL 到 Databend 的 實(shí)時(shí)數(shù)據(jù)同步的全部過程,這種方式不需要依賴 Flink, Kafka 等大型組件,啟動(dòng)和管理非常方便。
參考資料
Flink CDC: https://ververica.github.io/flink-cdc-connectors/release-2.1/content/about.html
[2]Kafka Connector: https://docs.confluent.io/platform/current/connect/index.html
[3]Canal: https://github.com/alibaba/canal
[4]Debezium Server: https://debezium.io/documentation/reference/1.6/overview.html
[5]Debezium Server Databend: https://github.com/databendcloud/debezium-server-databend
[6]replace into: https://databend.rs/doc/sql-commands/dml/dml-replace
[7]Debezium Event Flattening: https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
[8]debezium-server-databend 配置文檔: https://github.com/databendcloud/debezium-server-databend/blob/main/docs/docs.md
