在Go中如何正確重試請求?

導語 | 我們平時在開發(fā)中肯定避不開的一個問題是如何在不可靠的網絡服務中實現(xiàn)可靠的網絡通信,其中http請求重試是經常用的技術。但是Go標準庫net/http 實際上是沒有重試這個功能的,所以本篇文章主要講解如何在Go中實現(xiàn)請求重試。
一、概述
要理解cpo機制的產生和使用,并不是一件容易的事。說實話,筆者第一次看到這個機制。
一般而言,對于網絡通信失敗的處理分為以下幾步:
感知錯誤。通過不同的錯誤碼來識別不同的錯誤,在HTTP中status code可以用來識別不同類型的錯誤;
重試決策。這一步主要用來減少不必要的重試,比如HTTP的4xx的錯誤,通常4xx表示的是客戶端的錯誤,這時候客戶端不應該進行重試操作,或者在業(yè)務中自定義的一些錯誤也不應該被重試。根據(jù)這些規(guī)則的判斷可以有效的減少不必要的重試次數(shù),提升響應速度;
重試策略。重試策略就包含了重試間隔時間,重試次數(shù)等。如果次數(shù)不夠,可能并不能有效的覆蓋這個短時間故障的時間段,如果重試次數(shù)過多,或者重試間隔太小,又可能造成大量的資源(CPU、內存、線程、網絡)浪費。這個我們后面再說;
對沖策略。對沖是指在不等待響應的情況主動發(fā)送單次調用的多個請求,然后取首個返回的回包。這個概念是grpc中的概念,我把它也借用過來;
熔斷降級。如果重試之后還是不行,說明這個故障不是短時間的故障,而是長時間的故障。那么可以對服務進行熔斷降級,后面的請求不再重試,這段時間做降級處理,減少沒必要的請求,等服務端恢復了之后再進行請求,這方面的實現(xiàn)很多go-zero、sentinel、hystrix-go,也蠻有意思的;
二、重試策略
重試策略可以分為很多種,一方面要考慮到本次請求時長過長而影響到的業(yè)務忍受度,另一方面要考慮到重試會對下游服務產生過多的請求而帶來的影響,總之就是一個trade-off的問題。
所以對于重試算法,一般是在重試之間加一個gap時間,感興趣的朋友也可以去看看這篇文章(https://aws.amazon.com/cn/blogs/architecture/exponential-backoff-and-jitter/)。結合我們自己平時的實踐加上這篇文章的算法一般可以總結出以下幾條規(guī)則:
線性間隔(Linear Backoff):每次重試間隔時間是固定的進行重試,如每1s重試一次;
線性間隔+隨機時間(Linear Jitter Backoff):有時候每次重試間隔時間一致可能會導致多個請求在同一時間請求,那么我們可以加入一個隨機時間,在線性間隔時間的基礎上波動一個百分比的時間;
指數(shù)間隔(Exponential Backoff):每次間隔時間是2指數(shù)型的遞增,如等3s 9s 27s后重試;
指數(shù)間隔+隨機時間(Exponential Jitter Backoff):這個就和第二個類似了,在指數(shù)遞增的基礎上添加一個波動時間;
上面有兩種策略都加入了擾動(jitter),目的是防止驚群問題 (Thundering Herd Problem)的發(fā)生。
所謂驚群問題當許多進程都在等待被同一事件喚醒的時候,當事件發(fā)生后最后只有一個進程能獲得處理。其余進程又造成阻塞,這會造成上下文切換的浪費。所以加入一個隨機時間來避免同一時間同時請求服務端還是很有必要的。
使用net/http重試所帶來的問題
重試這個操作其實對于Go來說其實還不能直接加一個for循環(huán)根據(jù)次數(shù)來進行,對于Get請求重試的時候沒有請求體,可以直接進行重試,但是對于Post請求來說需要把請求體放到Reader里面,如下:
req, _ := http.NewRequest("POST", "localhost", strings.NewReader("hello"))服務端收到請求之后就會從這個Reader中調用Read()函數(shù)去讀取數(shù)據(jù),通常情況當服務端去讀取數(shù)據(jù)的時候,offset會隨之改變,下一次再讀的時候會從offset位置繼續(xù)向后讀取。所以如果直接重試,會出現(xiàn)讀不到 Reader的情況。
我們可以先弄一個例子:
func main() {go func() {http.HandleFunc("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {time.Sleep(time.Millisecond * 20)body, _ := ioutil.ReadAll(r.Body)fmt.Printf("received body with length %v containing: %v\n", len(body), string(body))w.WriteHeader(http.StatusOK)}))http.ListenAndServe(":8090", nil)}()fmt.Print("Try with bare strings.Reader\n")retryDo(req)}func retryDo() {originalBody := []byte("abcdefghigklmnopqrst")reader := strings.NewReader(string(originalBody))req, _ := http.NewRequest("POST", "http://localhost:8090/", reader)client := http.Client{Timeout: time.Millisecond * 10,}for {_, err := client.Do(req)if err != nil {fmt.Printf("error sending the first time: %v\n", err)}time.Sleep(1000)}}// output:error sending the first time: Post "http://localhost:8090/": context deadline exceeded (Client.Timeout exceeded while awaiting headers)error sending the first time: Post "http://localhost:8090/": http: ContentLength=20 with Body length 0error sending the first time: Post "http://localhost:8090/": http: ContentLength=20 with Body length 0received body with length 20 containing: abcdefghigklmnopqrsterror sending the first time: Post "http://localhost:8090/": http: ContentLength=20 with Body length 0....
在上面這個例子中,在客戶端設值了10ms的超時時間。在服務端模擬請求處理超時情況,先sleep 20ms,然后再讀請求數(shù)據(jù),這樣必然會超時。
當再次請求的時候,發(fā)現(xiàn)client請求的Body數(shù)據(jù)并不是我們預期的20個長度,而是0,導致了err。因此需要將Body這個Reader進行重置,如下:
func resetBody(request *http.Request, originalBody []byte) {request.Body = io.NopCloser(bytes.NewBuffer(originalBody))request.GetBody = func() (io.ReadCloser, error) {return io.NopCloser(bytes.NewBuffer(originalBody)), nil}}
上面這段代碼中,我們使用io.NopCloser對請求的Body數(shù)據(jù)進行了重置,避免下次請求的時候出現(xiàn)非預期的異常。
那么相對于上面簡陋的例子,還可以完善一下,加上我們上面說的StatusCode重試判斷、重試策略、重試次數(shù)等等,可以寫成這樣:
func retryDo(req *http.Request, maxRetries int, timeout time.Duration,backoffStrategy BackoffStrategy) (*http.Response, error) {var (originalBody []byteerr error)if req != nil && req.Body != nil {originalBody, err = copyBody(req.Body)resetBody(req, originalBody)}if err != nil {return nil, err}AttemptLimit := maxRetriesif AttemptLimit <= 0 {AttemptLimit = 1}client := http.Client{Timeout: timeout,}var resp *http.Response//重試次數(shù)for i := 1; i <= AttemptLimit; i++ {resp, err = client.Do(req)if err != nil {fmt.Printf("error sending the first time: %v\n", err)}// 重試 500 以上的錯誤碼if err == nil && resp.StatusCode < 500 {return resp, err}// 如果正在重試,那么釋放fdif resp != nil {resp.Body.Close()}// 重置bodyif req.Body != nil {resetBody(req, originalBody)}time.Sleep(backoffStrategy(i) + 1*time.Microsecond)}// 到這里,說明重試也沒用return resp, req.Context().Err()}func copyBody(src io.ReadCloser) ([]byte, error) {b, err := ioutil.ReadAll(src)if err != nil {return nil, ErrReadingRequestBody}src.Close()return b, nil}func resetBody(request *http.Request, originalBody []byte) {request.Body = io.NopCloser(bytes.NewBuffer(originalBody))request.GetBody = func() (io.ReadCloser, error) {return io.NopCloser(bytes.NewBuffer(originalBody)), nil}}
三、對沖策略
上面講的是重試的概念,那么有時候我們接口只是偶然會出問題,并且我們的下游服務并不在乎多請求幾次,那么我們可以借用grpc里面的概念:對沖策略(Hedged requests)。
對沖是指在不等待響應的情況主動發(fā)送單次調用的多個請求,然后取首個返回的回包。對沖和重試的區(qū)別點主要在:對沖在超過指定時間沒有響應就會直接發(fā)起請求,而重試則必須要服務端響應后才會發(fā)起請求。所以對沖更像是比較激進的重試策略。
使用對沖的時候需要注意一點是,因為下游服務可能會做負載均衡策略,所以要求請求的下游服務一般是要求冪等的,能夠在多次并發(fā)請求中是安全的,并且是符合預期的。
對沖請求一般是用來處理“長尾”請求的,關于”長尾“請求的概念可以看這篇文章:https://segmentfault.com/a/1190000039978117
四、并發(fā)模式的處理
因為對沖重試加上了并發(fā)的概念,要用到goroutine來并發(fā)請求,所以我們可以把數(shù)據(jù)封裝到channel里面來進行消息的異步處理。
并且由于是多個goroutine處理消息,我們需要在每個goroutine處理完畢,但是都失敗的情況下返回err,不能直接由于channel等待卡住主流程,這一點十分重要。
但是由于在Go中是無法獲取每個goroutine的執(zhí)行結果的,我們又只關注正確處理結果,需要忽略錯誤,所以需要配合WaitGroup來實現(xiàn)流程控制,示例如下:
func main() {totalSentRequests := &sync.WaitGroup{}allRequestsBackCh := make(chan struct{})multiplexCh := make(chan struct {result stringretry int})go func() {//所有請求完成之后會close掉allRequestsBackChtotalSentRequests.Wait()close(allRequestsBackCh)}()for i := 1; i <= 10; i++ {totalSentRequests.Add(1)go func() {// 標記已經執(zhí)行完defer totalSentRequests.Done()// 模擬耗時操作time.Sleep(500 * time.Microsecond)// 模擬處理成功if random.Intn(500)%2 == 0 {multiplexCh <- struct {result stringretry int}{"finsh success", i}}// 處理失敗不關心,當然,也可以加入一個錯誤的channel中進一步處理}()}select {case <-multiplexCh:fmt.Println("finish success")case <-allRequestsBackCh:// 到這里,說明全部的 goroutine 都執(zhí)行完畢,但是都請求失敗了fmt.Println("all req finish,but all fail")}}
從上面這段代碼看為了進行流程控制,多用了兩個channel:totalSentRequests、allRequestsBackCh,多用了一個goroutine異步關停allRequestsBackCh,才實現(xiàn)的流程控制,實在太過于麻煩,有新的實現(xiàn)方案的同學不妨和我探討一下。
除了上面的并發(fā)請求控制的問題,對于對沖重試來說,還需要注意的是,由于請求不是串行的,所以http.Request的上下文會變,所以每次請求前需要clone一次context,保證每個不同請求的context是獨立的。但是每次clone之后Reader的offset位置又變了,所以我們還需要進行重新reset:
func main() {req, _ := http.NewRequest("POST", "localhost", strings.NewReader("hello"))req2 := req.Clone(req.Context())contents, _ := io.ReadAll(req.Body)contents2, _ := io.ReadAll(req2.Body)fmt.Printf("First read: %v\n", string(contents))fmt.Printf("Second read: %v\n", string(contents2))}//output:First read: helloSecond read:
所以結合一下上面的例子,我們可以將對沖重試的代碼變?yōu)椋?/span>
func retryHedged(req *http.Request, maxRetries int, timeout time.Duration,backoffStrategy BackoffStrategy) (*http.Response, error) {var (originalBody []byteerr error)if req != nil && req.Body != nil {originalBody, err = copyBody(req.Body)}if err != nil {return nil, err}AttemptLimit := maxRetriesif AttemptLimit <= 0 {AttemptLimit = 1}client := http.Client{Timeout: timeout,}// 每次請求copy新的requestcopyRequest := func() (request *http.Request) {request = req.Clone(req.Context())if request.Body != nil {resetBody(request, originalBody)}return}multiplexCh := make(chan struct {resp *http.Responseerr errorretry int})totalSentRequests := &sync.WaitGroup{}allRequestsBackCh := make(chan struct{})go func() {totalSentRequests.Wait()close(allRequestsBackCh)}()var resp *http.Responsefor i := 1; i <= AttemptLimit; i++ {totalSentRequests.Add(1)go func() {// 標記已經執(zhí)行完defer totalSentRequests.Done()req = copyRequest()resp, err = client.Do(req)if err != nil {fmt.Printf("error sending the first time: %v\n", err)}// 重試 500 以上的錯誤碼if err == nil && resp.StatusCode < 500 {multiplexCh <- struct {resp *http.Responseerr errorretry int}{resp: resp, err: err, retry: i}return}// 如果正在重試,那么釋放fdif resp != nil {resp.Body.Close()}// 重置bodyif req.Body != nil {resetBody(req, originalBody)}time.Sleep(backoffStrategy(i) + 1*time.Microsecond)}()}select {case res := <-multiplexCh:return res.resp, res.errcase <-allRequestsBackCh:// 到這里,說明全部的 goroutine 都執(zhí)行完畢,但是都請求失敗了return nil, errors.New("all req finish,but all fail")}}
五、熔斷&降級
因為在我們使用http調用的時候,調用的外部服務很多時候其實并不可靠,很有可能因為外部的服務問題導致自身服務接口調用等待,從而調用時間過長,產生大量的調用積壓,慢慢耗盡服務資源,最終導致服務調用雪崩的發(fā)生,所以在服務中使用熔斷降級是非常有必要的一件事。
其實熔斷降級的概念總體上來說,實現(xiàn)都差不多。核心思想就是通過全局的計數(shù)器,用來統(tǒng)計調用次數(shù)、成功/失敗次數(shù)。通過統(tǒng)計的計數(shù)器來判斷熔斷器的開關,熔斷器的狀態(tài)由三種狀態(tài)表示:closed、open、half open,下面借用了sentinel的圖來表示三者的關系:

首先初始狀態(tài)是closed,每次調用都會經過計數(shù)器統(tǒng)計總次數(shù)和成功/失敗次數(shù),然后在達到一定閾值或條件之后熔斷器會切換到open狀態(tài),發(fā)起的請求會被拒絕。
熔斷器規(guī)則中會配置一個熔斷超時重試的時間,經過熔斷超時重試時長后熔斷器會將狀態(tài)置為half-open狀態(tài)。這個狀態(tài)對于sentinel來說會發(fā)起定時探測,對于go-zero來說會允許通過一定比例的請求,不管是主動定時探測,還是被動通過的請求調用,只要請求的結果返回正常,那么就需要重置計數(shù)器恢復到closed狀態(tài)。
一般而言會支持兩種熔斷策略:
錯誤比率:熔斷時間窗口內的請求數(shù)閾值錯誤率大于錯誤率閾值,從而觸發(fā)熔斷。
平均RT(響應時間):熔斷時間窗口內的請求數(shù)閾值大于平均 RT 閾值,從而觸發(fā)熔斷。
比如我們使用hystrix-go來處理我們的服務接口的熔斷,可以結合我們上面說的重試從而進一步保障我們的服務。
hystrix.ConfigureCommand("my_service", hystrix.CommandConfig{ErrorPercentThreshold: 30,})_ = hystrix.Do("my_service", func() error {req, _ := http.NewRequest("POST", "http://localhost:8090/", strings.NewReader("test"))_, err := retryDo(req, 5, 20*time.Millisecond, ExponentialBackoff)if err != nil {fmt.Println("get error:%v",err)return err}return nil}, func(err error) error {fmt.Printf("handle error:%v\n", err)return nil})
上面這個例子中就利用hystrix-go設置了最大錯誤百分比等于30,超過這個閾值就會進行熔斷。
總結
這篇文章從接口調用出發(fā),探究了重試的幾個要點,講解了重試的幾種策略;然后在實踐環(huán)節(jié)中講解了直接使用net/http重試會有什么問題,對于對沖策略使用channel加上waitgroup來實現(xiàn)并發(fā)請求控制;最后使用hystrix-go來對故障服務進行熔斷,防止請求堆積引起資源耗盡的問題。
參考資料:
1.從gRPC的重試策略說起
2.Go HTTP如何正確重試
3.熔斷原理與實現(xiàn)
4.處理過載
5.Google怎么解決長尾延遲問題
作者簡介
羅志赟
騰訊后臺開發(fā)工程師
騰訊后臺開發(fā)工程師,深入研究過Go runtime相關代碼,喜歡專研技術細節(jié),探索技術中有趣的實現(xiàn)分享給大家。
推薦閱讀


