clickhouse使用心得
clickhouse目前用在實(shí)時(shí)BI后臺,只要數(shù)據(jù)穩(wěn)定落庫了,出報(bào)表很快,臨時(shí)查詢也很快,在使用過程中,對它的一些優(yōu)點(diǎn)和不足也是深有體會,這里總結(jié)一下,不能做到面面俱到,但盡可能詳細(xì)的介紹實(shí)際應(yīng)用需要注意的問題和應(yīng)用技巧。
我們是通過編寫Flink程序,消費(fèi)kafka數(shù)據(jù),將數(shù)據(jù)清洗,擴(kuò)充維度,然后落在clickhouse里面,半年以來,F(xiàn)link程序很少出問題,數(shù)據(jù)落庫也很穩(wěn)定。對于clickhouse,使用的是騰訊云的clickhouse服務(wù),有副本的集群,中間擴(kuò)充了幾次磁盤,服務(wù)也是挺穩(wěn)定的,整體看來,整個(gè)BI后臺,都能穩(wěn)定的提供數(shù)據(jù)報(bào)表。為了書寫方便,接下來clickhouse用ck縮寫。
ck里面引用mysql外部數(shù)據(jù)表
通常需要在ck里面要用mysql里面的表,比如mysql里面存在一張維表,我們需要根據(jù)id查詢出某個(gè)名稱,這個(gè)時(shí)候,不需要把數(shù)據(jù)導(dǎo)一份過來,就可以把mysql表映射到ck里面,或者直接整個(gè)mysql數(shù)據(jù)庫映射到ck某個(gè)庫里面,就能操作mysql這個(gè)數(shù)據(jù)庫所有表,使用sql語法關(guān)聯(lián)查詢mysql和ck的表。
MySQL引擎用于將遠(yuǎn)程的MySQL服務(wù)器中的表映射到ClickHouse中,并允許您對表進(jìn)行INSERT和SELECT查詢,以方便您在ClickHouse與MySQL之間進(jìn)行數(shù)據(jù)交換。
創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MySQL('host:port', ['database' | database], 'user', 'password')
比如,我們在mysql里面創(chuàng)建一張表:
mysql> USE test;
Database changed
mysql> CREATE TABLE `mysql_table` (
-> `int_id` INT NOT NULL AUTO_INCREMENT,
-> `float` FLOAT NOT NULL,
-> PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)
mysql> insert into mysql_table (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)
mysql> select * from mysql_table;
+------+-----+
| int_id | value |
+------+-----+
| 1 | 2 |
+------+-----+
1 row in set (0,00 sec)
我們?nèi)k里面創(chuàng)建一個(gè)數(shù)據(jù)庫,跟mysql這個(gè)數(shù)據(jù)庫關(guān)聯(lián)起來。
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password')
這樣就在ck里面創(chuàng)建了一個(gè)mysql_db,這個(gè)數(shù)據(jù)庫跟mysql的test數(shù)據(jù)庫是映射在一起了,我們在ck里面直接查詢:
SELECT * FROM mysql_db.mysql_table
┌─int_id─┬─value─┐
│ 1 │ 2 │
└────────┴───────┘
數(shù)據(jù)庫引擎可以是mysql,也可以是其它數(shù)據(jù)庫,比如sqlite、PostgreSQL,更多可以查閱官方文檔:
https://clickhouse.com/docs/zh/engines/database-engines
ck帶副本的分布式表
帶副本的分布式表,就是分布式表,并且單個(gè)part也是有副本的,剛開始我們建表時(shí)候,也是花了一些時(shí)間,回憶下當(dāng)時(shí)的問題主要有以下:
1) 帶副本的分布式表的創(chuàng)建問題,怎么創(chuàng)建?
開始我們也是創(chuàng)建錯(cuò)了,發(fā)現(xiàn)數(shù)據(jù)不完整,每次只有一半,后來得知騰訊云的服務(wù)是帶副本的分布式集群,創(chuàng)建表也需要帶副本的分布式表,不然數(shù)據(jù)有丟失,建表分2步,語句如下:
-- 第一步:創(chuàng)建本地表,這個(gè)表會在每個(gè)機(jī)器節(jié)點(diǎn)上面創(chuàng)建,不要漏了on cluster cluster_name
CREATE TABLE test.table_local on cluster default_cluster
(
`id` Int32,
`uid` String,
`name` String,
`time` Int32,
`create_at` DateTime
)
ENGINE = ReplicatedMergeTree()
PARTITION BY toYYYYMM(create_at)
ORDER BY id;
-- 第二步:創(chuàng)建分布式表
CREATE TABLE test.table_all on cluster default_cluster as test.table_local
ENGINE = Distributed('default_cluster', 'test', 'table_local', sipHash64(id));
參數(shù)說明:
ReplicatedMergeTree:帶副本的表引擎;
PARTITION BY:數(shù)據(jù)分區(qū)方式;
sipHash64(id):分布式表在每個(gè)節(jié)點(diǎn)上面的數(shù)據(jù)分發(fā)方式;
具體可以看官方文檔,地址:
https://clickhouse.com/docs/zh/engines/table-engines/mergetree-family/replication
后文,都已這張表為例。
2)分布式表,插入數(shù)據(jù)要每個(gè)節(jié)點(diǎn)都執(zhí)行插入操作嗎?
不需要,用標(biāo)準(zhǔn)sql語法插入分布式表即可,比如:
insert into test.table_all values(.....)
3)分布式表的更新刪除操作,與mysql相同嗎?
不相同,只能說是相似,按照模板來使用即可,alter table語法:
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr
比如:
alter table test.table_local on cluster default_cluster update name = 'name' where id = 10000
注意:更新操作,需要用本地表test.table_local,不能用分布式表。
刪除操作也是一樣的:
alter table test.table_local on cluster default_cluster delete where id = 10000
4)分布式表,添加列,修改列的類型
-- 添加列
ALTER TABLE test.table_local ON CLUSTER default_cluster ADD COLUMN product String;
-- 修改列
ALTER TABLE test.table_local on cluster default_cluster MODIFY COLUMN uid Int64;
可以看到,ck帶副本的表與標(biāo)準(zhǔn)sql語法的區(qū)別在于使用了alter table和on cluster關(guān)鍵字,使用時(shí)候,套用模板即可。其它的一些DDL操作可以看具體官方文檔:
https://clickhouse.com/docs/zh/sql-reference/statements/alter
寫性能
ck提倡低頻、大批量寫,每秒鐘只寫幾次,每次插入上萬、十萬條數(shù)據(jù),這是比較合適的。因?yàn)槿绻晕⒘私庖幌碌讓釉砭椭?,ck會間隔合并數(shù)據(jù)塊,不宜頻繁寫入導(dǎo)致頻繁合并,影響性能。
在使用Flink導(dǎo)入數(shù)據(jù)的過程中,需要攢數(shù)據(jù),批量寫,我們通過Flink窗口函數(shù)積累數(shù)據(jù),每次寫5秒鐘的一批數(shù)據(jù)。記得剛開始使用ck的時(shí)候,開發(fā)沒注意這些,運(yùn)維就說要批量寫,后來基本就統(tǒng)一了。
添加索引需要注意
ck里面有一級稀疏索引,和二級跳數(shù)索引,二級索引是基于一級索引的,有時(shí)候一張表建完了,寫入數(shù)據(jù),我們發(fā)現(xiàn)查詢需要用到一些字段,需要加索引,語句:
-- 添加索引
alter table test.table_local on cluster default_cluster add index index_uid uid type minmax(100) GRANULARITY 2;
-- 使索引生效,對歷史數(shù)據(jù)也生效索引
ALTER TABLE test.table_local MATERIALIZE index index_uid;
也是用的alter table格式,這里需要注意的是,索引是在插入數(shù)據(jù)時(shí)候建立的,新建索引對歷史數(shù)據(jù)是不生效的,需要讓歷史數(shù)據(jù)也生效。
數(shù)據(jù)去重
ReplacingMergeTree引擎表會刪除排序鍵值相同的重復(fù)項(xiàng),排序鍵值就是建表時(shí)候跟在order by后面的字段。ck對更新不友好,性能很差,于是可以利用這個(gè)引擎,每次只管寫入,不需要更新,ck會自動(dòng)幫我們保存最新版本。建表語句如下:
CREATE TABLE test.test_local on cluster default_cluster (
`id` UInt64,
`type` Int32,
`username` String,
`password` String,
`phone` String COMMENT '手機(jī)號賬戶',
`nick` String,
`mobile` String,
`insert_time` DateTime DEFAULT '2023-07-31 00:00:00'
) ENGINE = ReplicatedReplacingMergeTree()
partition by dt
order by id;
CREATE TABLE test.test_all on cluster default_cluster as test.test_local ENGINE = Distributed('default_cluster', 'test', 'test_local', sipHash64(id));
insert_time字段需要有,放在最后,便于ck根據(jù)時(shí)候保留最新數(shù)據(jù)。
數(shù)據(jù)的去重只會在數(shù)據(jù)合并期間進(jìn)行。合并會在后臺一個(gè)不確定的時(shí)間進(jìn)行,因此你無法預(yù)先作出計(jì)劃。有一些數(shù)據(jù)可能仍未被處理。通常使用OPTIMIZE 語句手動(dòng)觸發(fā),比如今天程序異常停止了,我啟動(dòng)了程序, 大概率會有多個(gè)版本數(shù)據(jù),這個(gè)時(shí)候需要手動(dòng)合并一下:
OPTIMIZE table test.test_local on cluster default_cluster final;
這樣會觸發(fā)數(shù)據(jù)合并,這個(gè)過程耗費(fèi)性能,正常情況下,如果沒有多版本數(shù)據(jù),不需要觸發(fā)合并。如果沒有觸發(fā),查詢數(shù)據(jù)時(shí)候,會有多個(gè)版本,需要final關(guān)鍵字,查詢時(shí)候合并一下,如果查詢很多,將非常耗費(fèi)性能,這個(gè)時(shí)候可以選擇定期合并。
select * from test.test_all final where id = 10000
對于這種多個(gè)版本的表,有時(shí)候也是可以避開final的,比如去重,可以select distinct id from table,而不需要select distinct id from table final,這個(gè)final是可以省的,等等。
分布式表的刪除
需要?jiǎng)h除本地表和分布式表:
drop table test.test_local on cluster default_cluster;
drop table test.test_all on cluster default_cluster;
復(fù)雜數(shù)據(jù)類型map
有些業(yè)務(wù)數(shù)據(jù)某個(gè)字段是json類型,并且key數(shù)量不定,這個(gè)時(shí)候需要將其在ck里面定義為map類型,包容性好。
CREATE TABLE test.test_local2 on cluster default_cluster (
`id` UInt64,
`data` Map(String, String),
) ENGINE = ReplicatedMergeTree()
partition by dt
order by id;
CREATE TABLE test.test_all2 on cluster default_cluster as test.test_local2 ENGINE = Distributed('default_cluster', 'test', 'test_local2', sipHash64(id));
data Map(String, String) :這就定義了一個(gè)map類型字段,查詢時(shí)候可以這樣查:
select data['key'] from test.test_all2 limit 5
對于json,ck也是可以解的。
select JSONExtractRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b')
-- 結(jié)果
'[-100, 200.0, 300]'
select JSONExtractString('{"a": "hello", "b": [-100, 200.0, 300]}', 'a')
-- 結(jié)果
'hello'
更詳細(xì)官方文檔:https://clickhouse.com/docs/zh/sql-reference/functions/json-functions
關(guān)于成本
相比較而言,ck是能節(jié)省成本的,運(yùn)維是這么說的。經(jīng)常擴(kuò)容磁盤,而計(jì)算性能只擴(kuò)容了一次。
