New Flink Feature | CDC (Change Data Capture): Principles and Practice

Flink SQL can be used for data synchronization, moving data from one database into other systems such as MySQL or Elasticsearch. With CDC you can:
Materialize an aggregated view over the source database in real time
Synchronize data in real time with low latency, since only incremental changes are shipped
Join a temporal table on event time in order to obtain accurate results

Typical CDC use cases include the following (a minimal Flink SQL sketch of the materialized-view case follows this list):
Incremental data synchronization between databases
Audit logging
Real-time materialized views on top of a database
Dimension table joins based on CDC
…
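As an illustration of the synchronization and materialized-view cases above, the sketch below streams a MySQL table into Elasticsearch and keeps an aggregate continuously up to date. It is a minimal, hypothetical example: the orders table, its columns, the Elasticsearch address, and the customer_spend index are assumptions made for illustration, not taken from the article.

-- hypothetical CDC source: the orders table in MySQL
CREATE TABLE orders (
    order_id INT,
    customer_id INT,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

-- hypothetical Elasticsearch sink that holds the aggregated view
CREATE TABLE customer_spend (
    customer_id INT,
    total_amount DECIMAL(10, 2),
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'customer_spend'
);

-- the aggregate is re-emitted whenever the underlying MySQL rows change,
-- so the Elasticsearch index behaves like a real-time materialized view
INSERT INTO customer_spend
SELECT customer_id, SUM(amount) AS total_amount
FROM orders
GROUP BY customer_id;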
How to Use Flink CDC
Flink currently provides two built-in CDC connectors, for PostgreSQL and MySQL; the examples below use MySQL.


The benefits of this architecture (Flink reading the MySQL binlog directly instead of going through Canal and Kafka) are:
No Canal or Kafka to maintain, so the pipeline is shorter and latency is lower
Flink provides exactly-once semantics
Reading can start from a specified binlog position
Removing Kafka also removes the cost of storing the intermediate messages
To use the MySQL CDC connector, add the following Maven dependency:
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.1.0</version>
</dependency>
If you use the SQL client instead, download flink-sql-connector-mysql-cdc-1.1.0.jar and place it under Flink's lib/ directory.
An example of the SQL for connecting to a MySQL database is shown below:
-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10, 3)
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'inventory',
    'table-name' = 'products'
);
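Once the table is defined, regular continuous queries can be run against it. The aggregate below is a minimal sketch over the products table defined above; its result keeps updating as rows are inserted, updated, or deleted in MySQL.

-- continuous aggregation over the CDC table
SELECT name, COUNT(*) AS product_cnt, SUM(weight) AS total_weight
FROM mysql_binlog
GROUP BY name;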
The same source can also be consumed with the DataStream API:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("inventory") // monitor all tables under inventory database
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
           .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}
Alternatively, if the binlog is already collected by Canal and written to Kafka, Flink can consume it through the Kafka connector with the canal-json format. Add the Kafka connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
CREATE TABLE topic_products (
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_binlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json'  -- using canal-json as the format
);
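Because the canal-json format is interpreted as a changelog, inserts, updates, and deletes captured by Canal all flow into downstream results. A minimal sketch over the topic_products table defined above:

-- the aggregate stays up to date as change events arrive from Canal
SELECT name, SUM(weight) AS total_weight
FROM topic_products
GROUP BY name;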
To write a changelog stream back to Kafka, the changelog-json format from the flink-format-changelog-json module can be used:
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-format-changelog-json</artifactId>
    <version>1.0.0</version>
</dependency>
The following example computes daily UV from a user_behavior topic and writes the resulting changelog to Kafka:
-- assuming we have a user_behavior log
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',                              -- using kafka connector
    'topic' = 'user_behavior',                          -- kafka topic
    'scan.startup.mode' = 'earliest-offset',            -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'json'                                   -- the data format is json
);

-- we want to store the UV aggregation result in kafka using changelog-json format
CREATE TABLE day_uv (
    day_str STRING,
    uv BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'day_uv',
    'scan.startup.mode' = 'earliest-offset',            -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'changelog-json'                         -- the data format is changelog-json
);

-- write the UV results into kafka using changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') AS day_str, COUNT(DISTINCT user_id) AS uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');

-- reading the changelog back again
SELECT * FROM day_uv;





