Maison > développement back-end > Golang > Go lance l'analyse du processus de requête HTTP2.0 (partie 2) - Trame de données et contrôle de flux

Go lance l'analyse du processus de requête HTTP2.0 (partie 2) - Trame de données et contrôle de flux

Libérer: 2023-07-21 16:10:19
avant
726 Les gens l'ont consulté
Trame de données

La plus petite unité de communication HTTP2 est une trame de données. Chaque trame contient deux parties : En-tête de trame et Charge utile. Les trames de différents flux de données peuvent être envoyées entrelacées (les trames du même flux de données doivent être envoyées séquentiellement), puis réassemblées en fonction de l'identifiant du flux de données dans chaque en-tête de trame.

Étant donné que la charge utile contient des données valides, seul l'en-tête de la trame est analysé et décrit.

En-tête de trame

La longueur totale de l'en-tête de trame est de 9 octets et contient quatre parties, à savoir :

  1. La longueur de la charge utile, qui occupe trois octets.

  2. Type de trame de données, occupant un octet.

  3. Identificateur de trame de données, occupant un octet.

  4. ID du flux de données, occupant quatre octets.

est représenté par un schéma comme suit :

Go lance l'analyse du processus de requête HTTP2.0 (partie 2) - Trame de données et contrôle de flux

Le format du bloc de données et la signification de chaque partie sont maintenant clairs, voyons donc comment lire un en-tête de cadre dans le code :

func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {
	_, err := io.ReadFull(r, buf[:http2frameHeaderLen])
if err != nil {
return http2FrameHeader{}, err
	}
return http2FrameHeader{
		Length:   (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
		Type:     http2FrameType(buf[3]),
		Flags:    http2Flags(buf[4]),
		StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
		valid:    true,
	}, nil
}
Copier après la connexion

Dans le code ci-dessus http2frameHeaderLen est une constante avec une valeur de 9. http2frameHeaderLen是一个常量,其值为9。

从io.Reader中读取9个字节后,将前三个字节和后四个字节均转为uint32的类型,从而得到Payload长度和数据流ID。另外需要理解的是帧头的前三个字节和后四个字节存储格式为大端(大小端笔者就不在这里解释了,请尚不了解的读者自行百度)。

数据帧类型

根据http://http2.github.io/http2-spec/#rfc.section.11.2描述,数据帧类型总共有10个。在go源码中均有体现:

const (
	http2FrameData         http2FrameType = 0x0
	http2FrameHeaders      http2FrameType = 0x1
	http2FramePriority     http2FrameType = 0x2
	http2FrameRSTStream    http2FrameType = 0x3
	http2FrameSettings     http2FrameType = 0x4
	http2FramePushPromise  http2FrameType = 0x5
	http2FramePing         http2FrameType = 0x6
	http2FrameGoAway       http2FrameType = 0x7
	http2FrameWindowUpdate http2FrameType = 0x8
	http2FrameContinuation http2FrameType = 0x9
)
Copier après la connexion

http2FrameData

Après avoir lu 9 octets depuis io.Reader, les trois premiers octets et les quatre derniers octets sont convertis en uint32 tapez pour obtenir la longueur de la charge utile et l'ID du flux de données. Une autre chose qu'il faut comprendre est que les trois premiers octets et les quatre derniers octets de l'en-tête de trame sont stockés au format big endian (je n'expliquerai pas le grand et le petit endian ici. Les lecteurs qui ne le comprennent pas sont invités à Baidu eux-mêmes).

Type de trame de données < /h5>

Selon http://http2.github.io/http2-spec/#rfc. section 11.2 Description, il existe un total de 10 types de trames de données. Cela se reflète dans le code source de go : 🎜

const (
// Data Frame
	http2FlagDataEndStream http2Flags = 0x1

// Headers Frame
	http2FlagHeadersEndStream  http2Flags = 0x1

// Settings Frame
	http2FlagSettingsAck http2Flags = 0x1
// 此处省略定义其他数据帧标识符的代码
)
Copier après la connexion
Copier après la connexion

http2FrameData : principalement utilisé pour envoyer le corps de la requête et recevoir une trame de données de réponse. 🎜

http2FrameHeaders:主要用于发送请求header和接收响应header的数据帧。

http2FrameSettings:主要用于client和server交流设置相关的数据帧。

http2FrameWindowUpdate : Trame de données principalement utilisée pour le contrôle de flux.

Les autres types de trames de données ne sont pas couverts dans cet article, ils ne seront donc pas décrits.

Identifiants de trames de données

Comme il existe de nombreux types d'identifiants de trames de données, l'auteur n'en présente que quelques-uns ici. Examinons d'abord le code source :

.
const (
// Data Frame
	http2FlagDataEndStream http2Flags = 0x1

// Headers Frame
	http2FlagHeadersEndStream  http2Flags = 0x1

// Settings Frame
	http2FlagSettingsAck http2Flags = 0x1
// 此处省略定义其他数据帧标识符的代码
)
Copier après la connexion
Copier après la connexion

http2FlagDataEndStream:在前篇中提到,调用(*http2ClientConn).newStream方法会创建一个数据流,那这个数据流什么时候结束呢,这就是http2FlagDataEndStream的作用。

当client收到有响应body的响应时(HEAD请求无响应body,301,302等响应也无响应body),一直读到http2FrameData数据帧的标识符为http2FlagDataEndStream则意味着本次请求结束可以关闭当前数据流。

http2FlagHeadersEndStream:如果读到的http2FrameHeaders数据帧有此标识符也意味着本次请求结束。

http2FlagSettingsAck:该标示符意味着对方确认收到http2FrameSettings数据帧。

流控制器

流控制是一种阻止发送方向接收方发送大量数据的机制,以免超出后者的需求或处理能力。Go中HTTP2通过http2flow结构体进行流控制:

type http2flow struct {
// n is the number of DATA bytes we&#39;re allowed to send.
// A flow is kept both on a conn and a per-stream.
	n int32

// conn points to the shared connection-level flow that is
// shared by all streams on that conn. It is nil for the flow
// that&#39;s on the conn directly.
	conn *http2flow
}
Copier après la connexion

字段含义英文注释已经描述的很清楚了,所以笔者不再翻译。下面看一下和流控制有关的方法。

(*http2flow).available

此方法返回当前流控制可发送的最大字节数:

func (f *http2flow) available() int32 {
	n := f.n
if f.conn != nil && f.conn.n < n {
		n = f.conn.n
	}
return n
}
Copier après la connexion
  • 如果f.conn为nil则意味着此控制器的控制级别为连接,那么可发送的最大字节数就是f.n

  • 如果f.conn不为nil则意味着此控制器的控制级别为数据流,且当前数据流可发送的最大字节数不能超过当前连接可发送的最大字节数。

(*http2flow).take

此方法用于消耗当前流控制器的可发送字节数:

func (f *http2flow) take(n int32) {
if n > f.available() {
panic("internal error: took too much")
	}
	f.n -= n
if f.conn != nil {
		f.conn.n -= n
	}
}
Copier après la connexion

通过实际需要传递一个参数,告知当前流控制器想要发送的数据大小。如果发送的大小超过流控制器允许的大小,则panic,如果未超过流控制器允许的大小,则将当前数据流和当前连接的可发送字节数-n

(*http2flow).add

有消耗就有新增,此方法用于增加流控制器可发送的最大字节数:

func (f *http2flow) add(n int32) bool {
	sum := f.n + n
if (sum > n) == (f.n > 0) {
		f.n = sum
return true
	}
return false
}
Copier après la connexion

上面的代码唯一需要注意的地方是,当sum超过int32正数最大值(2^31-1)时会返回false。

回顾:在前篇中提到的(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法均通过(*http2flow).add初始化可发送数据窗口大小。

有了帧和流控制器的基本概念,下面我们结合源码来分析总结流控制的具体实现。

(*http2ClientConn).readLoop

前篇分析(*http2Transport).newClientConn时止步于读循环,那么今天我们就从(*http2ClientConn).readLoop开始。

func (cc *http2ClientConn) readLoop() {
	rl := &http2clientConnReadLoop{cc: cc}
defer rl.cleanup()
	cc.readerErr = rl.run()
if ce, ok := cc.readerErr.(http2ConnectionError); ok {
		cc.wmu.Lock()
		cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)
		cc.wmu.Unlock()
	}
}
Copier après la connexion

由上可知,readLoop的逻辑比较简单,其核心逻辑在(*http2clientConnReadLoop).run方法里。

func (rl *http2clientConnReadLoop) run() error {
	cc := rl.cc
	rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
	gotReply := false // ever saw a HEADERS reply
	gotSettings := false
for {
		f, err := cc.fr.ReadFrame()
// 此处省略代码
		maybeIdle := false // whether frame might transition us to idle

switch f := f.(type) {
case *http2MetaHeadersFrame:
			err = rl.processHeaders(f)
			maybeIdle = true
			gotReply = true
case *http2DataFrame:
			err = rl.processData(f)
			maybeIdle = true
case *http2GoAwayFrame:
			err = rl.processGoAway(f)
			maybeIdle = true
case *http2RSTStreamFrame:
			err = rl.processResetStream(f)
			maybeIdle = true
case *http2SettingsFrame:
			err = rl.processSettings(f)
case *http2PushPromiseFrame:
			err = rl.processPushPromise(f)
case *http2WindowUpdateFrame:
			err = rl.processWindowUpdate(f)
case *http2PingFrame:
			err = rl.processPing(f)
default:
			cc.logf("Transport: unhandled response frame type %T", f)
		}
if err != nil {
if http2VerboseLogs {
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)
			}
return err
		}
if rl.closeWhenIdle && gotReply && maybeIdle {
			cc.closeIfIdle()
		}
	}
}
Copier après la connexion

由上可知,(*http2clientConnReadLoop).run的核心逻辑是读取数据帧然后对不同的数据帧进行不同的处理。

Les informations d'accusé de réception de

cc.fr.ReadFrame()会根据前面介绍的数据帧格式读出数据帧。

前篇中提到使用了一个支持h2协议的图片进行分析,本篇继续复用该图片对(*http2clientConnReadLoop).run方法进行debug。

收到http2FrameSettings数据帧

读循环会最先读到http2FrameSettings数据帧。读到该数据帧后会调用(*http2clientConnReadLoop).processSettings方法。(*http2clientConnReadLoop).processSettings主要包含3个逻辑。

1、判断是否是http2FrameSettings, si elles sont renvoyées directement, sinon continuez avec les étapes suivantes.

if f.IsAck() {
if cc.wantSettingsAck {
    cc.wantSettingsAck = false
return nil
  }
return http2ConnectionError(http2ErrCodeProtocol)
}
Copier après la connexion

2、处理不同http2FrameSettings的数据帧,并根据server传递的信息,修改maxConcurrentStreams等的值。

err := f.ForeachSetting(func(s http2Setting) error {
switch s.ID {
case http2SettingMaxFrameSize:
    cc.maxFrameSize = s.Val
case http2SettingMaxConcurrentStreams:
    cc.maxConcurrentStreams = s.Val
case http2SettingMaxHeaderListSize:
    cc.peerMaxHeaderListSize = uint64(s.Val)
case http2SettingInitialWindowSize:
if s.Val > math.MaxInt32 {
return http2ConnectionError(http2ErrCodeFlowControl)
    }
    delta := int32(s.Val) - int32(cc.initialWindowSize)
for _, cs := range cc.streams {
      cs.flow.add(delta)
    }
    cc.cond.Broadcast()
    cc.initialWindowSize = s.Val
default:
// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
    cc.vlogf("Unhandled Setting: %v", s)
  }
return nil
})
Copier après la connexion

当收到ID为http2SettingInitialWindowSize的帧时,会调整当前连接中所有数据流的可发送数据窗口大小,并修改当前连接的initialWindowSize(每个新创建的数据流均会使用该值初始化可发送数据窗口大小)s.Val

3、发送http2FrameSettings的ack信息给server。

	cc.wmu.Lock()
defer cc.wmu.Unlock()

	cc.fr.WriteSettingsAck()
	cc.bw.Flush()
return cc.werr
Copier après la connexion

收到http2WindowUpdateFrame数据帧

在笔者debug的过程中,处理完http2FrameSettings数据帧后,紧接着就收到了http2WindowUpdateFrame数据帧。收到该数据帧后会调用(*http2clientConnReadLoop).processWindowUpdate方法:

func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {
	cc := rl.cc
	cs := cc.streamByID(f.StreamID, false)
if f.StreamID != 0 && cs == nil {
return nil
	}

	cc.mu.Lock()
defer cc.mu.Unlock()

	fl := &cc.flow
if cs != nil {
		fl = &cs.flow
	}
if !fl.add(int32(f.Increment)) {
return http2ConnectionError(http2ErrCodeFlowControl)
	}
	cc.cond.Broadcast()
return nil
}
Copier après la connexion

上面的逻辑主要用于更新当前连接和数据流的可发送数据窗口大小。如果http2WindowUpdateFrame帧中的StreamID为0,则更新当前连接的可发送数据窗口大小,否则更新对应数据流可发送数据窗口大小。

注意:在debug的过程,收到http2WindowUpdateFrame数据帧后,又收到一次http2FrameSettings,且该数据帧标识符为http2FlagSettingsAck

笔者在这里特意提醒,这是因为前篇中提到的(*http2Transport).NewClientConn方法,也向server发送了http2FrameSettings数据帧和http2WindowUpdateFrame数据帧。

Aussi, en traitement http2FrameSettingshttp2WindowUpdateFrame过程中,均出现了cc.cond.Broadcast()调用,该调用主要用于唤醒因为以下两种情况而Wait的请求:

  1. 因当前连接处理的数据流已经达到maxConcurrentStreams的上限(详见前篇中(*http2ClientConn).awaitOpenSlotForRequest方法分析)。

  2. 因发送数据流已达可发送数据窗口上限而等待可发送数据窗口更新的请求(后续会介绍)。

收到http2MetaHeadersFrame数据帧

收到此数据帧意味着某一个请求已经开始接收响应数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processHeaders :

func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
	cc := rl.cc
	cs := cc.streamByID(f.StreamID, false)
// 此处省略代码
	res, err := rl.handleResponse(cs, f)
if err != nil {
// 此处省略代码
		cs.resc <- http2resAndError{err: err}
return nil // return nil from process* funcs to keep conn alive
	}
if res == nil {
// (nil, nil) special case. See handleResponse docs.
return nil
	}
	cs.resTrailer = &res.Trailer
	cs.resc <- http2resAndError{res: res}
return nil
}
Copier après la connexion

首先我们先看cs.resc <- http2resAndError{res: res}这一行代码,向数据流写入http2resAndError即本次请求的响应。在(*http2ClientConn).roundTrip方法中有这样一行代码readLoopResCh := cs.resc

回顾:前篇(*http2ClientConn).roundTrip方法的第7点和本部分关联起来就可以形成一个完整的请求链。

接下来我们对rl.handleResponse方法展开分析。

(*http2clientConnReadLoop).handleResponse

(*http2clientConnReadLoop).handleResponse的主要作用是构建一个Response变量,下面对该函数的关键步骤进行描述。

1、构建一个Response变量。

header := make(Header)
res := &Response{
  Proto:      "HTTP/2.0",
  ProtoMajor: 2,
  Header:     header,
  StatusCode: statusCode,
  Status:     status + " " + StatusText(statusCode),
}
Copier après la connexion

2、构建header(本篇不对header进行展开分析)。

for _, hf := range f.RegularFields() {
  key := CanonicalHeaderKey(hf.Name)
if key == "Trailer" {
    t := res.Trailer
if t == nil {
      t = make(Header)
      res.Trailer = t
    }
    http2foreachHeaderElement(hf.Value, func(v string) {
      t[CanonicalHeaderKey(v)] = nil
    })
  } else {
    header[key] = append(header[key], hf.Value)
  }
}
Copier après la connexion

3、处理响应body的ContentLength。

streamEnded := f.StreamEnded()
isHead := cs.req.Method == "HEAD"
if !streamEnded || isHead {
  res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {
if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
      res.ContentLength = clen64
    } else {
// TODO: care? unlike http/1, it won&#39;t mess up our framing, so it&#39;s
// more safe smuggling-wise to ignore.
    }
  } else if len(clens) > 1 {
// TODO: care? unlike http/1, it won&#39;t mess up our framing, so it&#39;s
// more safe smuggling-wise to ignore.
  }
}
Copier après la connexion

由上可知,当前数据流没有结束或者是HEAD请求才读取ContentLength。如果header中的ContentLength不合法则res.ContentLength的值为 -1

4、构建res.Body

cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs}
go cs.awaitRequestCancel(cs.req)

if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
  res.Header.Del("Content-Encoding")
  res.Header.Del("Content-Length")
  res.ContentLength = -1
  res.Body = &http2gzipReader{body: res.Body}
  res.Uncompressed = true
}
Copier après la connexion

根据Content-Encoding的编码方式,会构建两种不同的Body:

  1. 非gzip编码时,构造的res.Body类型为http2transportResponseBody

  2. gzip编码时,构造的res.Body类型为http2gzipReader

收到http2DataFrame数据帧

收到此数据帧意味着我们开始接收真实的响应,即平常开发中需要处理的业务数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processData

因为server无法及时知道数据流在client端的状态,所以server可能会向client中一个已经不存在的数据流发送数据:

cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
data := f.Data()
if cs == nil {
  cc.mu.Lock()
  neverSent := cc.nextStreamID
  cc.mu.Unlock()
// 此处省略代码
if f.Length > 0 {
    cc.mu.Lock()
    cc.inflow.add(int32(f.Length))
    cc.mu.Unlock()

    cc.wmu.Lock()
    cc.fr.WriteWindowUpdate(0, uint32(f.Length))
    cc.bw.Flush()
    cc.wmu.Unlock()
  }
return nil
}
Copier après la connexion

接收到的数据帧在client没有对应的数据流处理时,通过流控制器为当前连接可读窗口大小增加f.Length,并且通过http2FrameWindowUpdate数据帧告知server将当前连接的可写窗口大小增加f.Length

如果client有对应的数据流且f.Length大于0:

1、如果是head请求结束当前数据流并返回。

if cs.req.Method == "HEAD" && len(data) > 0 {
  cc.logf("protocol error: received DATA on a HEAD request")
  rl.endStreamError(cs, http2StreamError{
    StreamID: f.StreamID,
    Code:     http2ErrCodeProtocol,
  })
return nil
}
Copier après la connexion

2、检查当前数据流能否处理f.Length长度的数据。

cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {
  cs.inflow.take(int32(f.Length))
} else {
  cc.mu.Unlock()
return http2ConnectionError(http2ErrCodeFlowControl)
}
Copier après la connexion

由上可知当前数据流如果能够处理该数据,通过流控制器调用cs.inflow.take减小当前数据流可接受窗口大小。

3、当前数据流被重置或者被关闭即cs.didReset为true时又或者数据帧有填充数据时需要调整流控制窗口。

var refund int
if pad := int(f.Length) - len(data); pad > 0 {
  refund += pad
}
// Return len(data) now if the stream is already closed,
// since data will never be read.
didReset := cs.didReset
if didReset {
  refund += len(data)
}
if refund > 0 {
  cc.inflow.add(int32(refund))
  cc.wmu.Lock()
  cc.fr.WriteWindowUpdate(0, uint32(refund))
if !didReset {
    cs.inflow.add(int32(refund))
    cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
  }
  cc.bw.Flush()
  cc.wmu.Unlock()
}
cc.mu.Unlock()
Copier après la connexion
  • 如果数据帧有填充数据则计算需要返还的填充数据长度。

  • 如果数据流无效该数据帧的长度需要全部返还。

最后,根据计算的refund增加当前连接或者当前数据流的可接受窗口大小,并且同时告知server增加当前连接或者当前数据流的可写窗口大小。

4、数据长度大于0且数据流正常则将数据写入数据流缓冲区。

if len(data) > 0 && !didReset {
if _, err := cs.bufPipe.Write(data); err != nil {
    rl.endStreamError(cs, err)
return err
  }
}
Copier après la connexion

回顾:前面的(*http2clientConnReadLoop).handleResponse方法中有这样一行代码res.Body = http2transportResponseBody{cs},所以在业务开发时能够通过Response读取到数据流中的缓冲数据。

(http2transportResponseBody).Read

在前面的内容里,如果数据流状态正常且数据帧没有填充数据则数据流和连接的可接收窗口会一直变小,而这部分内容就是增加数据流的可接受窗口大小。

因为篇幅和主旨的问题笔者仅分析描述该方法内和流控制有关的部分。

1、读取响应数据后计算当前连接需要增加的可接受窗口大小。

cc.mu.Lock()
defer cc.mu.Unlock()
var connAdd, streamAdd int32
// Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
  connAdd = http2transportDefaultConnFlow - v
  cc.inflow.add(connAdd)
}
Copier après la connexion

如果当前连接可接受窗口的大小已经小于http2transportDefaultConnFlow(1G)的一半,则当前连接可接收窗口大小需要增加http2transportDefaultConnFlow - cc.inflow.available()

回顾http2transportDefaultConnFlow在前篇(*http2Transport).NewClientConn方法部分有提到,且连接刚建立时会通过http2WindowUpdateFrame数据帧告知server当前连接可发送窗口大小增加http2transportDefaultConnFlow

2、读取响应数据后计算当前数据流需要增加的可接受窗口大小。

if err == nil { // No need to refresh if the stream is over or failed.
// Consider any buffered body data (read from the conn but not
// consumed by the client) when computing flow control for this
// stream.
  v := int(cs.inflow.available()) + cs.bufPipe.Len()
if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {
    streamAdd = int32(http2transportDefaultStreamFlow - v)
    cs.inflow.add(streamAdd)
  }
}
Copier après la connexion

如果当前数据流可接受窗口大小加上当前数据流缓冲区剩余未读数据的长度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh(4M-4KB),则当前数据流可接受窗口大小需要增加http2transportDefaultStreamFlow - v

回顾http2transportDefaultStreamFlow在前篇(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法中均有提到。

连接刚建立时,发送http2FrameSettings数据帧,告知server每个数据流的可发送窗口大小为http2transportDefaultStreamFlow

newStream时,数据流默认的可接收窗口大小为http2transportDefaultStreamFlow

3、将连接和数据流分别需要增加的窗口大小通过http2WindowUpdateFrame数据帧告知server。

if connAdd != 0 || streamAdd != 0 {
  cc.wmu.Lock()
defer cc.wmu.Unlock()
if connAdd != 0 {
    cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd))
  }
if streamAdd != 0 {
    cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd))
  }
  cc.bw.Flush()
}
Copier après la connexion

以上就是server向client发送数据的流控制逻辑。

(*http2clientStream).writeRequestBody

前篇中(*http2ClientConn).roundTrip未对(*http2clientStream).writeRequestBody进行分析,下面我们看看该方法的源码:

func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
	cc := cs.cc
	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
// 此处省略代码
	req := cs.req
	hasTrailers := req.Trailer != nil
	remainLen := http2actualContentLength(req)
	hasContentLen := remainLen != -1

var sawEOF bool
for !sawEOF {
		n, err := body.Read(buf[:len(buf)-1])
// 此处省略代码
		remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
			allowed, err = cs.awaitFlowControl(len(remain))
switch {
case err == http2errStopReqBodyWrite:
return err
case err == http2errStopReqBodyWriteAndCancel:
				cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
return err
case err != nil:
return err
			}
			cc.wmu.Lock()
			data := remain[:allowed]
			remain = remain[allowed:]
			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
			err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {
				err = cc.bw.Flush()
			}
			cc.wmu.Unlock()
		}
if err != nil {
return err
		}
	}
// 此处省略代码
return err
}
Copier après la connexion

上面的逻辑可简单总结为:不停的读取请求body然后将读取的内容通过cc.fr.WriteData转为http2FrameData数据帧发送给server,直到请求body读完为止。其中和流控制有关的方法是awaitFlowControl,下面我们对该方法进行分析。

(*http2clientStream).awaitFlowControl

此方法的主要作用是等待当前数据流可写窗口有容量能够写入数据。

func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
	cc := cs.cc
	cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, http2errClientConnClosed
		}
if cs.stopReqBody != nil {
return 0, cs.stopReqBody
		}
if err := cs.checkResetOrDone(); err != nil {
return 0, err
		}
if a := cs.flow.available(); a > 0 {
			take := a
if int(take) > maxBytes {

				take = int32(maxBytes) // can&#39;t truncate int; take is int32
			}
if take > int32(cc.maxFrameSize) {
				take = int32(cc.maxFrameSize)
			}
			cs.flow.take(take)
return take, nil
		}
		cc.cond.Wait()
	}
}
Copier après la connexion

根据源码可以知道,数据流被关闭或者停止发送请求body,则当前数据流无法写入数据。当数据流状态正常时,又分为两种情况:

  1. 当前数据流可写窗口剩余可写数据大于0,则计算可写字节数,并将当前数据流可写窗口大小消耗take

  2. 当前数据流可写窗口剩余可写数据小于等于0,则会一直等待直到被唤醒并进入下一次检查。

上面的第二种情况在收到http2WindowUpdateFrame数据帧这一节中提到过。

Une fois que le serveur a lu les données du flux de données actuel, il enverra la trame http2WindowUpdateFrame数据帧,client收到该数据帧后会增大对应数据流可写窗口,并执行cc.cond.Broadcast()唤醒因发送数据已达流控制上限而等待的数据流继续发送数据。

以上就是client向server发送数据的流控制逻辑。

总结

  1. 帧头长度为9个字节,并包含四个部分:Payload的长度、帧类型、帧标识符和数据流ID。

  2. 流控制可分为两个步骤:

  • 初始时,通过http2FrameSettings数据帧和http2WindowUpdateFrame数据帧告知对方当前连接读写窗口大小以及连接中数据流读写窗口大小。

  • 在读写数据过程中,通过发送http2WindowUpdateFramedata au flux de données correspondant du client pour contrôler la taille de la fenêtre d'écriture de l'autre extrémité.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:Go语言进阶学习
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal