Redis 6.0 IO線程功能分析
Redis多線程原理
Redis 6.0?的亮點之一就是支持多線程,Redis 分?主線程?和?IO線程,IO線程?只用于讀取客戶端的命令和發(fā)送回復(fù)數(shù)據(jù)給客戶端,處理客戶端命令還是在?主線程?進行,如下圖所示:

從上圖可知,主線程?主要負責(zé)接收客戶端連接,并且分發(fā)到各個?IO線程,而?IO線程?負責(zé)讀取客戶端命令。命令讀取完成后,由?主線程?執(zhí)行命令。主線程?執(zhí)行完命令后,再由?IO線程?把回復(fù)數(shù)據(jù)發(fā)送給客戶端。
讀者可能會問,為什么處理命令不在?IO線程?進行,我覺得主要有兩個原因:
如果處理命令在?
IO線程?進行,那么就會涉及到競爭的問題。因為 Redis 的數(shù)據(jù)庫是共享的,所以如果多個線程同時操作數(shù)據(jù)庫,那么就必須要對數(shù)據(jù)庫進行上鎖,而上鎖是一個比較耗時的操作(因為上鎖可能會導(dǎo)致線程上下文切換)。由于 Redis 6.0 以前一直都是由單線程執(zhí)行命令的,所以如果要改為多線程執(zhí)行命令,那么需要修改大量代碼,而且可能會引入新的問題(比如bug)。所以,為了穩(wěn)定性,繼續(xù)使用單線程執(zhí)行命令是最好的選擇。
為什么要使用多線程呢?主要為了使用多核CPU的優(yōu)勢,下面是使用多線程的測試數(shù)據(jù)(數(shù)據(jù)來源網(wǎng)絡(luò)):


從上面的測試結(jié)果可以看出,多線程版本的 Redis 讀寫QPS都要比單線程版本的高。
Redis 多線程實現(xiàn)
要開啟 Redis 的?IO線程?功能,可以在配置文件中加入以下配置項:
io-threads-do-reads yes # 開啟IO線程
io-threads 6 # 設(shè)置IO線程數(shù)
Redis 在啟動時會根據(jù)配置文件中設(shè)置的?IO線程?數(shù)來啟動?IO線程,啟動?IO線程?在函數(shù)?initThreadedIO()?中完成,代碼如下:
void initThreadedIO(void) {
io_threads_active = 0;
if (server.io_threads_num == 1) return;
...
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
initThreadedIO()?函數(shù)的主要工作是:
為每個IO線程創(chuàng)建一個鏈表,用于放置要進行IO操作的客戶端連接。
為每個IO線程創(chuàng)建一個鎖,用于主線程與IO線程的通信。
調(diào)用?
pthread_create()?系統(tǒng)調(diào)用來創(chuàng)建IO線程,IO線程的主體函數(shù)是?IOThreadMain()。
下面我們來分析一下IO線程的主體函數(shù)主要完成的工作:
void *IOThreadMain(void *myid) {
long id = (unsigned long)myid;
...
while (1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
if (io_threads_pending[id] == 0) { // 不等于0表示有客戶端連接需要處理
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
...
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
...
}
}
IO線程的主體函數(shù)主要完成以下幾個操作:
等待主線程分配客戶端連接(對應(yīng)IO線程的?
io_threads_list?鏈表不為空)。判斷當(dāng)前是進行讀操作還是寫操作(
io_threads_op?等于?IO_THREADS_OP_WRITE?表示要進行寫操作,而?io_threads_op?等于?IO_THREADS_OP_READ?表示要進行讀操作)。如果是進行寫操作,那么就調(diào)用?writeToClient()?函數(shù)向客戶端連接進行發(fā)送數(shù)據(jù)。如果是讀操作,那么就調(diào)用?readQueryFromClient()?函數(shù)讀取客戶端連接的請求。完成對客戶端連接的讀寫操作后,需要清空對應(yīng)IO線程的?
io_threads_list?鏈表和計數(shù)器?io_threads_pending,用于通知主線程已經(jīng)完成讀寫操作。
那么,主線程是怎樣分配客戶端連接給各個IO線程的呢?
主線程在接收到客戶端連接后,會把客戶端連接添加到事件驅(qū)動庫中監(jiān)聽其讀事件,讀事件的回調(diào)函數(shù)為?readQueryFromClient()。也就是說,當(dāng)客戶端連接可讀時會觸發(fā)調(diào)用?readQueryFromClient()?函數(shù),而?readQueryFromClient()?函數(shù)會調(diào)用?postponeClientRead()?函數(shù)判斷當(dāng)前 Redis 是否開啟了?IO線程?功能,代碼如下:
int postponeClientRead(client *c) {
if (io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
postponeClientRead()?函數(shù)主要判斷 Redis 是否開啟了?IO線程?功能,如果開啟了就調(diào)用?listAddNodeHead()?函數(shù)把客戶端連接添加到?clients_pending_read?鏈表中,并且設(shè)置客戶端連接的?CLIENT_PENDING_READ?標(biāo)志位,表示當(dāng)前連接已經(jīng)在?clients_pending_read?鏈表中,防止二次添加。
把客戶端連接添加到?clients_pending_read?鏈表后,主線程會在?handleClientsWithPendingReadsUsingThreads()?函數(shù)中把客戶端連接分配給各個?IO線程。代碼如下:
int handleClientsWithPendingReadsUsingThreads(void) {
...
/* 分配給各個IO線程 */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 設(shè)置各個IO線程負責(zé)的客戶端連接數(shù)
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主線程也要負責(zé)一部分客戶端連接的讀寫操作
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
// 等待所有IO線程完成
while (1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
...
// 執(zhí)行各個客戶端連接的命令
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
continue;
}
}
processInputBuffer(c);
}
return processed;
}
handleClientsWithPendingReadsUsingThreads()?函數(shù)主要完成以下幾個操作:
分配客戶端連接給各個?
IO線程(添加到對應(yīng)?IO線程?的?io_threads_list?鏈表中),分配策略為輪詢。設(shè)置各個?
IO線程?負責(zé)的客戶端連接數(shù)?io_threads_pending。處理主線程負責(zé)那部分客戶端連接的讀寫操作。
等待所有?
IO線程?完成讀取客戶端連接請求的命令。執(zhí)行各個客戶端連接請求的命令。
前面說過,IO線程?在完成讀取客戶端連接的請求后,會把?io_threads_pending?計數(shù)器清零,主線程就是通過檢測?io_threads_pending?計數(shù)器來判斷是否所有?IO線程?都完成了對客戶端連接的讀取命令操作。
但這里要吐槽一下的是,在等待?IO線程?讀取客戶端請求時,居然用了一個死循環(huán)來等待,這樣有可能會導(dǎo)致CPU使用率飆升的問題,有可能影響其他服務(wù)的運行(不知道作者怎么想的)。我覺得比較合適的方式是,各個?IO線程?完成了讀取命令操作后,通過一個信號來通知主線程。
