【Mysql源碼分析】基于行的復(fù)制實現(xiàn)之“主從關(guān)系建立”
作者:c rain
來源:SegmentFault 思否社區(qū)
前言
經(jīng)常聽到別人說Mysql的SBR、RBR、MBR,如果不清楚,那么可以跟著文章一起來學(xué)習(xí)。由于涉及到主從的內(nèi)容比較多,需要拆分成多篇內(nèi)容來概述,這章先從基礎(chǔ)知識和主從關(guān)系建立開始講起。還會出一篇文章詳細講解從主同步。
1.了解什么是SBR、RBR、MBR?
2.了解下主從配置該如何配置?
3.了解主從關(guān)系如何建立?
1.配置Mysql主從
在本文中,分為一主一從。主監(jiān)聽的端口為3306,從監(jiān)聽的端口為3309。
1.1主服務(wù)配置
master配置,配置文件my.cnf:
[mysqld]
port=3306
basedir=/usr/local/mysql8.0.20
datadir=/usr/local/mysql8.0.20/data
socket=/tmp/mysql.sock
#explicit_defaults_for_timestamp=true
lower_case_table_names=2?#表名存儲為給定的大小寫但是比較的時候是小寫的
log_bin=mysql-bin
server_id?=10
主服務(wù)啟動Mysql命令:
#?sudo?bin/mysqld?--defaults-file=/usr/local/mysql8.0.20/etc/my.cnf?--user=root
客戶端連接master端
#?mysql?--socket=/tmp/mysql.sock?-u?root
啟動master后,需要創(chuàng)建一個用戶,用于主從同步:
mysql>?GRANT?REPLICATION?SLAVE?ON?*.*?TO?repl@'localhost'?IDENTIFIED?BY?'123456';
查看主服務(wù)狀態(tài)
mysql>?show?master?status?\G;
1.2 從服務(wù)配置
slave配置,配置文件salve.conf:
[mysqld]
port=3309
basedir=/usr/local/mysql8.0.20
datadir=/usr/local/mysql8.0.20/data1
socket=/tmp/mysqlslave.sock
#explicit_defaults_for_timestamp=true
lower_case_table_names=2?#表名存儲為給定的大小寫但是比較的時候是小寫的
log_bin=mysql-bin
server_id=2
relay_log=/usr/local/mysql8.0.20/mysql-relay-bin
read_only=1?????#執(zhí)行模式
從服務(wù)啟動Mysql命令:
#?sudo?bin/mysqld?--defaults-file=/usr/local/mysql8.0.20/etc/salve.conf?--user=root
連接Slave端:
#?mysql?--socket=/tmp/mysqlslave.sock?-u?root
Slave端主從同步配置:
mysql>
change?master?to?master_host='localhost',?
master_user='repl',
master_password='123456',
master_log_file='mysql-bin.000001',
master_log_pos=0;
Slave/Master
#查看binlog事件
mysql>?SHOW?BINLOG?EVENTS;
如果發(fā)現(xiàn)主從沒有同步,可以使用如下命令查看相應(yīng)的狀態(tài):
#查看slave狀態(tài)
mysql>?show?slave?status;
如果遇到從庫報這個錯誤:Got fatal error 1236 from master when reading data from binary log: 'Could not find first log file name in binary log index file'
Got fatal error 1236 from master when reading data from binary log: 'could not find next log'
此時可以操作如下三個命令進行處理:
#關(guān)閉slave
mysql>?stop?slave;
#重置slave
mysql>?reset?slave;
#啟動slave
mysql>?start?slave;
2.MYSQL中BINLOG_FORMAT的三種模式
mysql三種復(fù)制方式:基于SQL語句的復(fù)制(statement-based replication, SBR),基于行的復(fù)制(row-based replication, RBR),混合模式復(fù)制(mixed-based replication, MBR)。對應(yīng)的,binlog的格式也有三種:STATEMENT,ROW,MIXED。
2.1 STATEMENT模式(SBR)
每一條會修改數(shù)據(jù)的sql語句會記錄到binlog中。優(yōu)點是并不需要記錄每一條sql語句和每一行的數(shù)據(jù)變化,減少了binlog日志量,節(jié)約IO,提高性能。缺點是在某些情況下會導(dǎo)致master-slave中的數(shù)據(jù)不一致(如sleep()函數(shù), last_insert_id(),以及user-defined functions(udf)等會出現(xiàn)問題)
2.2 ROW模式(RBR)
不記錄每條sql語句的上下文信息,僅需記錄哪條數(shù)據(jù)被修改了,修改成什么樣了。而且不會出現(xiàn)某些特定情況下的存儲過程、或function、或trigger的調(diào)用和觸發(fā)無法被正確復(fù)制的問題。缺點是會產(chǎn)生大量的日志,尤其是alter table的時候會讓日志暴漲。
2.3 MIXED模式(MBR)
以上兩種模式的混合使用,一般的復(fù)制使用STATEMENT模式保存binlog,對于STATEMENT模式無法復(fù)制的操作使用ROW模式保存binlog,MySQL會根據(jù)執(zhí)行的SQL語句選擇日志保存方式。
binlog復(fù)制配置在mysql的配置文件my.cnf中,可以通過一下選項配置binglog相關(guān),配置如下:
#binlog日志格式,mysql默認采用statement,建議使用mixed
binlog_format???????????=?MIXED????
#binlog日志文件
log-bin?????????????????=?/data/mysql/mysql-bin.log??
#binlog過期清理時間
expire_logs_days????????=?7????
#binlog每個日志文件大小
max_binlog_size?????????=?100m??????????????????
#binlog緩存大小
binlog_cache_size???????=?4m????????????????????
#最大binlog緩存大小
max_binlog_cache_size???=?512m
2.4 優(yōu)缺點對比
對于執(zhí)行的SQL語句中包含now()這樣的時間函數(shù),會在日志中產(chǎn)生對應(yīng)的unix_timestamp()*1000的時間字符串,slave在完成同步時,取用的是sqlEvent發(fā)生的時間來保證數(shù)據(jù)的準確性。另外對于一些功能性函數(shù)slave能完成相應(yīng)的數(shù)據(jù)同步,而對于上面指定的一些類似于UDF函數(shù),導(dǎo)致Slave無法知曉的情況,則會采用ROW格式存儲這些Binlog,以保證產(chǎn)生的Binlog可以供Slave完成數(shù)據(jù)同步。
現(xiàn)在來比較以下 SBR 和 RBR 這2種模式各自的優(yōu)缺點:
SBR 的優(yōu)點:
技術(shù)比較成熟 binlog文件較小 binlog中包含了所有數(shù)據(jù)庫更改信息,可以據(jù)此來審核數(shù)據(jù)庫的安全等情況 binlog可以用于實時的還原,而不僅僅用于復(fù)制 主從版本可以不一樣,從服務(wù)器版本可以比主服務(wù)器版本高
SBR 的缺點:
不是所有的UPDATE語句都能被復(fù)制,尤其是包含不確定操作的時候。 調(diào)用具有不確定因素的 UDF 時復(fù)制也可能出問題 使用以下函數(shù)的語句也無法被復(fù)制: LOAD_FILE() UUID() USER() FOUND_ROWS() SYSDATE() (除非啟動時啟用了 --sysdate-is-now 選項)
INSERT ... SELECT 會產(chǎn)生比 RBR 更多的行級鎖
復(fù)制需要進行全表掃描(WHERE 語句中沒有使用到索引)的 UPDATE 時,需要比 RBR 請求更多的行級鎖
對于有 AUTO_INCREMENT 字段的 InnoDB表而言,INSERT 語句會阻塞其他 INSERT 語句
對于一些復(fù)雜的語句,在從服務(wù)器上的耗資源情況會更嚴重,而 RBR 模式下,只會對那個發(fā)生變化的記錄產(chǎn)生影響
存儲函數(shù)(不是存儲過程)在被調(diào)用的同時也會執(zhí)行一次 NOW() 函數(shù),這個可以說是壞事也可能是好事
確定了的 UDF 也需要在從服務(wù)器上執(zhí)行
數(shù)據(jù)表必須幾乎和主服務(wù)器保持一致才行,否則可能會導(dǎo)致復(fù)制出錯
執(zhí)行復(fù)雜語句如果出錯的話,會消耗更多資源
RBR 的優(yōu)點:
INSERT ... SELECT 包含 AUTO_INCREMENT 字段的 INSERT 沒有附帶條件或者并沒有修改很多記錄的 UPDATE 或 DELETE 語句執(zhí)行 INSERT,UPDATE,DELETE 語句時鎖更少,從服務(wù)器上采用多線程來執(zhí)行復(fù)制成為可能。
RBR 的缺點:
binlog 大了很多 復(fù)雜的回滾時 binlog 中會包含大量的數(shù)據(jù)
UDF 產(chǎn)生的大 BLOB 值會導(dǎo)致復(fù)制變慢
當在非事務(wù)表上執(zhí)行一段堆積的SQL語句時,最好采用 SBR 模式,否則很容易導(dǎo)致主從服務(wù)器的數(shù)據(jù)不一致情況發(fā)生。
如果是采用 INSERT,UPDATE,DELETE 直接操作表的情況,則日志格式根據(jù) binlog_format 的設(shè)定而記錄
如果是采用 GRANT,REVOKE,SET PASSWORD 等管理語句來做的話,那么無論如何都采用 SBR 模式記錄
注:采用 RBR 模式后,能解決很多原先出現(xiàn)的主鍵重復(fù)問題。
2.5 如何查看復(fù)制格式
mysql>?show?variables?like?'binlog_format';

static?Sys_var_enum?Sys_binlog_format(
????"binlog_format",
????"What?form?of?binary?logging?the?master?will?"
????"use:?either?ROW?for?row-based?binary?logging,?STATEMENT?"
????"for?statement-based?binary?logging,?or?MIXED.?MIXED?is?statement-"
????"based?binary?logging?except?for?those?statements?where?only?row-"
????"based?is?correct:?those?which?involve?user-defined?functions?(i.e.?"
????"UDFs)?or?the?UUID()?function;?for?those,?row-based?binary?logging?is?"
????"automatically?used.?If?NDBCLUSTER?is?enabled?and?binlog-format?is?"
????"MIXED,?the?format?switches?to?row-based?and?back?implicitly?per?each?"
????"query?accessing?an?NDBCLUSTER?table",
????SESSION_VAR(binlog_format),?CMD_LINE(REQUIRED_ARG,?OPT_BINLOG_FORMAT),
????binlog_format_names,?
????DEFAULT(BINLOG_FORMAT_ROW),?//默認binlog的同步為行復(fù)制
????NO_MUTEX_GUARD,
????NOT_IN_BINLOG,?ON_CHECK(binlog_format_check),
????ON_UPDATE(fix_binlog_format_after_update));
//?Values?for?binlog_format?sysvar
enum?enum_binlog_format?{
??BINLOG_FORMAT_MIXED?=?0,??///<混合模式?statement?if?safe,?otherwise?row?-?autodetected
??BINLOG_FORMAT_STMT?=?1,???///??BINLOG_FORMAT_ROW?=?2,????///<行復(fù)制?row-based
??BINLOG_FORMAT_UNSPEC?=
??????3??///};
3.建立主從關(guān)系

3.1 slave端start_salve方法
bool?start_slave(THD?*thd,?LEX_SLAVE_CONNECTION?*connection_param,
?????????????????LEX_MASTER_INFO?*master_param,?int?thread_mask_input,
?????????????????Master_info?*mi,?bool?set_mts_settings)?{
??bool?is_error?=?false;
??int?thread_mask;
??DBUG_TRACE;
????
??lock_slave_threads(mi);??//停止運行線程
??//?獲取已停止線程的掩碼
??init_thread_mask(&thread_mask,?mi,?true?/*?inverse?*/);
??/*
????我們將停止下面的所有線程。但如果用戶想只啟動一個線程,
????就好像另一個線程正在運行一樣(就像我們不要想碰另一根線),
????所以將位設(shè)置為0其他線程
??*/
??if?(thread_mask_input)?{
????thread_mask?&=?thread_mask_input;
??}
??if?(thread_mask)??//?一些線程停止,啟動它們
??{
????if?(load_mi_and_rli_from_repositories(mi,?false,?thread_mask))?{
??????is_error?=?true;
??????my_error(ER_MASTER_INFO,?MYF(0));
????}?else?if?(*mi->host?||?!(thread_mask?&?SLAVE_IO))?{
??????/*
????????如果我們要啟動IO線程,我們需要考慮通過啟動從機提供的選項。
??????*/
??????if?(thread_mask?&?SLAVE_IO)?{
????????if?(connection_param->user)?{?//設(shè)置用戶
??????????mi->set_start_user_configured(true);
??????????mi->set_user(connection_param->user);
????????}
????????if?(connection_param->password)?{?//設(shè)置密碼
??????????mi->set_start_user_configured(true);
??????????mi->set_password(connection_param->password);
????????}
????????if?(connection_param->plugin_auth)?//設(shè)置授權(quán)插件
??????????mi->set_plugin_auth(connection_param->plugin_auth);
????????if?(connection_param->plugin_dir)?//插件目錄
??????????mi->set_plugin_dir(connection_param->plugin_dir);
??????}
????????//...
????????//初始化設(shè)置
????????int?slave_errno?=?mi->rli->init_until_option(thd,?master_param);
????????if?(slave_errno)?{
??????????my_error(slave_errno,?MYF(0));
??????????is_error?=?true;
????????}
????????if?(!is_error)?is_error?=?check_slave_sql_config_conflict(mi->rli);
??????}?else?if?(master_param->pos?||?master_param->relay_log_pos?||
?????????????????master_param->gtid)
????????push_warning(thd,?Sql_condition::SL_NOTE,?ER_UNTIL_COND_IGNORED,
?????????????????????ER_THD(thd,?ER_UNTIL_COND_IGNORED));
??????if?(!is_error)
????????//啟動slave線程
????????is_error?=
????????????start_slave_threads(false?/*need_lock_slave=false*/,
????????????????????????????????true?/*wait_for_start=true*/,?mi,?thread_mask);
????}?else?{
??????is_error?=?true;
??????my_error(ER_BAD_SLAVE,?MYF(0));
????}
??}?else?{
????/*?如果所有線程都已啟動,則沒有錯誤,只有一個警告?*/
????push_warning_printf(
????????thd,?Sql_condition::SL_NOTE,?ER_SLAVE_CHANNEL_WAS_RUNNING,
????????ER_THD(thd,?ER_SLAVE_CHANNEL_WAS_RUNNING),?mi->get_channel());
??}
??/*
????如果有人試圖啟動,請清除啟動信息,IO線程以避免任何安全問題。
??*/
??if?(is_error?&&?(thread_mask?&?SLAVE_IO)?==?SLAVE_IO)?mi->reset_start_info();
??unlock_slave_threads(mi);
??mi->channel_unlock();
??return?is_error;
}
bool?start_slave_threads(bool?need_lock_slave,?bool?wait_for_start,
?????????????????????????Master_info?*mi,?int?thread_mask)?{
??mysql_mutex_t?*lock_io?=?nullptr,?*lock_sql?=?nullptr,
????????????????*lock_cond_io?=?nullptr,?*lock_cond_sql?=?nullptr;
??mysql_cond_t?*cond_io?=?nullptr,?*cond_sql?=?nullptr;
??bool?is_error?=?false;
??DBUG_TRACE;
??DBUG_EXECUTE_IF("uninitialized_master-info_structure",?mi->inited?=?false;);
??//...
??
??if?(thread_mask?&?SLAVE_IO)??//判斷是否支持SLAVE_IO
????is_error?=?start_slave_thread(
#ifdef?HAVE_PSI_THREAD_INTERFACE
????????key_thread_slave_io,
#endif
????????handle_slave_io,?lock_io,?lock_cond_io,?cond_io,?&mi->slave_running,
????????&mi->slave_run_id,?mi);??//調(diào)用handle_slave_io方法
????????
??if?(!is_error?&&?(thread_mask?&?SLAVE_SQL))?{?//判斷是否支持SLAVE_SQL
???
????//...
????if?(!is_error)
??????is_error?=?start_slave_thread(
#ifdef?HAVE_PSI_THREAD_INTERFACE
??????????key_thread_slave_sql,
#endif
??????????handle_slave_sql,?lock_sql,?lock_cond_sql,?cond_sql,
??????????&mi->rli->slave_running,?&mi->rli->slave_run_id,?mi);?//調(diào)用handle_slave_sql方法
????if?(is_error)
??????terminate_slave_threads(mi,?thread_mask?&?SLAVE_IO,
??????????????????????????????rpl_stop_slave_timeout,?need_lock_slave);
??}
??return?is_error;
}

3.2 slave端handle_slave_io方法
extern?"C"?void?*handle_slave_io(void?*arg)?{
??//...
??my_thread_init();
??{
????//初始化slave線程
????if?(init_slave_thread(thd,?SLAVE_THD_IO))?{
??????mysql_cond_broadcast(&mi->start_cond);
??????mysql_mutex_unlock(&mi->run_lock);
??????mi->report(ERROR_LEVEL,?ER_SLAVE_FATAL_ERROR,
?????????????????ER_THD(thd,?ER_SLAVE_FATAL_ERROR),
?????????????????"Failed?during?slave?I/O?thread?initialization?");
??????goto?err;
????}
????//...
????
????mysql_cond_broadcast(&mi->start_cond);?//調(diào)用做喚醒操作
????
????//...
????
????//發(fā)起登陸操作
????successfully_connected?=?!safe_connect(thd,?mysql,?mi);
????//?we?can?get?killed?during?safe_connect
#ifdef?HAVE_SETNS
????if?(mi->is_set_network_namespace())?{
??????//?Restore?original?network?namespace?used?to?be?before?connection?has
??????//?been?created
??????successfully_connected?=
??????????restore_original_network_namespace()?|?successfully_connected;
????}
#endif
????//...
????/*
??????注冊slave到master
????*/
????THD_STAGE_INFO(thd,?stage_registering_slave_on_master);
????if?(register_slave_on_master(mysql,?mi,?&suppress_warnings))?{
??????if?(!check_io_slave_killed(thd,?mi,
?????????????????????????????????"Slave?I/O?thread?killed?"
?????????????????????????????????"while?registering?slave?on?master"))?{
????????LogErr(ERROR_LEVEL,?ER_RPL_SLAVE_IO_THREAD_CANT_REGISTER_ON_MASTER);
????????if?(try_to_reconnect(thd,?mysql,?mi,?&retry_count,?suppress_warnings,
?????????????????????????????reconnect_messages[SLAVE_RECON_ACT_REG]))
??????????goto?err;
??????}?else
????????goto?err;
??????goto?connected;
????}
????//...
????
????while?(!io_slave_killed(thd,?mi))?{
??????MYSQL_RPL?rpl;
??????THD_STAGE_INFO(thd,?stage_requesting_binlog_dump);
??????if?(request_dump(thd,?mysql,?&rpl,?mi,?&suppress_warnings))?{?//發(fā)起dump指令
????????LogErr(ERROR_LEVEL,?ER_RPL_SLAVE_ERROR_REQUESTING_BINLOG_DUMP,
???????????????mi->get_for_channel_str());
????????if?(check_io_slave_killed(thd,?mi,
??????????????????????????????????"Slave?I/O?thread?killed?while?\
requesting?master?dump")?||
????????????try_to_reconnect(thd,?mysql,?mi,?&retry_count,?suppress_warnings,
?????????????????????????????reconnect_messages[SLAVE_RECON_ACT_DUMP]))
??????????goto?err;
????????goto?connected;
??????}
??????//...
????}
??
??//...
??
??my_thread_end();?//線程結(jié)束
#if?OPENSSL_VERSION_NUMBER?0x10100000L
??ERR_remove_thread_state(0);
#endif?/*?OPENSSL_VERSION_NUMBER?0x10100000L?*/
??my_thread_exit(nullptr);?//退出線程
??return?(nullptr);??//?Avoid?compiler?warnings
}

3.4 master建立實現(xiàn)
bool?dispatch_command(THD?*thd,?const?COM_DATA?*com_data,
??????????????????????enum?enum_server_command?command)?{
??
??//...
??
??switch?(command)?{
????
????case?COM_REGISTER_SLAVE:?{?//注冊slave到master
??????//?TODO:?access?of?protocol_classic?should?be?removed
??????if?(!register_slave(thd,?thd->get_protocol_classic()->get_raw_packet(),
??????????????????????????thd->get_protocol_classic()->get_packet_length()))
????????my_ok(thd);
??????break;
????}
????
????//...
????
????case?COM_BINLOG_DUMP_GTID:?//binlog_dump_gtid
??????//?TODO:?access?of?protocol_classic?should?be?removed
??????error?=?com_binlog_dump_gtid(
??????????thd,?(char?*)thd->get_protocol_classic()->get_raw_packet(),
??????????thd->get_protocol_classic()->get_packet_length());
??????break;
????case?COM_BINLOG_DUMP:?//binlog_dump
??????//?TODO:?access?of?protocol_classic?should?be?removed
??????error?=?com_binlog_dump(
??????????thd,?(char?*)thd->get_protocol_classic()->get_raw_packet(),
??????????thd->get_protocol_classic()->get_packet_length());
??????break;
??}
??return?error;
}
bool?com_binlog_dump(THD?*thd,?char?*packet,?size_t?packet_length)?{
??DBUG_TRACE;
??ulong?pos;
??ushort?flags?=?0;
??const?uchar?*packet_position?=?(uchar?*)packet;
??size_t?packet_bytes_todo?=?packet_length;
??DBUG_ASSERT(!thd->status_var_aggregated);
??thd->status_var.com_other++;
??thd->enable_slow_log?=?opt_log_slow_admin_statements;
??if?(check_global_access(thd,?REPL_SLAVE_ACL))?return?false;
??/*
????4?bytes?is?too?little,?but?changing?the?protocol?would?break
????compatibility.??This?has?been?fixed?in?the?new?protocol.?@see
????com_binlog_dump_gtid().
??*/
??READ_INT(pos,?4);
??READ_INT(flags,?2);
??READ_INT(thd->server_id,?4);
??DBUG_PRINT("info",
?????????????("pos=%lu?flags=%d?server_id=%d",?pos,?flags,?thd->server_id));
??kill_zombie_dump_threads(thd);
??query_logger.general_log_print(thd,?thd->get_command(),?"Log:?'%s'??Pos:?%ld",
?????????????????????????????????packet?+?10,?(long)pos);
??mysql_binlog_send(thd,?thd->mem_strdup(packet?+?10),?(my_off_t)pos,?nullptr,
????????????????????flags);?//發(fā)送binlog
??unregister_slave(thd,?true,?true?/*need_lock_slave_list=true*/);
??/*?如果我們到了這里,線程需要終止?*/
??return?true;
error_malformed_packet:
??my_error(ER_MALFORMED_PACKET,?MYF(0));
??return?true;
}

4.主從建立過程中數(shù)據(jù)包

#查詢當前時間戳
SELECT?UNIX_TIMESTAMP()
#查詢master的serverid
SELECT?@@GLOBAL.SERVER_ID
#設(shè)置心跳周期,單位為納秒,其實只有30s。初始化心跳如圖4-2
SET?@master_heartbeat_period=?30000001024
#設(shè)置master_binlog_checksum
SET?@master_binlog_checksum=?@@global.binlog_checksum
#查詢master_binlog_checksum
SELECT?@master_binlog_checksum
#獲得是否支持gtid
SELECT?@@GLOBAL.GTID_MODE
#查詢server?uuid
SELECT?@@GLOBAL.SERVER_UUID
#設(shè)置slave?uuid
SET?@slave_uuid=?'2dc27df4-e143-11ea-b396-cc679ee1902b'

總結(jié)
mysql復(fù)制模式分為三種:STATEMENT模式(SBR)、ROW模式(RBR)、MIXED模式(MBR)。 mysql8.0.20默認ROW模式(RBR)。 發(fā)送binlog支持兩種形式:COM_BINLOG_DUMP與COM_BINLOG_DUMP_GTID。 默認情況master_heartbeat_period為30秒,單位為納秒。 IO Thread與SQL Thread其實就是對應(yīng)handle_slave_io方法與handle_slave_sql方法。

