Go 标准库之 net.http 之 transport 源码解析

database/sql 包提供了保证SQL或类SQL数据库的泛用接口,常用的数据库驱动有 github.com/go-sql-driver/mysql

https://pkg.go.dev/database/sql

环境

下述代码来自 golang 源码的 src/net/http/transport.go 文件,不同版本的 golang 的源码有差异,不过整体思路是一样的。

http.Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type Client struct {
// Transport specifies the mechanism by which individual HTTP requests are made.
// 一个 http.RoundTripper 接口类型的对象,只包含一个方法 RoundTrip,它接受一个 *http.Request 类型的参数,表示 HTTP 请求,返回一个 *http.Response 类型的响应和一个错误对象,该方法的作用是发送HTTP请求并返回响应,同时处理可能出现的传输错误,如超时、连接错误、重定向等。
// If nil, DefaultTransport is used.
// http.RoundTripper 的默认实现是 http.Transport,该实现使用TCP连接池,支持HTTP/1.1、HTTP/2协议,同时还支持HTTPS、代理、压缩和连接复用等特性。如果需要更灵活地控制HTTP请求的传输过程,可以自定义实现http.RoundTripper接口,并将其传递给http.Client的Transport字段。
Transport RoundTripper

// CheckRedirect specifies the policy for handling redirects.
// 指定了处理重定向的策略。用于控制 HTTP 重定向。默认情况下,http.DefaultCheckRedirect 允许自动跟随 HTTP 重定向。
//
// If CheckRedirect is not nil, the client calls it before following an HTTP redirect.
// 如果 CheckRedirect 不为nil,则客户端在执行HTTP重定向之前调用它。
// The arguments req and via are the upcoming request and the requests made already, oldest first.
// 参数 req 和 via 是即将到来的请求和已经发出的请求,最早的请求排在第一位。
// If CheckRedirect returns an error, the Client's Get method returns both the previous Response (with its Body closed) and CheckRedirect's error (wrapped in a url.Error) instead of issuing the Request req.
// 如果 CheckRedirect 返回错误,客户端的 Get 方法将返回之前的 Response(其Body已关闭)和 CheckRedirect 的错误(包裹在url.error中),而不是发出 Request 请求。
// As a special case, if CheckRedirect returns ErrUseLastResponse, then the most recent response is returned with its body unclosed, along with a nil error.
// 作为一种特殊情况,如果CheckRedirect返回ErrUseLastResponse,则返回最新的响应,其正文未关闭,同时返回nil错误。
//
// If CheckRedirect is nil, the Client uses its default policy, which is to stop after 10 consecutive requests.
// 如果CheckRedirect为nil,客户端将使用其默认策略,即在连续10个请求后停止
CheckRedirect func(req *Request, via []*Request) error

// Jar specifies the cookie jar.
//
// The Jar is used to insert relevant cookies into every outbound Request and is updated with the cookie values of every inbound Response. The Jar is consulted for every redirect that the Client follows.
// 一个 http.CookieJar 接口类型的对象,用于管理 HTTP cookie。默认情况下,http.DefaultCookieJar 使用 net/http/cookiejar 包中的默认cookie实现。
//
// If Jar is nil, cookies are only sent if they are explicitly set on the Request.
// 如果 Jar 为nil,则只有在请求中明确设置 Cookie 时才会发送 Cookie。
Jar CookieJar

// Timeout specifies a time limit for requests made by this Client. The timeout includes connection time, any redirects, and reading the response body. The timer remains running after Get, Head, Post, or Do return and will interrupt reading of the Response.Body.
// 超时指定此客户端发出的请求的时间限制。超时包括连接时间、任何重定向和读取响应正文。在Get、Head、Post或Do返回后,计时器仍在运行,并将中断响应的读取身体。
//
// A Timeout of zero means no timeout.
// 零表示没有超时。
//
// The Client cancels requests to the underlying Transport as if the Request's Context ended.
// 客户端取消对基础传输的请求,就像请求的上下文结束一样。
//
// For compatibility, the Client will also use the deprecated CancelRequest method on Transport if found. New RoundTripper implementations should use the Request's Context for cancellation instead of implementing CancelRequest.
// 为了兼容性,如果找到,客户端还将在传输上使用已弃用的CancelRequest方法。新的RoundTripper实现应使用Request的上下文进行取消,而不是实现CancelRequest。
Timeout time.Duration
}

Transport

src/net/http/transport.go 文件中的 Transport 实现了 RoundTripper 接口,该接口只有一个方法 RoundTrip(),Transport 的入口函数就是 RoundTrip()

Transport 的主要功能其实就是缓存了长连接,用于大量 http 请求场景下的连接复用,减少发送请求时 TCP(TLS) 连接建立的时间损耗,同时 Transport 还能对连接做一些限制,如连接超时时间,每个 host 的最大连接数等。Transport 对长连接的缓存和控制仅限于 TCP+(TLS)+HTTP1,不对 HTTP2 做缓存和限制。

Transport 包含如下几个主要概念:

  • 连接池:在 idleConn 中保存了不同类型(connectMethodKey)的请求连接(persistConn)。当发生请求时,首先会尝试从连接池中取一条符合其请求类型的连接使用
  • readLoop/writeLoop:连接之上的功能,循环处理该类型的请求(发送 request,返回 response)
  • roundTrip:请求的真正入口,接收到一个请求后会交给 writeLoop 和 readLoop 处理。

一对 readLoop/writeLoop 只能处理一条连接,如果这条连接上没有更多的请求,则关闭连接,退出循环,释放系统资源

RoundTripper 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type RoundTripper interface {
// RoundTrip executes a single HTTP transaction, returning
// a Response for the provided Request.
//
// RoundTrip should not attempt to interpret the response. In
// particular, RoundTrip must return err == nil if it obtained
// a response, regardless of the response's HTTP status code.
// A non-nil err should be reserved for failure to obtain a
// response. Similarly, RoundTrip should not attempt to
// handle higher-level protocol details such as redirects,
// authentication, or cookies.
//
// RoundTrip should not modify the request, except for
// consuming and closing the Request's Body. RoundTrip may
// read fields of the request in a separate goroutine. Callers
// should not mutate or reuse the request until the Response's
// Body has been closed.
//
// RoundTrip must always close the body, including on errors,
// but depending on the implementation may do so in a separate
// goroutine even after RoundTrip returns. This means that
// callers wanting to reuse the body for subsequent requests
// must arrange to wait for the Close call before doing so.
//
// The Request's URL and Header fields must be initialized.
RoundTrip(*Request) (*Response, error)
}

Transport

Transport 结构体定义

Transport 结构体中的主要成员如下(非全部):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type Transport struct {
idleMu sync.Mutex
closeIdle bool // 用户请求关闭所有的闲置连接
idleConn map[connectMethodKey][]*persistConn // 每个host对应的闲置连接列表
idleConnWait map[connectMethodKey]wantConnQueue // 每个host对应的等待闲置连接列表,在其它request将连接放回连接池前先看一下这个队列是否为空,不为空则直接将连接交由其中一个等待对象
idleLRU connLRU // 用来清理过期的连接

reqMu sync.Mutex
reqCanceler map[*Request]func(error)

connsPerHostMu sync.Mutex
connsPerHost map[connectMethodKey]int // 每个host对应的等待连接个数
connsPerHostWait map[connectMethodKey]wantConnQueue // 每个host对应的等待连接列表

// 用于指定创建未加密的TCP连接的dial功能,如果该函数为空,则使用net包下的dial函数
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
Dial func(network, addr string) (net.Conn, error)
// 以下两个函数处理https的请求
DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
DialTLS func(network, addr string) (net.Conn, error)

DisableKeepAlives bool // 是否复用连接
DisableCompression bool // 是否压缩
MaxIdleConns int // 总的最大闲置连接的个数
MaxIdleConnsPerHost int // 每个host最大闲置连接的个数
MaxConnsPerHost int // 每个host的最大连接个数,如果已经达到该数字,dial连接会被block住
IdleConnTimeout time.Duration // 闲置连接的最大等待时间,一旦超过该时间,连接会被关闭
ResponseHeaderTimeout time.Duration // 读超时,从写完请求到接受到返回头的总时间
ExpectContinueTimeout time.Duration // Expect:100-continue两个请求间的超时时间
MaxResponseHeaderBytes int64 // 返回中header的限制
WriteBufferSize int // write buffer的使用量
ReadBufferSize int // read buffer的使用量
}


// 入口
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
return t.roundTrip(req)
}

Transport.roundTrip 入口

Transport.roundTrip 是入口,它通过传入一个 request 参数,由此选择一个合适的长连接来发送该 request 并返回 response。整个流程主要分为两步:

  • 使用 getConn 函数来获得底层 TCP(TLS)连接;
  • 调用 roundTrip 函数进行上层协议(HTTP)处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context() // 通过context控制请求的生命周期
trace := httptrace.ContextClientTrace(ctx) // 钩子函数,在请求的各个阶段可以指定回调函数

if req.URL == nil {
req.closeBody()
return nil, errors.New("http: nil Request.URL")
}
if req.Header == nil {
req.closeBody()
return nil, errors.New("http: nil Request.Header")
}
scheme := req.URL.Scheme
isHTTP := scheme == "http" || scheme == "https"
// 下面判断request首部的有效性
if isHTTP {
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) { // header key 校验
return nil, fmt.Errorf("net/http: invalid header field name %q", k)
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) { // header values 校验
return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
}
}
}
}
// 判断是否使用注册的RoundTrip来处理对应的scheme。对于使用tcp+tls+http1(wss协议升级)的场景
// 不能使用注册的roundTrip。后续代码对tcp+tls+http1或tcp+http1进行了roundTrip处理
if t.useRegisteredProtocol(req) {
altProto, _ := t.altProto.Load().(map[string]RoundTripper)
if altRT := altProto[scheme]; altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
}
}

// 后续仅处理URL scheme为http或https的连接
if !isHTTP {
req.closeBody()
return nil, &badStringError{"unsupported protocol scheme", scheme}
}
if req.Method != "" && !validMethod(req.Method) {
return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
}
if req.URL.Host == "" {
req.closeBody()
return nil, errors.New("http: no Host in request URL")
}

// 下面for循环用于在request出现错误的时候进行请求重试。但不是所有的请求失败都会被尝试,如请求被取消(errRequestCanceled)
// 的情况是不会进行重试的。具体参见shouldRetryRequest函数
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}

// treq gets modified by roundTrip, so we need to recreate for each retry.
treq := &transportRequest{Request: req, trace: trace}

// connectMethodForRequest 函数通过输入一个 request 返回一个connectMethod(简称cm),该类型通过 {proxyURL,targetScheme,tartgetAddr,onlyH1},即{代理URL,server端的scheme,server的地址,是否HTTP1} 来表示一个请求。一个符合connectMethod描述的request将会在Transport.idleConn中匹配到一类长连接。
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}

// 核心函数,获取一条长连接,如果连接池中有现成的连接则直接返回,否则返回一条新建的连接。请求的发送与接受也是在这个函数内部实现的。
// 该连接可能是HTTP2格式的,存放在persistCnn.alt中,使用其自注册的RoundTrip处理。该函数描述参见下面内容。
// 从getConn的实现中可以看到,一个请求只能在idle的连接上执行,反之一条连接只能同时处理一个请求。
pconn, err := t.getConn(treq, cm)
// 如果获取底层连接失败,无法继续上层协议的请求,直接返回错误
if err != nil {
// 每个request都会在getConn中设置reqCanceler,获取连接失败,清空设置
t.setReqCanceler(req, nil)
req.closeBody()
return nil, err
}

var resp *Response
// pconn.alt就是从Transport.TLSNextProto中获取的,它表示TLS之上的协议,如HTTP2。从persistConn.alt的注释中可以看出目前alt仅支持HTTP2协议,后续可能会支持更多协议。
if pconn.alt != nil {
// HTTP2处理,使用HTTP2时,由于不缓存HTTP2连接,不对其做限制
t.decHostConnCount(cm.key())
// 清除getConn中设置的标记。具体参见getConn
t.setReqCanceler(req, nil)
resp, err = pconn.alt.RoundTrip(req)
} else {
// 核心函数。pconn.roundTrip 中做了比较复杂的处理,该函数用于发送request并返回response。
// 通过writeLoop发送request,通过readLoop返回response
resp, err = pconn.roundTrip(treq)
}
// 如果成功返回response,则整个处理结束.
if err == nil {
return resp, nil
}
// 判断该request是否满足重试条件,大部分场景是不支持重试的,仅有少部分情况支持,如 errServerClosedIdle
// err 非nil时实际并没有在原来的连接上重试,且pconn没有关闭,提了issue
if !pconn.shouldRetryRequest(req, err) {
// Issue 16465: return underlying net.Conn.Read error from peek,
// as we've historically done.
if e, ok := err.(transportReadFromServerError); ok {
err = e.err
}
return nil, err
}
testHookRoundTripRetried()

// Rewind the body if we're able to.
// 用于重定向场景
if req.GetBody != nil {
newReq := *req
var err error
newReq.Body, err = req.GetBody()
if err != nil {
return nil, err
}
req = &newReq
}
}
}

Transport.getConn 获取一个有效连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// 获取一个有效的连接
// 获取连接的方式有三种
// 1是从连接池中获取一个闲置连接
// 2是新创建一个连接
// 3是在等待新创建连接好期间,有别的请求释放了一个连接,则直接使用该连接
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}

w := &wantConn{ // 底层连接可以复用,但wantConn一定是每个request对应一个,需要使用它进行同步
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {
w.cancel(t, err)
}
}()

// 核心函数,从缓存池中获取到闲置连接,如果获取到则直接使用;否则在等待连接队列中进行注册
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
// Trace only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(req, func(error) {})
return pc, nil
}

cancelc := make(chan error, 1)
t.setReqCanceler(req, func(err error) { cancelc <- err })

// 新创建一个连接,与此同时,它仍可能在等待其它正在使用的连接
t.queueForDial(w)

// Wait for completion or cancellation.
select {
case <-w.ready: // 连接已经创建好了,有以上三种情况
// Trace success but only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
}
if w.err != nil { // 有错误发生
// If the request has been cancelled, that's probably
// what caused w.err; if so, prefer to return the
// cancellation error (see golang.org/issue/16049).
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// return below
}
}
return w.pc, w.err // 返回连接,此时连接是connect状态
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}

Transport.queueForIdleConn 从闲置连接池获取空闲连接。如果有空闲连接,delivered返回true;否则在等待连接队列里进行注册,这样如果之后有别的请求释放了一个连接,可以直接拿过来用。Transport.getConn 调用这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 该函数的主要作用是从连接池中获取一个闲置连接
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
t.idleMu.Lock()
defer t.idleMu.Unlock()

// 设置闲置期最长的时间点
var oldTime time.Time
if t.IdleConnTimeout > 0 {
oldTime = time.Now().Add(-t.IdleConnTimeout)
}

// 从最新的连接开始遍历
if list, ok := t.idleConn[w.key]; ok {
stop := false
delivered := false
for len(list) > 0 && !stop {
pconn := list[len(list)-1] // 获取连接池中最新的连接

// 下面这个函数设计得很巧妙,要知道我们在把一个连接放入连接池中时设置过过期timer,它会主动清理连接
// 但在程序实际运行的过程中,很可能timer到时时,该协程并没有被cpu给调度上来,如果还继续使用的话,一旦清理协程被调度上来,会照成逻辑错误
tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
if tooOld {
go pconn.closeConnIfStillIdle() // 异步关闭连接
}
if pconn.isBroken() || tooOld {
// 如果连接损坏,则使用下一个连接
list = list[:len(list)-1]
continue
}
delivered = w.tryDeliver(pconn, nil)
if delivered { // 该连接被使用,如果delivered为false,表示已经从别的渠道接受到连接
if pconn.alt != nil {
// HTTP/2: multiple clients can share pconn.
// Leave it in the list.
} else {
// HTTP/1: only one client can use pconn.
// Remove it from the list.
t.idleLRU.remove(pconn)
list = list[:len(list)-1]
}
}
stop = true
}
if len(list) > 0 {
t.idleConn[w.key] = list
} else {
delete(t.idleConn, w.key)
}
if stop { // 1从连接池中获取到连接,2是在运行的过程中,创建新的连接已完成,这是也不需要在等待连接了
return delivered
}
}

if t.idleConnWait == nil {
t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.idleConnWait[w.key]
q.cleanFront() // 清理过期连接
q.pushBack(w) // 放入等待连接队列中
t.idleConnWait[w.key] = q
return false
}

Transport.queueForDial 创建一个新的连接。Transport.queueForIdleConn 调用这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 创建一个新的连接
func (t *Transport) queueForDial(w *wantConn) {
w.beforeDial()
if t.MaxConnsPerHost <= 0 { // 对每个host的连接数不做限制,直接创建
go t.dialConnFor(w) // 注意这里启动了一个协程,它的好处是如果在新创建连接的过程中,有一个连接被释放,可以直接使用被释放的连接,而不用一直等待
return
}

t.connsPerHostMu.Lock()
defer t.connsPerHostMu.Unlock()

if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { // 不到最大连接数,直接创建新的连接
if t.connsPerHost == nil {
t.connsPerHost = make(map[connectMethodKey]int)
}
t.connsPerHost[w.key] = n + 1
go t.dialConnFor(w)
return
}

// 此时连接已经达到设置的最大值,放入等待队列中
if t.connsPerHostWait == nil {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.connsPerHostWait[w.key]
q.cleanFront()
q.pushBack(w) // 放入等待列表中
t.connsPerHostWait[w.key] = q
}

Transport.dialConnFor 创建链接。Transport.queueForDial 调用这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// dialConnFor dials on behalf of w and delivers the result to w.
// dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()].
// If the dial is cancelled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()].
func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()

pc, err := t.dialConn(w.ctx, w.cm) // 核心函数,创建连接
delivered := w.tryDeliver(pc, err) // 将新创建的连接赋值给w,如果返回false,表示已经从别的渠道获取到连接
if err == nil && (!delivered || pc.alt != nil) { // 赋值不成功,说明w已经在这期间从等待列表中获取到有效的连接
// pconn was not passed to w,
// or it is HTTP/2 and can be shared.
// Add to the idle connection pool.
t.putOrCloseIdleConn(pc) // 将新创建的连接放入闲置连接池中,或者关闭掉
}
if err != nil {
t.decConnsPerHost(w.key) // 创建失败,更新统计信息
}
}

Transport.dialConn 底层创建链接。Transport.dialConnFor 调用这个函数

这个函数主要有三个作用:

  1. 是创建底层的socket,
  2. 是将socket与read buffer和write buffer关联起来,
  3. 是启动readLoop和writeLoop

需要注意的是一旦socket创建成功,readLoop和writeLoop会循环运行,服务不同的请求,直到退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
pconn = &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
trace := httptrace.ContextClientTrace(ctx)
wrapErr := func(err error) error {
if cm.proxyURL != nil {
// Return a typed error, per Issue 16997
return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
}
return err
}
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
...
} else {
conn, err := t.dial(ctx, "tcp", cm.addr()) // 创建socket
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
}

pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}

Transport.tryPutIdleConn 用来将一条新创建或回收的连接放回连接池中,以便后续使用。与getIdleConnCh配合使用,后者用于获取一类连接对应的chan。在如下场景会将一个连接放回idleConn中

  • 在readLoop成功之后(当然还有其他判断,如底层链路没有返回EOF错误);
  • 创建一个新连接且新连接没有被使用时;
  • roundTrip一开始发现request被取消时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
// 当不使用长连接或该主机上的连接数小于0(即不允许缓存任何连接)时,返回错误并关闭创建的连接(此处没有做关闭处理,
// 但存在不适用的连接时必须关闭,如使用putOrCloseIdleConn)。
// 可以看出当不使用长连接时,Transport不能缓存连接
if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
return errKeepAlivesDisabled
}
if pconn.isBroken() {
return errConnBroken
}
// 如果是HTTP2连接,则直接返回,不缓存该连接
if pconn.alt != nil {
return errNotCachingH2Conn
}
// 为新连接标记可重用状态,新创建的连接肯定是可以重用的,用于在Transport.roundTrip
// 中的shouldRetryRequest函数中判断连接是否可以重用
pconn.markReused()
// 该key对应Transport.idleConn中的key,标识特定的连接
key := pconn.cacheKey

t.idleMu.Lock()
defer t.idleMu.Unlock()
// idleConnCh中的chan元素用于存放可用的连接pconn,每类连接都有一个chan
waitingDialer := t.idleConnCh[key]
select {
// 如果此时有调用者等待一个连接,则直接将该连接传递出去,不进行保存,这种做法有利于提高效率
case waitingDialer <- pconn:
// We're done with this pconn and somebody else is
// currently waiting for a conn of this type (they're
// actively dialing, but this conn is ready
// first). Chrome calls this socket late binding. See
// https://insouciant.org/tech/connection-management-in-chromium/
return nil
default:
// 如果没有调用者等待连接,则清除该chan。删除map中的chan直接会关闭该chan
if waitingDialer != nil {
// They had populated this, but their dial won
// first, so we can clean up this map entry.
delete(t.idleConnCh, key)
}
}
// 与DisableKeepAlives有点像,当用户需要关闭所有idle的连接时,不会再缓存连接
if t.wantIdle {
return errWantIdle
}
if t.idleConn == nil {
t.idleConn = make(map[connectMethodKey][]*persistConn)
}
idles := t.idleConn[key]
// 当主机上该类连接数超过Transport.MaxIdleConnsPerHost时,不能再保存新的连接,返回错误并关闭连接
if len(idles) >= t.maxIdleConnsPerHost() {
return errTooManyIdleHost
}
// 需要缓存的连接与连接池中已有的重复,系统退出(这种情况下系统已经发生了混乱,直接退出)
for _, exist := range idles {
if exist == pconn {
log.Fatalf("dup idle pconn %p in freelist", pconn)
}
}
// 添加待缓存的连接
t.idleConn[key] = append(idles, pconn)
t.idleLRU.add(pconn)
// 受MaxIdleConns的限制,添加策略变为:添加新的连接,删除最老的连接。
// MaxIdleConns限制了所有类型的idle状态的最大连接数目,而MaxIdleConnsPerHost限制了host上单一类型的最大连接数目
// idleLRU中保存了所有的连接,此处的作用为,找出最老的连接并移除
if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
oldest := t.idleLRU.removeOldest()
oldest.close(errTooManyIdle)
t.removeIdleConnLocked(oldest)
}
// 为新添加的连接设置超时时间
if t.IdleConnTimeout > 0 {
if pconn.idleTimer != nil {
// 如果该连接是被释放的,则重置超时时间
pconn.idleTimer.Reset(t.IdleConnTimeout)
} else {
// 如果该连接时新建的,则设置超时时间并设置超时动作pconn.closeConnIfStillIdle
// closeConnIfStillIdle用于释放连接,从Transport.idleLRU和Transport.idleConn中移除并关闭该连接
pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
}
}

pconn.idleAt = time.Now()
return nil
}

persistConn

该类封装了底层连接,作为长链接的形式供请求使用

persistConn 结构体定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well)
type persistConn struct {
t *Transport
cacheKey connectMethodKey // schema + host + uri
conn net.Conn // 底层连接
br *bufio.Reader // 连接的读buffer
bw *bufio.Writer // 连接的写buffer
nwrite int64
reqch chan requestAndChan // 作为persistConn.roundTrip和readLoop之间的同步,由roundTrip写,readLoop读
writech chan writeRequest // 作为persistConn.roundTrip和writeLoop之间的同步,由roundTrip写,writeLoop读
closech chan struct{} // 连接关闭的channel
sawEOF bool // 是否读完整请求内容,由readLoop负责
readLimit int64 // 读数据的最大值,由readLoop负责
writeErrCh chan error // writeLoop和readLoop之间的同步,用以判断该连接是否可以复用
writeLoopDone chan struct{} // writeLoop函数退出时关闭

// Both guarded by Transport.idleMu:
idleAt time.Time // 最后一次闲置的时间
idleTimer *time.Timer // 计数器,用来到期清理本连接

mu sync.Mutex // guards following fields
numExpectedResponses int // 连接的请求次数,大于1表示该连接被复用
closed error // 设置连接错误的原因
canceledErr error // 设置连接被取消的原因
broken bool // 在使用过程中被损坏
reused bool // 连接是否被复用
}

persistConn.roundTripTransport.roundTrip 调用这个函数

在获取到底层TCP(TLS)连接后在 roundTrip 中处理上层协议:即发送HTTP request,返回HTTP response。roundTrip给writeLoop提供request,从readLoop获取response。

一个 roundTrip 用于处理一类 request。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// 一次请求的发送与接受的实现
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
// 此处与getConn中的"t.setReqCanceler(req, func(error) {})"相对应,用于判断request是否被取消
// 返回false表示request被取消,不必继续后续请求,关闭连接并返回错误
if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
pc.t.putOrCloseIdleConn(pc)
return nil, errRequestCanceled
}
pc.mu.Lock()
// 与readLoop配合使用,表示期望的响应的个数
pc.numExpectedResponses++
// dialConn中定义的函数,设置了proxy的认证信息
headerFn := pc.mutateHeaderFunc
pc.mu.Unlock()

if headerFn != nil {
headerFn(req.extraHeaders())
}

// Ask for a compressed version if the caller didn't set their
// own value for Accept-Encoding. We only attempt to
// uncompress the gzip stream if we were the layer that
// requested it.
requestedGzip := false
// 如果需要在request中设置可接受的解码方法,则在request中添加对应的首部。仅支持gzip方式且
// 仅在调用者没有设置这些首部时设置
if !pc.t.DisableCompression &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
req.Method != "HEAD" {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
//
// Note that we don't request this for HEAD requests,
// due to a bug in nginx:
// https://trac.nginx.org/nginx/ticket/358
// https://golang.org/issue/5522
//
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
requestedGzip = true
req.extraHeaders().Set("Accept-Encoding", "gzip")
}

// 用于处理首部含"Expect: 100-continue"的request,客户端使用该首部探测服务器是否能够
// 处理request首部中的规格要求(如长度过大的request)。
var continueCh chan struct{}
if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
continueCh = make(chan struct{}, 1)
}
// HTTP1.1默认使用长连接,当transport设置DisableKeepAlives时会导致处理每个request时都会
// 新建一个连接。此处的处理逻辑是:如果transport设置了DisableKeepAlives,而request没有设置
// "Connection: close",则为request设置该首部。将底层表现与上层协议保持一致。
if pc.t.DisableKeepAlives && !req.wantsClose() {
req.extraHeaders().Set("Connection", "close")
}

// 用于在异常场景(如request取消)下通知readLoop,roundTrip是否已经退出,防止ReadLoop发送response阻塞
gone := make(chan struct{})
defer close(gone)

defer func() {
if err != nil {
pc.t.setReqCanceler(req.Request, nil)
}
}()

const debugRoundTrip = false

// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
// 表示发送了多少个字节的request,debug使用
startBytesWritten := pc.nwrite
// 给writeLoop封装并发送信息,注意此处的先后顺序。首先给writeLoop发送数据,阻塞等待writeLoop
// 接收,待writeLoop接收后才能发送数据给readLoop,因此发送request总会优先接收response
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}

// 给readLoop封装并发送信息
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}

var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done()
// 该循环主要用于处理获取response超时和request取消时的条件跳转。正常情况下收到reponse
// 退出roundtrip函数
for {
testHookWaitResLoop()
select {
// writeLoop返回发送request后的结果
case err := <-writeErrCh:
if debugRoundTrip {
req.logf("writeErrCh resv: %T/%#v", err, err)
}
if err != nil {
pc.close(fmt.Errorf("write error: %v", err))
return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
// 设置一个接收response的定时器,如果在这段时间内没有接收到response(即没有进入下面代码
// 的"case re := <-resc:"分支),超时后进入""case <-respHeaderTimer:分支,关闭连接,
// 防止readLoop一直等待读取response,导致处理阻塞;没有超时则关闭定时器
if d := pc.t.ResponseHeaderTimeout; d > 0 {
if debugRoundTrip {
req.logf("starting timer for %v", d)
}
timer := time.NewTimer(d)
defer timer.Stop() // prevent leaks
respHeaderTimer = timer.C
}
// 处理底层连接关闭。"case <-cancelChan:"和”case <-ctxDoneChan:“为request关闭,request
// 关闭也会导致底层连接关闭,但必须处理非上层协议导致底层连接关闭的情况。
case <-pc.closech:
if debugRoundTrip {
req.logf("closech recv: %T %#v", pc.closed, pc.closed)
}
return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
// 等待获取response超时,关闭连接
case <-respHeaderTimer:
if debugRoundTrip {
req.logf("timeout waiting for response headers.")
}
pc.close(errTimeout)
return nil, errTimeout
// 接收到readLoop返回的response结果
case re := <-resc:
// 极异常情况,直接程序panic
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
return re.res, nil
// request取消
case <-cancelChan:
pc.t.CancelRequest(req.Request)
// 将关闭之后的chan置为nil,用来防止select一直进入该case(close的chan不会阻塞读,读取的数据为0)
cancelChan = nil
case <-ctxDoneChan:
pc.t.cancelRequest(req.Request, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
}

persistConn.writeLoop: 实际的写循环,用于发送 request 请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
select {
case wr := <-pc.writech: // 收到roundTrip的写命令
startBytesWritten := pc.nwrite
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) // 主函数,实际写http数据
if bre, ok := err.(requestBodyReadError); ok {
err = bre.error
wr.req.setError(err)
}
if err == nil {
err = pc.bw.Flush()
}
if err != nil {
wr.req.Request.closeBody()
if pc.nwrite == startBytesWritten {
err = nothingWrittenError{err}
}
}
pc.writeErrCh <- err // 通知readLoop,用于判断连接是否可以回收
wr.ch <- err // 通知persistConn.roundTrip,设定读response的timeout
if err != nil { // 有错误发生时不再复用连接
pc.close(err)
return
}
// 处理下一个请求
case <-pc.closech: // 连接被关闭
return
}
}
}

persistConn.waitForContinuepersistConn.writeLoop 调用这个函数

该函数用来支持Expect:100-continue功能,它返回一个闭包函数,它的作用是阻塞继续执行,直到以下三种情况发生:

  1. 是在readLoop接受到100-continue的返回结果,会向continueCh发送一个数据,
  2. 是在100-continue等待server返回结果超时的情况,这时会继续将body发送出去,
  3. 是连接关闭的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
if continueCh == nil {
return nil
}
return func() bool {
timer := time.NewTimer(pc.t.ExpectContinueTimeout) // 设定超时时间
defer timer.Stop()

select {
case _, ok := <-continueCh: // 如果readLoop接受到的消息表示不支持Expect:100-continue类型的请求,则会关闭channel
return ok
case <-timer.C: // 超时
return true
case <-pc.closech: // 连接被关闭
return false
}
}
}

write 实现了发送一次请求的实际操作。persistConn.writeLoop 调用这个函数

主要的看点是Expect:100-continue的处理方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
func (r *Request) write(w io.Writer, usingProxy bool, extraHeaders Header, waitForContinue func() bool) (err error) {
trace := httptrace.ContextClientTrace(r.Context())
if trace != nil && trace.WroteRequest != nil {
defer func() {
trace.WroteRequest(httptrace.WroteRequestInfo{ // 完成写请求的回调函数
Err: err,
})
}()
}

// 确定host
host := cleanHost(r.Host)
if host == "" {
if r.URL == nil {
return errMissingHost
}
host = cleanHost(r.URL.Host)
}

host = removeZone(host)

ruri := r.URL.RequestURI()
if usingProxy && r.URL.Scheme != "" && r.URL.Opaque == "" {
ruri = r.URL.Scheme + "://" + host + ruri
} else if r.Method == "CONNECT" && r.URL.Path == "" {
// CONNECT requests normally give just the host and port, not a full URL.
ruri = host
if r.URL.Opaque != "" {
ruri = r.URL.Opaque
}
}
if stringContainsCTLByte(ruri) {
return errors.New("net/http: can't write control character in Request.URL")
}

var bw *bufio.Writer
if _, ok := w.(io.ByteWriter); !ok {
bw = bufio.NewWriter(w)
w = bw
}
// 开始写数据
_, err = fmt.Fprintf(w, "%s %s HTTP/1.1\r\n", valueOrDefault(r.Method, "GET"), ruri)
if err != nil {
return err
}

// Header lines
_, err = fmt.Fprintf(w, "Host: %s\r\n", host)
if err != nil {
return err
}
if trace != nil && trace.WroteHeaderField != nil {
trace.WroteHeaderField("Host", []string{host})
}

userAgent := defaultUserAgent
if r.Header.has("User-Agent") {
userAgent = r.Header.Get("User-Agent")
}
if userAgent != "" {
_, err = fmt.Fprintf(w, "User-Agent: %s\r\n", userAgent)
if err != nil {
return err
}
if trace != nil && trace.WroteHeaderField != nil {
trace.WroteHeaderField("User-Agent", []string{userAgent})
}
}

// Process Body,ContentLength,Close,Trailer
tw, err := newTransferWriter(r)
if err != nil {
return err
}
err = tw.writeHeader(w, trace)
if err != nil {
return err
}

err = r.Header.writeSubset(w, reqWriteExcludeHeader, trace)
if err != nil {
return err
}

if extraHeaders != nil {
err = extraHeaders.write(w, trace)
if err != nil {
return err
}
}

_, err = io.WriteString(w, "\r\n")
if err != nil {
return err
}
// 完成header
if trace != nil && trace.WroteHeaders != nil {
trace.WroteHeaders()
}

// Flush and wait for 100-continue if expected.
if waitForContinue != nil {
if bw, ok := w.(*bufio.Writer); ok {
err = bw.Flush() // 先发送header请求
if err != nil {
return err
}
}
if trace != nil && trace.Wait100Continue != nil {
trace.Wait100Continue()
}
if !waitForContinue() { // 等待server的返回,在以下两种情况下继续发送body,1是server返回ok,2是超时
r.closeBody()
return nil
}
}

if bw, ok := w.(*bufio.Writer); ok && tw.FlushHeaders {
if err := bw.Flush(); err != nil {
return err
}
}

// 写body
err = tw.writeBody(w)
if err != nil {
if tw.bodyReadError == err {
err = requestBodyReadError{err}
}
return err
}

if bw != nil {
return bw.Flush()
}
return nil
}

persistConn.readLoop 连接读循环,循环接收response响应,成功获得response后会将连接返回连接池,便于后续复用。当readLoop正常处理完一个response之后,会将连接重新放入到连接池中;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()

tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil { // 将连接放回空闲连接池
closeErr = err
if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
trace.PutIdleConn(err)
}
return false
}
if trace != nil && trace.PutIdleConn != nil {
trace.PutIdleConn(nil)
}
return true
}

// eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
// back to the idle pool.
eofc := make(chan struct{})
defer close(eofc) // unblock reader on errors

// Read this once, before loop starts. (to avoid races in tests)
testHookMu.Lock()
testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
testHookMu.Unlock()

alive := true
for alive {
pc.readLimit = pc.maxHeaderResponseSize()
_, err := pc.br.Peek(1)

pc.mu.Lock()
if pc.numExpectedResponses == 0 { // 表示server端主动关闭连接
pc.readLoopPeekFailLocked(err)
pc.mu.Unlock()
return
}
pc.mu.Unlock()

rc := <-pc.reqch // 一次新的请求,和persistConn.roundTrip进行同步
trace := httptrace.ContextClientTrace(rc.req.Context())

var resp *Response
if err == nil {
resp, err = pc.readResponse(rc, trace) // 读请求的具体实现
} else {
err = transportReadFromServerError{err}
closeErr = err
}

if err != nil {
if pc.readLimit <= 0 {
err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
}

select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone: // 调用方放弃请求,链接关闭。之所以不再复用该连接,可能是避免未知原因造成下次请求的异常
return
}
return
}
pc.readLimit = maxInt64 // effectively no limit for response bodies

pc.mu.Lock()
pc.numExpectedResponses--
pc.mu.Unlock()

bodyWritable := resp.bodyIsWritable()
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}

if !hasBody || bodyWritable {
pc.t.setReqCanceler(rc.req, nil)

// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
// get this same conn. But we use the unbuffered channel 'rc'
// to guarantee that persistConn.roundTrip got out of its select
// potentially waiting for this persistConn to close.
// but after
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() && // 这个函数有两个作用,1是确保请求已经写完,因为不排除服务端还未完全接受到请求时就返回结果;2是判断此次请求在writeLoop中是否发生错误,如果有错误则连接失效
tryPutIdleConn(trace) // 将该连接放入连接池中

if bodyWritable {
closeErr = errCallerOwnsConn
}

select {
case rc.ch <- responseAndError{res: resp}: // 写response,会在persistConn.roundTrip函数中获取到该结果
case <-rc.callerGone:
return
}
continue // 处理下一个请求
}

waitForBodyRead := make(chan bool, 2)
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil

},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}

resp.Body = body
if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
resp.Body = &gzipReader{body: body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Uncompressed = true
}

select {
case rc.ch <- responseAndError{res: resp}: // 写response,会在persistConn.roundTrip函数中获取到该结果
case <-rc.callerGone:
return
}

// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancellation or death)
select {
case bodyEOF := <-waitForBodyRead:
pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace) // 将该连接放入连接池中
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.req, rc.req.Context().Err())
case <-pc.closech: // 连接被关闭,可能是lru被踢出出来了
alive = false
}

testHookReadLoopBeforeNextRead()
}
}

persistConn.readResponse 读取一次http请求返回,如果是header带“Expect:100-continue”的请求,则在正常情况下会收到两次server的返回结果。persistConn.readLoop 调用这个函数

关于Expect:100-continue的介绍可以参考https://www.jianshu.com/p/154c310748db

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
if trace != nil && trace.GotFirstResponseByte != nil {
if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
trace.GotFirstResponseByte()
}
}
num1xx := 0 // number of informational 1xx headers received
const max1xxResponses = 5 // arbitrary bound on number of informational responses

continueCh := rc.continueCh
for {
resp, err = ReadResponse(pc.br, rc.req) // 读取server的返回请求结果
if err != nil {
return
}
resCode := resp.StatusCode
if continueCh != nil { // 表示header中带“Expect:100-continue”的请求
if resCode == 100 {
if trace != nil && trace.Got100Continue != nil { // 回调函数
trace.Got100Continue()
}
continueCh <- struct{}{} // 发送通知,让writeLoop可以继续发送请求的body
continueCh = nil // 重置channel
} else if resCode >= 200 { // 不支持Expect:100-continue,通知writeLoop取消发送
close(continueCh)
continueCh = nil // 重置channel
}
}
break
}
return
}

Reference


Go 标准库之 net.http 之 transport 源码解析
https://flepeng.github.io/021-Go-32-Go-标准库-Go-标准库之-net-http-之-transport-源码解析/
作者
Lepeng
发布于
2024年12月3日
许可协议