Go BIO/NIO探讨(4):net/http 在 tcp conn 上的处理

Go语言精选

共 10981字,需浏览 22分钟

 · 2023-02-04

Go net库中 net.Listen("tcp", addr) 方法通过系统调用 socket、bind、listen 生成 net.Listener 对象,在后面的for 循环中,通过系统调用 accept 等待新的tcp conn,将其包装成一个 conn 对象,在新的 goroutine 中对这个conn进行处理。对应的代码在 net/http/server.go 中:

func (srv *Server) Serve(l net.Listener) error {
// ... 省略一部分代码
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept()
// ... 省略一部分代码
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
}
}

通过Accept方法获取 rw net.Conn 和 err 以后,首先是错误检测。错误检测分为这几个条件:

  1. doneChan chan struct{} 已经被关闭,意味着Server已经停止服务

  2. 针对 tempory network error,进行重试。delay interval从5ms开始,每次增大一倍,最长是1s

  3. 对于 其他类型错误,Server直接退出

没有错误的情况下,变量 rw 的类型是 interface net.Conn。net.Conn 代表一个基于流的网络链接,它定义了一组方法,支持从一个conn并发地读写数据。对于 tcp server 而言,这里的 l 是 TCPListener,rw 的底层类型(underlying type) 是 *net.TCPConn。net.TCPConn 的定义是:

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}

type conn struct {
fd *netFD
}

结合TCPListener的accept方法,可以理解为net.TCPConn是对处于ESTABLISHED状态套接字的封装。

func (ln *TCPListener) accept() (*TCPConn, error) {
// fd 是ESTABLISHED conn 的套接字
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

rw 作为一个有效的 net.TCPConn,结构体Server的newConn方法将其封装在一个 http.conn 结构体内。同时封装进去的还有 Server 对象本身,因为它的成员变量 Handler 可以由Web框架传入(比如Gin),以支持定制化的业务逻辑。newConn方法的定义如下:

// Create new connection from rwc.
func (srv *Server) newConn(rwc net.Conn) *conn {
c := &conn{
server: srv,
rwc: rwc,
}
if debugServerConnections {
c.rwc = newLoggingConn("server", c.rwc)
}
return c
}

net/http.conn 的方法serve包含了初始化、for循环里读取request数据、处理request、发送response到套接字缓冲区的全部逻辑,这块逻辑的细节是本文的核心。

在此之前,我们先看 struct net/http.conn 的结构:

// A conn represents the server side of an HTTP connection.
type conn struct {
// server is the server on which the connection arrived.
// Immutable; never nil.
server *Server

// cancelCtx cancels the connection-level context.
cancelCtx context.CancelFunc

// rwc is the underlying network connection.
// This is never wrapped by other types and is the value given out
// to CloseNotifier callers. It is usually of type *net.TCPConn or
// *tls.Conn.
rwc net.Conn

// remoteAddr is rwc.RemoteAddr().String(). It is not populated synchronously
// inside the Listener's Accept goroutine, as some implementations block.
// It is populated immediately inside the (*conn).serve goroutine.
// This is the value of a Handler's (*Request).RemoteAddr.
remoteAddr string

// tlsState is the TLS connection state when using TLS.
// nil means not TLS.
tlsState *tls.ConnectionState

// werr is set to the first write error to rwc.
// It is set via checkConnErrorWriter{w}, where bufw writes.
werr error

// r is bufr's read source. It's a wrapper around rwc that provides
// io.LimitedReader-style limiting (while reading request headers)
// and functionality to support CloseNotifier. See *connReader docs.
r *connReader

// bufr reads from r.
bufr *bufio.Reader

// bufw writes to checkConnErrorWriter{c}, which populates werr on error.
bufw *bufio.Writer

// lastMethod is the method of the most recent request
// on this connection, if any.
lastMethod string

curReq atomic.Value // of *response (which has a Request in it)

curState struct{ atomic uint64 } // packed (unixtime<<8|uint8(ConnState))

// mu guards hijackedv
mu sync.Mutex

// hijackedv is whether this connection has been hijacked
// by a Handler with the Hijacker interface.
// It is guarded by mu.
hijackedv bool
}

http.conn 的serve方法从套接字读写数据,必须依赖一些成员变量:

  1. 尝试从 conn 读取数据时,如果没有数据到来,当前的goroutine会被休眠;struct connReader 封装了读取socket缓冲区到用户态buffer的逻辑

  2. 尝试向 conn 写入数据时,如果缓冲区已满,当前的goroutine会被休眠;struct checkConnErrorWriter 封装了把数据从用户态buffer写入socket缓冲区的逻辑

serve方法里for循环开始之前,需要设置好 bufio.Reader 和 bufio.Writer

c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

bufr 和 bufw 分别封装了对tcp conn 的读写,读写过程支持buffer,读取时可以减少数据拷贝,写入时可以攒一批一次性写入。从打流程。

在serve方法里,for循环内的逻辑如下:

  1. conn.readRequest 从 bufr 读取request数据,返回封装好的 *http.response 实例

  2. 校验错误信息、处理 Expect 100 Continue 等情况

  3. ServeHTTP 处理业务逻辑

  4. w.finishRequest 把 bufw 里的数据flush写入套接字缓冲区(借助于checkConnErrorWriter.Read)

  5. 清除状态信息

  6. keepalive支持

后面我们对这六部分展开聊一下。

第一步: net/http.conn.readRequest

代码如下:

// net/http/server.go
w, err := c.readRequest(ctx)

conn.readRequest方法既包含了从套接字缓冲区读数据,也包含了利用http解析数据的逻辑。其第一个返回值 w 的类型是 *http.response。下面的代码截取了一段逻辑:

// net/http/server.go
// Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
// ... 省略部分代码
req, err := readRequest(c.bufr)
// ... 省略部分代码
w = &response{
conn: c,
cancelCtx: cancelCtx,
req: req,
reqBody: req.Body,
handlerHeader: make(Header),
contentLength: -1,
closeNotifyCh: make(chan bool, 1),

// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
// and maybe mutates it (Issue 14940)
wants10KeepAlive: req.wantsHttp10KeepAlive(),
wantsClose: req.wantsClose(),
}
if isH2Upgrade {
w.closeAfterReply = true
}
w.cw.res = w
w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
return w, nil
}

可以发现 conn.readRequest 总共可以拆分成三部分:

  1. 从 bufr 读取数据,并解析称 http.Request (可以参考http协议格式读代码)

  2. 封装 http.response 对象,http.Request 也会被封装进去。

  3. 设置 bufio.Writer,以便向套接字缓冲区写入数据

第二步: 校验错误信息,处理Expect 100 Continue

代码如下:

// net/http/server.go

if err != nil {
const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"

switch {
case err == errTooLarge:
// ... 431 Request Header Fields Too Large
case isUnsupportedTEError(err):
// ... 501 Unsupported transfer encoding
case isCommonNetReadError(err):
return // don't reply

default:
// 400 Bad Request
}

// ... 100 Expect Continue 的处理
}

需要处理的错误情况有四类,前三类server端都会响应一个错误码,最后一种server端不做任何响应:

  1. code 431: request too large

  2. code 501: not implemented

  3. code 400: bad request (无法正确解析request)

  4. client 丢失或失联导致的错误,server端不做任何响应。比如:

    • io.EOF

    • timeout net.Error

    • read net.OpError

关于 Expect 100 Continue,判断逻辑是 r.Header.get("Expect") 中包含 "100-continue" 字符串。对于这种情况,会启动一个goroutine 执行background read。

第三步: ServeHTTP

代码如下:

// net/http/server.go

serverHandler{c.server}.ServeHTTP(w, w.req)

ServeHTTP 是 interface http.Handler 的唯一方法,定义如下:

type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}

显而易见 结构体 http.response 实现了 interface http.Handler。

从 Web框架的角度来看,它会实现http.Handler,ServeHTTP里会包含请求粒度的处理逻辑,比如路由分发(服务启动时绑定路由)、http.Request转化成框架request、中间件支持等。gin框架对应的是 struct gin.Engine。

看这段代码的话,每次请求都会创建一个新的 serverHandler 对象,它有两个职能:

  1. 选择一个兜底的路由分发器, 取决于 http.Server 的 Handler字段 是否为 nil

  2. 做一些URL的合法性校验

// serverHandler delegates to either the server's Handler or
// DefaultServeMux and also handles "OPTIONS *" requests.
type serverHandler struct {
srv *Server
}

func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}

if req.URL != nil && strings.Contains(req.URL.RawQuery, ";") {
var allowQuerySemicolonsInUse int32
req = req.WithContext(context.WithValue(req.Context(), silenceSemWarnContextKey, func() {
atomic.StoreInt32(&allowQuerySemicolonsInUse, 1)
}))
defer func() {
if atomic.LoadInt32(&allowQuerySemicolonsInUse) == 0 {
sh.srv.logf("http: URL query contains semicolon, which is no longer a supported separator; parts of the query may be stripped when parsed; see golang.org/issue/25192")
}
}()
}

handler.ServeHTTP(rw, req)
}

对于Gin框架而言, 上面这段代码里的 srv.Handler 的底层类型是 *gin.Engine 或 *gin.h2cHandler (for http/2)。
对于下面这个情况,srv.Handler 是nil,所以使用了兜底逻辑 http.DefaultServeMux:

// 来源: https://pkg.go.dev/net/http
http.Handle("/foo", fooHandler)

http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %q", html.EscapeString(r.URL.Path))
})

log.Fatal(http.ListenAndServe(":8080", nil))

第四步: w.finishRequest

w.finishRequest 把 bufw 里的数据flush写入套接字缓冲区(借助于checkConnErrorWriter.Read), 代码如下:

func (w *response) finishRequest() {
// ... 省略部分代码

// bufio.Writer.Flush 触发 chunkWriter.Write
w.w.Flush()
// 回收 bufio.Writer 对象占用的内存
putBufioWriter(w.w)
// chunkWriter.close 触发 conn.bufw.Write
w.cw.close()
// conn.bufw.Flush 触发 checkConnErrorWriter.Write
// checkConnErrorWriter 是对 tcp conn 的封装
w.conn.bufw.Flush()

// ... 省略部分代码
}

由于变量名过于简单,这段代码读起来比较费劲。在这个篇幅里很难讲清楚,我把相关的结构体和变量列出来,有兴趣的朋友结合上面的注释和net库的代码一起看吧。

  1. bufio.Writer

  2. http.chunkWriter

  3. http.response

  4. http.conn

提到(底层)类型,运行时涉及到的一些变量是:

  1. w: *http.response

  2. w.w: *bufio.Writer{wr: chunkWriter{res: *http.response}, buf:[]byte}

  3. w.cr: chunkWriter

  4. w.conn: *http.conn{server: *Server, rwc: net.Conn, bufw: *bufio.Writer, ...}

  5. w.conn.bufw: *bufio.Writer{wr: checkConnErrorWriter{c: *http.conn}, buf: []byte}

第五步: 清除状态信息

代码如下:

if !w.shouldReuseConnection() {
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
c.setState(c.rwc, StateIdle, runHooks)
c.curReq.Store((*response)(nil))

这里我们只提一点: 如果不需要复用tcp conn, server端会主动触发系统调用shutdown,并sleep 500ms。值得注意的是,在进行系统调用时,设置了flag syscall.SHUT_WR,它做的事情是把缓冲区的事情发送给client,然后发送一个FIN报文要求结束。相对于 Close关闭连接,Shutdown提供了一种更为弹性的方式。

func (fd *netFD) closeWrite() error {
return fd.shutdown(syscall.SHUT_WR)
}

第六步: keepalive支持、刷新timeout

代码:

if !w.conn.server.doKeepAlives() {
// We're in shutdown mode. We might've replied
// to the user without "Connection: close" and
// they might think they can send another
// request, but such is life with HTTP/1.1.
return
}

if d := c.server.idleTimeout(); d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
if _, err := c.bufr.Peek(4); err != nil {
return
}
}
c.rwc.SetReadDeadline(time.Time{})

这里做两件事:

  1. 如果不需要 keepalive,则退出

  2. 如果需要keepalive,则重置READ的超时截止时间

tcp conn 生命周期的结束

通常情况下,tcp conn 服务一段时间之后,就完成了它的使命。

一个例外是,conn被client端劫持 (hijacked),这时候conn的生命周期的管理权就转移了。

tcp conn 内部是一个处于ESTABLISHED状态的套接字。

对于serve方法,在 for循环启动之前,已经通过 defer 声明了serve结束后的清理逻辑。核心功能是:

  1. panic recovery

  2. conn 级别的变量需要清理

  3. conn 被正确地关闭

defer func() {
if err := recover(); err != nil && err != ErrAbortHandler {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
}
if inFlightResponse != nil {
inFlightResponse.cancelCtx()
}
if !c.hijacked() {
if inFlightResponse != nil {
inFlightResponse.conn.r.abortPendingRead()
inFlightResponse.reqBody.Close()
}
c.close()
c.setState(c.rwc, StateClosed, runHooks)
}
}()

关于Linux系统调用的详细说明,点击左下角查看原文


推荐阅读


福利

我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。

浏览 62
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报