1. Go中的HTTP請求之——HTTP1.1請求流程分析

        共 7831字,需瀏覽 16分鐘

         ·

        2022-05-17 10:46

        點擊上方“Go語言進階學習”,進行關注

        回復“Go語言”即可獲贈Python從入門到進階共10本電子書

        薄宦梗猶泛,故園蕪已平。

        前言

        http是目前應用最為廣泛, 也是程序員接觸最多的協(xié)議之一。今天筆者站在GoPher的角度對http1.1的請求流程進行全面的分析。希望讀者讀完此文后, 能夠有以下幾個收獲:

        1. 對http1.1的請求流程有一個大概的了解

        2. 在平時的開發(fā)中能夠更好地重用底層TCP連接

        3. 對http1.1的線頭阻塞能有一個更清楚的認識

        HTTP1.1流程

        今天內(nèi)容較多, 廢話不多說, 直接上干貨。

        接下來, 筆者將根據(jù)流程圖,對除了NewRequest以外的函數(shù)進行逐步的展開和分析

        (*Client).do

        (*Client).do方法的核心代碼是一個沒有結束條件的for循環(huán)。

        for {
        // For all but the first request, create the next
        // request hop and replace req.
        if len(reqs) > 0 {
        loc := resp.Header.Get("Location")
        // ...此處省略代碼...
        err = c.checkRedirect(req, reqs)
        // ...此處省略很多代碼...
        }

        reqs = append(reqs, req)
        var err error
        var didTimeout func() bool
        if resp, didTimeout, err = c.send(req, deadline); err != nil
        {
        // c.send() always closes req.Body
        reqBodyClosed = true
        // ...此處省略代碼...
        return nil, uerr(err)
        }

        var shouldRedirect bool
        redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
        if !shouldRedirect {
        return resp, nil
        }

        req.closeBody()
        }

        上面的代碼中, 請求第一次進入會調(diào)用c.send, 得到響應后會判斷請求是否需要重定向, 如果需要重定向則繼續(xù)循環(huán), 否則返回響應。

        進入重定向流程后, 這里筆者簡單介紹一下checkRedirect函數(shù):

        func defaultCheckRedirect(req *Request, via []*Request) error {
        if len(via) >= 10 {
        return errors.New("stopped after 10 redirects")
        }
        return nil
        }
        // ...
        func (c *Client) checkRedirect(req *Request, via []*Request) error {
        fn := c.CheckRedirect
        if fn == nil {
        fn = defaultCheckRedirect
        }
        return fn(req, via)
        }

        由上可知, 用戶可以自己定義重定向的檢查規(guī)則。如果用戶沒有自定義檢查規(guī)則, 則重定向次數(shù)不能超過10次。

        (*Client).send

        (*Client).send方法邏輯較為簡單, 主要看用戶有沒有為http.Client的Jar字段實現(xiàn)CookieJar接口。主要流程如下:

        1. 如果實現(xiàn)了CookieJar接口, 為Request添加保存的cookie信息。

        2. 調(diào)用send函數(shù)。

        3. 如果實現(xiàn)了CookieJar接口, 將Response中的cookie信息保存下來。

        // didTimeout is non-nil only if err != nil.
        func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
        if c.Jar != nil {
        for _, cookie := range c.Jar.Cookies(req.URL) {
        req.AddCookie(cookie)
        }
        }
        resp, didTimeout, err = send(req, c.transport(), deadline)
        if err != nil {
        return nil, didTimeout, err
        }
        if c.Jar != nil {
        if rc := resp.Cookies(); len(rc) > 0 {
        c.Jar.SetCookies(req.URL, rc)
        }
        }
        return resp, nil, nil
        }

        另外, 我們還需要關注c.transport()的調(diào)用。如果用戶未對http.Client指定Transport則會使用go默認的DefaultTransport。

        該Transport實現(xiàn)RoundTripper接口。在go中RoundTripper的定義為“執(zhí)行單個HTTP事務的能力,獲取給定請求的響應”。

        func (c *Client) transport() RoundTripper {
        if c.Transport != nil {
        return c.Transport
        }
        return DefaultTransport
        }

        send

        send函數(shù)會檢查request的URL,以及參數(shù)的rt, 和header值。如果URL和rt為nil則直接返回錯誤。同時, 如果請求中設置了用戶信息, 還會檢查并設置basic的驗證頭信息,最后調(diào)用rt.RoundTrip得到請求的響應。

        func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
        req := ireq // req is either the original request, or a modified fork
        // ...此處省略代碼...
        if u := req.URL.User; u != nil && req.Header.Get("Authorization") == "" {
        username := u.Username()
        password, _ := u.Password()
        forkReq()
        req.Header = cloneOrMakeHeader(ireq.Header)
        req.Header.Set("Authorization", "Basic "+basicAuth(username, password))
        }

        if !deadline.IsZero() {
        forkReq()
        }
        stopTimer, didTimeout := setRequestCancel(req, rt, deadline)

        resp, err = rt.RoundTrip(req)
        if err != nil {
        // ...此處省略代碼...
        return nil, didTimeout, err
        }
        // ...此處省略代碼...
        return resp, nil, nil
        }

        (*Transport).RoundTrip

        (*Transport).RoundTrip的邏輯很簡單,它會調(diào)用(*Transport).roundTrip方法,因此本節(jié)實際上是對(*Transport).roundTrip方法的分析。

        func (t *Transport) RoundTrip(req *Request) (*Response, error) {
        return t.roundTrip(req)
        }
        func (t *Transport) roundTrip(req *Request) (*Response, error) {
        // ...此處省略校驗header頭和headervalue的代碼以及其他代碼...

        for {
        select {
        case <-ctx.Done():
        req.closeBody()
        return nil, ctx.Err()
        default:
        }

        // treq gets modified by roundTrip, so we need to recreate for each retry.
        treq := &transportRequest{Request: req, trace: trace}
        cm, err := t.connectMethodForRequest(treq)
        // ...此處省略代碼...
        pconn, err := t.getConn(treq, cm)
        if err != nil {
        t.setReqCanceler(req, nil)
        req.closeBody()
        return nil, err
        }

        var resp *Response
        if pconn.alt != nil {
        // HTTP/2 path.
        t.setReqCanceler(req, nil) // not cancelable with CancelRequest
        resp, err = pconn.alt.RoundTrip(req)
        } else {
        resp, err = pconn.roundTrip(treq)
        }
        if err == nil {
        return resp, nil
        }

        // ...此處省略判斷是否重試請求的代碼邏輯...
        }
        }

        由上可知, 每次for循環(huán), 會判斷請求上下文是否已經(jīng)取消, 如果沒有取消則繼續(xù)進行后續(xù)的流程。

        1. 先調(diào)用t.getConn方法獲取一個persistConn。

        2. 因為本篇主旨是http1.1,所以我們直接看http1.1的執(zhí)行分支。根據(jù)源碼中的注釋和實際的debug結果,獲取到連接后, 會繼續(xù)調(diào)用pconn.roundTrip。

        (*Transport).getConn

        筆者認為這一步在http請求中是非常核心的一個步驟,因為只有和server端建立連接后才能進行后續(xù)的通信。

        func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
        req := treq.Request
        trace := treq.trace
        ctx := req.Context()
        // ...此處省略代碼...
        w := &wantConn{
        cm: cm,
        key: cm.key(),
        ctx: ctx,
        ready: make(chan struct{}, 1),
        beforeDial: testHookPrePendingDial,
        afterDial: testHookPostPendingDial,
        }
        // ...此處省略代碼...
        // Queue for idle connection.
        if delivered := t.queueForIdleConn(w); delivered {
        pc := w.pc
        // ...此處省略代碼...
        return pc, nil
        }

        cancelc := make(chan error, 1)
        t.setReqCanceler(req, func(err error) { cancelc <- err })

        // Queue for permission to dial.
        t.queueForDial(w)

        // Wait for completion or cancellation.
        select {
        case <-w.ready:
        // Trace success but only for HTTP/1.
        // HTTP/2 calls trace.GotConn itself.
        if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
        trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
        }
        // ...此處省略代碼...
        return w.pc, w.err
        case <-req.Cancel:
        return nil, errRequestCanceledConn
        case <-req.Context().Done():
        return nil, req.Context().Err()
        case err := <-cancelc:
        if err == errRequestCanceled {
        err = errRequestCanceledConn
        }
        return nil, err
        }
        }

        由上能夠清楚的知道, 獲取連接分為以下幾個步驟:

        1. 調(diào)用t.queueForIdleConn獲取一個空閑且可復用的連接,如果獲取成功則直接返回該連接。

        2. 如果未獲取到空閑連接則調(diào)用t.queueForDial開始新建一個連接。

        3. 等待w.ready關閉,則可以返回新的連接。

        (*Transport).queueForIdleConn

        (*Transport).queueForIdleConn方法會根據(jù)請求的connectMethodKey從t.idleConn獲取一個[]*persistConn切片, 并從切片中,根據(jù)算法獲取一個有效的空閑連接。如果未獲取到空閑連接,則將wantConn結構體變量放入t.idleConnWait[w.key]等待隊列,此處wantConn結構體變量就是前面提到的w

        connectMethodKey定義和queueForIdleConn部分關鍵代碼如下:

        type connectMethodKey struct {
        proxy, scheme, addr string
        onlyH1 bool
        }

        func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
        // ...此處省略代碼...
        // Look for most recently-used idle connection.
        if list, ok := t.idleConn[w.key]; ok {
        stop := false
        delivered := false
        for len(list) > 0 && !stop {
        pconn := list[len(list)-1]

        // See whether this connection has been idle too long, considering
        // only the wall time (the Round(0)), in case this is a laptop or VM
        // coming out of suspend with previously cached idle connections.
        tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
        // ...此處省略代碼...
        delivered = w.tryDeliver(pconn, nil)
        if delivered {
        // ...此處省略代碼...
        }
        stop = true
        }
        if len(list) > 0 {
        t.idleConn[w.key] = list
        } else {
        delete(t.idleConn, w.key)
        }
        if stop {
        return delivered
        }
        }

        // Register to receive next connection that becomes idle.
        if t.idleConnWait == nil {
        t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
        }
        q := t.idleConnWait[w.key]
        q.cleanFront()
        q.pushBack(w)
        t.idleConnWait[w.key] = q
        return false
        }

        其中w.tryDeliver方法主要作用是將連接協(xié)程安全的賦值給w.pc,并關閉w.ready管道。此時我們便可以和(*Transport).getConn中調(diào)用queueForIdleConn成功后的返回值對應上。

        (*Transport).queueForDial

        (*Transport).queueForDial方法包含三個步驟:

        1. 如果t.MaxConnsPerHost小于等于0,執(zhí)行go t.dialConnFor(w)并返回。其中MaxConnsPerHost代表著每個host的最大連接數(shù),小于等于0表示不限制。

        2. 如果當前host的連接數(shù)不超過t.MaxConnsPerHost,對當前host的連接數(shù)+1,然后執(zhí)行go t.dialConnFor(w)并返回。

        3. 如果當前host的連接數(shù)等于t.MaxConnsPerHost,則將wantConn結構體變量放入t.connsPerHostWait[w.key]等待隊列,此處wantConn結構體變量就是前面提到的w。另外在放入等待隊列前會先清除隊列中已經(jīng)失效或者不再等待的變量。

        func (t *Transport) queueForDial(w *wantConn) {
        w.beforeDial()
        if t.MaxConnsPerHost <= 0 {
        go t.dialConnFor(w)
        return
        }

        t.connsPerHostMu.Lock()
        defer t.connsPerHostMu.Unlock()

        if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
        if t.connsPerHost == nil {
        t.connsPerHost = make(map[connectMethodKey]int)
        }
        t.connsPerHost[w.key] = n + 1
        go t.dialConnFor(w)
        return
        }

        if t.connsPerHostWait == nil {
        t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
        }
        q := t.connsPerHostWait[w.key]
        q.cleanFront()
        q.pushBack(w)
        t.connsPerHostWait[w.key] = q
        }
        (*Transport).dialConnFor

        (*Transport).dialConnFor方法調(diào)用t.dialConn獲取一個真正的*persistConn。并將這個連接傳遞給w, 如果w已經(jīng)獲取到了連接,則會傳遞失敗,此時調(diào)用t.putOrCloseIdleConn將連接放回空閑連接池。

        如果連接獲取錯誤則會調(diào)用t.decConnsPerHost減少當前host的連接數(shù)。

        func (t *Transport) dialConnFor(w *wantConn) {
        defer w.afterDial()

        pc, err := t.dialConn(w.ctx, w.cm)
        delivered := w.tryDeliver(pc, err)
        if err == nil && (!delivered || pc.alt != nil) {
        // pconn was not passed to w,
        // or it is HTTP/2 and can be shared.
        // Add to the idle connection pool.
        t.putOrCloseIdleConn(pc)
        }
        if err != nil {
        t.decConnsPerHost(w.key)
        }
        }
        • (*Transport).putOrCloseIdleConn方法

        func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
        if err := t.tryPutIdleConn(pconn); err != nil {
        pconn.close(err)
        }
        }
        func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
        if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
        return errKeepAlivesDisabled
        }
        // ...此處省略代碼...
        t.idleMu.Lock()
        defer t.idleMu.Unlock()
        // ...此處省略代碼...

        // Deliver pconn to goroutine waiting for idle connection, if any.
        // (They may be actively dialing, but this conn is ready first.
        // Chrome calls this socket late binding.
        // See https://insouciant.org/tech/connection-management-in-chromium/.)
        key := pconn.cacheKey
        if q, ok := t.idleConnWait[key]; ok {
        done := false
        if pconn.alt == nil {
        // HTTP/1.
        // Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
        for q.len() > 0 {
        w := q.popFront()
        if w.tryDeliver(pconn, nil) {
        done = true
        break
        }
        }
        } else {
        // HTTP/2.
        // Can hand the same pconn to everyone in the waiting list,
        // and we still won't be done: we want to put it in the idle
        // list unconditionally, for any future clients too.
        for q.len() > 0 {
        w := q.popFront()
        w.tryDeliver(pconn, nil)
        }
        }
        if q.len() == 0 {
        delete(t.idleConnWait, key)
        } else {
        t.idleConnWait[key] = q
        }
        if done {
        return nil
        }
        }

        if t.closeIdle {
        return errCloseIdle
        }
        if t.idleConn == nil {
        t.idleConn = make(map[connectMethodKey][]*persistConn)
        }
        idles := t.idleConn[key]
        if len(idles) >= t.maxIdleConnsPerHost() {
        return errTooManyIdleHost
        }
        // ...此處省略代碼...
        t.idleConn[key] = append(idles, pconn)
        t.idleLRU.add(pconn)
        // ...此處省略代碼...
        // Set idle timer, but only for HTTP/1 (pconn.alt == nil).
        // The HTTP/2 implementation manages the idle timer itself
        // (see idleConnTimeout in h2_bundle.go).
        if t.IdleConnTimeout > 0 && pconn.alt == nil {
        if pconn.idleTimer != nil {
        pconn.idleTimer.Reset(t.IdleConnTimeout)
        } else {
        pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
        }
        }
        pconn.idleAt = time.Now()
        return nil
        }
        func (t *Transport) maxIdleConnsPerHost() int {
        if v := t.MaxIdleConnsPerHost; v != 0 {
        return v
        }
        return DefaultMaxIdleConnsPerHost // 2
        }

        由上可知,將連接放入t.idleConn前,先檢查t.idleConnWait的數(shù)量。如果有請求在等待空閑連接, 則將連接復用,沒有空閑連接時,才將連接放入t.idleConn。連接放入t.idleConn后,還會重置連接的可空閑時間。

        另外在t.putOrCloseIdleConn函數(shù)中還需要注意兩點:

        1. 如果用戶自定義了http.client,且將DisableKeepAlives設置為true,或者將MaxIdleConnsPerHost設置為負數(shù),則連接不會放入t.idleConn即連接不能復用。

        2. 在判斷已有空閑連接數(shù)量時, 如果MaxIdleConnsPerHost 不等于0, 則返回用戶設置的數(shù)量,否則返回默認值2,詳見上面的(*Transport).maxIdleConnsPerHost?函數(shù)。

        綜上, 我們知道對于部分有連接數(shù)限制的業(yè)務, 我們可以為http.Client自定義一個Transport, 并設置Transport的MaxConnsPerHost,MaxIdleConnsPerHost,IdleConnTimeoutDisableKeepAlives從而達到即限制連接數(shù)量,又能保證一定的并發(fā)。

        • (*Transport).decConnsPerHost方法

        func (t *Transport) decConnsPerHost(key connectMethodKey) {
        // ...此處省略代碼...
        t.connsPerHostMu.Lock()
        defer t.connsPerHostMu.Unlock()
        n := t.connsPerHost[key]
        // ...此處省略代碼...

        // Can we hand this count to a goroutine still waiting to dial?
        // (Some goroutines on the wait list may have timed out or
        // gotten a connection another way. If they're all gone,
        // we don't want to kick off any spurious dial operations.)
        if q := t.connsPerHostWait[key]; q.len() > 0 {
        done := false
        for q.len() > 0 {
        w := q.popFront()
        if w.waiting() {
        go t.dialConnFor(w)
        done = true
        break
        }
        }
        if q.len() == 0 {
        delete(t.connsPerHostWait, key)
        } else {
        // q is a value (like a slice), so we have to store
        // the updated q back into the map.
        t.connsPerHostWait[key] = q
        }
        if done {
        return
        }
        }

        // Otherwise, decrement the recorded count.
        if n--; n == 0 {
        delete(t.connsPerHost, key)
        } else {
        t.connsPerHost[key] = n
        }
        }

        由上可知, decConnsPerHost方法主要干了兩件事:

        1. 判斷是否有請求在等待撥號, 如果有則執(zhí)行go t.dialConnFor(w)。

        2. 如果沒有請求在等待撥號, 則減少當前host的連接數(shù)量。

        (*Transport).dialConn

        根據(jù)http.Client的默認配置和實際的debug結果,(*Transport).dialConn方法主要邏輯如下:

        1. 調(diào)用t.dial(ctx, "tcp", cm.addr())創(chuàng)建TCP連接。

        2. 如果是https的請求, 則對請求建立安全的tls傳輸通道。

        3. 為persistConn創(chuàng)建讀寫buffer, 如果用戶沒有自定義讀寫buffer的大小, 根據(jù)writeBufferSize和readBufferSize方法可知, 讀寫bufffer的大小默認為4096。

        4. 執(zhí)行go pconn.readLoop()go pconn.writeLoop()開啟讀寫循環(huán)然后返回連接。

        func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
        pconn = &persistConn{
        t: t,
        cacheKey: cm.key(),
        reqch: make(chan requestAndChan, 1),
        writech: make(chan writeRequest, 1),
        closech: make(chan struct{}),
        writeErrCh: make(chan error, 1),
        writeLoopDone: make(chan struct{}),
        }
        // ...此處省略代碼...
        if cm.scheme() == "https" && t.hasCustomTLSDialer() {
        // ...此處省略代碼...
        } else {
        conn, err := t.dial(ctx, "tcp", cm.addr())
        if err != nil {
        return nil, wrapErr(err)
        }
        pconn.conn = conn
        if cm.scheme() == "https" {
        var firstTLSHost string
        if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
        return nil, wrapErr(err)
        }
        if err = pconn.addTLS(firstTLSHost, trace); err != nil {
        return nil, wrapErr(err)
        }
        }
        }

        // Proxy setup.
        switch { // ...此處省略代碼... }

        if cm.proxyURL != nil && cm.targetScheme == "https" {
        // ...此處省略代碼...
        }

        if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
        // ...此處省略代碼...
        }

        pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
        pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

        go pconn.readLoop()
        go pconn.writeLoop()
        return pconn, nil
        }
        func (t *Transport) writeBufferSize() int {
        if t.WriteBufferSize > 0 {
        return t.WriteBufferSize
        }
        return 4 << 10
        }

        func (t *Transport) readBufferSize() int {
        if t.ReadBufferSize > 0 {
        return t.ReadBufferSize
        }
        return 4 << 10
        }

        (*persistConn).roundTrip

        (*persistConn).roundTrip方法是http1.1請求的核心之一,該方法在這里獲取真實的Response并返回給上層。

        func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
        // ...此處省略代碼...

        gone := make(chan struct{})
        defer close(gone)
        // ...此處省略代碼...
        const debugRoundTrip = false

        // Write the request concurrently with waiting for a response,
        // in case the server decides to reply before reading our full
        // request body.
        startBytesWritten := pc.nwrite
        writeErrCh := make(chan error, 1)
        pc.writech <- writeRequest{req, writeErrCh, continueCh}

        resc := make(chan responseAndError)
        pc.reqch <- requestAndChan{
        req: req.Request,
        ch: resc,
        addedGzip: requestedGzip,
        continueCh: continueCh,
        callerGone: gone,
        }

        var respHeaderTimer <-chan time.Time
        cancelChan := req.Request.Cancel
        ctxDoneChan := req.Context().Done()
        for {
        testHookWaitResLoop()
        select {
        case err := <-writeErrCh:
        // ...此處省略代碼...
        if err != nil {
        pc.close(fmt.Errorf("write error: %v", err))
        return nil, pc.mapRoundTripError(req, startBytesWritten, err)
        }
        // ...此處省略代碼...
        case <-pc.closech:
        // ...此處省略代碼...
        return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
        case <-respHeaderTimer:
        // ...此處省略代碼...
        return nil, errTimeout
        case re := <-resc:
        if (re.res == nil) == (re.err == nil) {
        panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
        }
        if debugRoundTrip {
        req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
        }
        if re.err != nil {
        return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
        }
        return re.res, nil
        case <-cancelChan:
        pc.t.CancelRequest(req.Request)
        cancelChan = nil
        case <-ctxDoneChan:
        pc.t.cancelRequest(req.Request, req.Context().Err())
        cancelChan = nil
        ctxDoneChan = nil
        }
        }
        }

        由上可知, (*persistConn).roundTrip方法可以分為三步:

        1. 向連接的writech寫入writeRequest:?pc.writech <- writeRequest{req, writeErrCh, continueCh}, 參考(*Transport).dialConn可知pc.writech是一個緩沖大小為1的管道,所以會立馬寫入成功。

        2. 向連接的reqch寫入requestAndChan:?pc.reqch <- requestAndChan, pc.reqch和pc.writech一樣都是緩沖大小為1的管道。其中requestAndChan.ch是一個無緩沖的responseAndError管道,(*persistConn).roundTrip就通過這個管道讀取到真實的響應。

        3. 開啟for循環(huán)select, 等待響應或者超時等信息。

        • (*persistConn).writeLoop 寫循環(huán)

        (*persistConn).writeLoop方法主體邏輯相對簡單,把用戶的請求寫入連接的寫緩存buffer, 最后再flush就可以了。

        func (pc *persistConn) writeLoop() {
        defer close(pc.writeLoopDone)
        for {
        select {
        case wr := <-pc.writech:
        startBytesWritten := pc.nwrite
        err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
        if bre, ok := err.(requestBodyReadError); ok {
        err = bre.error
        wr.req.setError(err)
        }
        if err == nil {
        err = pc.bw.Flush()
        }
        if err != nil {
        wr.req.Request.closeBody()
        if pc.nwrite == startBytesWritten {
        err = nothingWrittenError{err}
        }
        }
        pc.writeErrCh <- err // to the body reader, which might recycle us
        wr.ch <- err // to the roundTrip function
        if err != nil {
        pc.close(err)
        return
        }
        case <-pc.closech:
        return
        }
        }
        }
        • (*persistConn).readLoop 讀循環(huán)

        (*persistConn).readLoop有較多的細節(jié), 我們先看代碼, 然后再逐步分析。

        func (pc *persistConn) readLoop() {
        closeErr := errReadLoopExiting // default value, if not changed below
        defer func() {
        pc.close(closeErr)
        pc.t.removeIdleConn(pc)
        }()

        tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
        if err := pc.t.tryPutIdleConn(pc); err != nil {
        // ...此處省略代碼...
        }
        // ...此處省略代碼...
        return true
        }
        // ...此處省略代碼...
        alive := true
        for alive {
        // ...此處省略代碼...
        rc := <-pc.reqch
        trace := httptrace.ContextClientTrace(rc.req.Context())

        var resp *Response
        if err == nil {
        resp, err = pc.readResponse(rc, trace)
        } else {
        err = transportReadFromServerError{err}
        closeErr = err
        }

        // ...此處省略代碼...
        bodyWritable := resp.bodyIsWritable()
        hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

        if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
        // Don't do keep-alive on error if either party requested a close
        // or we get an unexpected informational (1xx) response.
        // StatusCode 100 is already handled above.
        alive = false
        }

        if !hasBody || bodyWritable {
        // ...此處省略代碼...
        continue
        }

        waitForBodyRead := make(chan bool, 2)
        body := &bodyEOFSignal{
        body: resp.Body,
        earlyCloseFn: func() error {
        waitForBodyRead <- false
        <-eofc // will be closed by deferred call at the end of the function
        return nil

        },
        fn: func(err error) error {
        isEOF := err == io.EOF
        waitForBodyRead <- isEOF
        if isEOF {
        <-eofc // see comment above eofc declaration
        } else if err != nil {
        if cerr := pc.canceled(); cerr != nil {
        return cerr
        }
        }
        return err
        },
        }

        resp.Body = body
        // ...此處省略代碼...

        select {
        case rc.ch <- responseAndError{res: resp}:
        case <-rc.callerGone:
        return
        }

        // Before looping back to the top of this function and peeking on
        // the bufio.Reader, wait for the caller goroutine to finish
        // reading the response body. (or for cancellation or death)
        select {
        case bodyEOF := <-waitForBodyRead:
        pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
        alive = alive &&
        bodyEOF &&
        !pc.sawEOF &&
        pc.wroteRequest() &&
        tryPutIdleConn(trace)
        if bodyEOF {
        eofc <- struct{}{}
        }
        case <-rc.req.Cancel:
        alive = false
        pc.t.CancelRequest(rc.req)
        case <-rc.req.Context().Done():
        alive = false
        pc.t.cancelRequest(rc.req, rc.req.Context().Err())
        case <-pc.closech:
        alive = false
        }

        testHookReadLoopBeforeNextRead()
        }
        }

        由上可知, 只要連接處于活躍狀態(tài), 則這個讀循環(huán)會一直開啟, 直到 連接不活躍或者產(chǎn)生其他錯誤才會結束讀循環(huán)。

        在上述源碼中,pc.readResponse(rc,trace)會從連接的讀buffer中獲取一個請求對應的Response。

        讀到響應之后判斷請求是否是HEAD請求或者響應內(nèi)容為空,如果是HEAD請求或者響應內(nèi)容為空則將響應寫入rc.ch,并將連接放入idleConn(此處因為篇幅的原因省略了源碼內(nèi)容, 正常請求的邏輯也有寫響應和將連接放入idleConn兩個步驟)。

        如果不是HEAD請求并且響應內(nèi)容不為空即!hasBody || bodyWritable為false:

        1. 創(chuàng)建一個緩沖大小為2的等待響應被讀取的管道waitForBodyRead:?waitForBodyRead := make(chan bool, 2)

        2. 將響應的Body修改為bodyEOFSignal結構體。通過上面的源碼我們可以知道,此時的resp.Body中有earlyCloseFnfn兩個函數(shù)。earlyCloseFn函數(shù)會向waitForBodyRead管道寫入false, fn函數(shù)會判斷響應是否讀完, 如果已經(jīng)讀完則向waitForBodyRead寫入true否則寫入false

        3. 將修改后的響應寫入rc.ch。其中rc.chrc := <-pc.reqch獲取,而pc.reqch正是前面(*persistConn).roundTrip函數(shù)寫入的requestAndChan。requestAndChan.ch是一個無緩沖的responseAndError管道,(*persistConn).roundTrip通過這個管道讀取到真實的響應。

        4. select 讀取 waitForBodyRead被寫入的值。如果讀到到的是true則可以調(diào)用tryPutIdleConn(此方法會調(diào)用前面提到的(*Transport).tryPutIdleConn方法)將連接放入idleConn從而復用連接。

        waitForBodyRead寫入true的原因我們已經(jīng)知道了,但是被寫入true的時機我們尚不明確。

        func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
        // ...此處省略代碼...
        n, err = es.body.Read(p)
        if err != nil {
        es.mu.Lock()
        defer es.mu.Unlock()
        if es.rerr == nil {
        es.rerr = err
        }
        err = es.condfn(err)
        }
        return
        }

        func (es *bodyEOFSignal) Close() error {
        es.mu.Lock()
        defer es.mu.Unlock()
        if es.closed {
        return nil
        }
        es.closed = true
        if es.earlyCloseFn != nil && es.rerr != io.EOF {
        return es.earlyCloseFn()
        }
        err := es.body.Close()
        return es.condfn(err)
        }

        // caller must hold es.mu.
        func (es *bodyEOFSignal) condfn(err error) error {
        if es.fn == nil {
        return err
        }
        err = es.fn(err)
        es.fn = nil
        return err
        }

        由上述源碼可知, 只有當調(diào)用方完整的讀取了響應,該連接才能夠被復用。因此在http1.1中,一個連接上的請求,只有等前一個請求處理完之后才能繼續(xù)下一個請求。如果前面的請求處理較慢, 則后面的請求必須等待, 這就是http1.1中的線頭阻塞。

        根據(jù)上面的邏輯, 我們GoPher在平時的開發(fā)中如果遇到了不關心響應的請求, 也一定要記得把響應body讀完以保證連接的復用性。筆者在這里給出一個demo:

        io.CopyN(ioutil.Discard, resp.Body, 2 << 10)
        resp.Body.Close()

        以上,就是筆者整理的HTTP1.1的請求流程。

        注意

        筆者本著嚴謹?shù)膽B(tài)度, 特此提醒:

        上述流程中筆者對很多細節(jié)并未詳細提及或者僅一筆帶過,希望讀者酌情參考。

        總結

        1. 在go中發(fā)起http1.1的請求時, 如果遇到不關心響應的請求,請務必完整讀取響應內(nèi)容以保證連接的復用性。

        2. 如果遇到對連接數(shù)有限制的業(yè)務,可以通過自定義http.Client的Transport, 并設置Transport的MaxConnsPerHostMaxIdleConnsPerHost,IdleConnTimeoutDisableKeepAlives的值,來控制連接數(shù)。

        3. 如果對于重定向業(yè)務邏輯有需求,可以自定義http.Client的CheckRedirect。

        4. 在http1.1,中一個連接上的請求,只有等前一個請求處理完之后才能繼續(xù)下一個請求。如果前面的請求處理較慢, 則后面的請求必須等待, 這就是http1.1中的線頭阻塞。

        注: 寫本文時, 筆者所用go版本為: go1.14.2

        生命不息, 探索不止, 后續(xù)將持續(xù)更新有關于go的技術探索

        原創(chuàng)不易, 卑微求關注收藏二連。

        推薦閱讀:

        -------------------?End?-------------------

        歡迎大家點贊,轉(zhuǎn)發(fā),轉(zhuǎn)載,感謝大家的相伴與支持

        想加入學習群請在后臺回復【入群

        萬水千山總是情,點個【在看】行不行

        瀏覽 83
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 国产精品久久777777是什么意思 | 美女逼靠在线网站 | 国产剧情一区二区在线观看 | 淫色视频HD | 爆操逼 |