流量控制-滑动窗口

Catalogue
  1. 1. @接受窗口
  2. 2. @发送窗口
  3. 3. @Silly Window Syndrome
  4. 4. @发送窗口的维护
  5. 5. @接受窗口的维护

接收端在给发送端回 ACK 中会汇报自己的 AdvertisedWindow,而发送方会根据这个窗口来控制发送数据的大小,以保证接收方可以处理。
要明确的是滑动窗口分为两个窗口,接收窗口和发送窗口

@接受窗口

接收窗口不仅可以限制发送端发送的速率,还可以提高效率,因为接收窗口的机制,可以允许发送端一次多发送几个片段,而不必等候 ACK,而且可以允许等待一定情况下的乱序, 比如说先缓存提前到的数据,然后去等待需要的数据。

接收的窗口可以分为四段:

  • 数据已经被 tcp 确认,但用户程序还未读取数据内容
  • 中间还有些数据没有到达
  • 数据已经接收到,但 tcp 未确认
  • 通告窗口,也就是接收端在给发送端回 ACK 中会汇报自己的窗口大小

当接收端接收到数据包时,会判断该数据包的序列号是不是在接收窗口內,如果不在窗口內会立即回一个 ack 给发送端, 且丢弃该报文。
滑动: 当用户程序读取接收窗口的内容后,窗口向右滑行

@发送窗口

发送窗口的值是由接收窗口和拥塞窗口一起决定的,发送窗口的大小也决定了发送的速率。

发送窗口的上限值 = Min [rwnd, cwnd],cwnd 拥塞窗口

f发送窗口可以分成四段:

  • 已收到 ack 确认的数据
  • 已经发送,但还没收到 ack 的数据
  • 在窗口中还没有发出的(接收方还有空间)
  • 窗口以外的数据(接收方没空间)
    滑动: 当发送端收到数据 ack 确认时,窗口向右滑

如果一个处理缓慢的 Server(接收端)是怎么把 Client(发送端)的TCP Sliding Window给降成 0 的。此时,你一定会问,如果 Window 变成 0 了,TCP 会怎么样?是不是发送端就不发数据了?是的,发送端就不发数据了,你可以想像成“Window Closed”,那你一定还会问,如果发送端不发数据了,接收方一会儿 Window size 可用了,怎么通知发送端呢?

1.当接收方的应用程序读取了接收缓冲区中的数据以后,接收方会发送一个 ACK,通过通告窗口字段告诉发送方自己又可以接收数据了,发送方收到这个 ACK 之后,就知道自己可以继续发送数据了。

2.同时发送端使用了Zero Window Probe技术,缩写为 ZWP,当接收方的接收窗口为 0 时,每隔一段时间,发送方会主动发送探测包,迫使对端响应来得知其接收窗口有无打开。

既然接收端会主动通知发送端,为何还需要发送端定时探测?

@Silly Window Syndrome

Silly Window Syndrome翻译成中文就是“糊涂窗口综合症”。正如你上面看到的一样,如果我们的接收方太忙了,来不及取走 Receive Windows 里的数据,那么,就会导致发送方越来越小。到最后,如果接收方腾出几个字节并告诉发送方现在有几个字节的 window,而我们的发送方会义无反顾地发送这几个字节。

要知道,我们的 TCP+IP 头有 40 个字节,为了几个字节,要达上这么大的开销,这太不经济了。

所以,Silly Windows Syndrome这个现像就像是你本来可以坐 200 人的飞机里只做了一两个人。要解决这个问题也不难,就是避免对小的 window size 做出响应,直到有足够大的 window size 再响应,这个思路可以同时实现在 sender 和 receiver 两端。

如果这个问题是由 Receiver 端引起的,那么就会使用David D Clark’s方案。在 receiver 端,如果收到的数据导致window size小于某个值,可以直接 ack(0)回 sender,这样就把 window 给关闭了,也阻止了 sender 再发数据过来,等到 receiver 端处理了一些数据后windows size大于等于了 MSS,或者,receiver buffer有一半为空,就可以把 window 打开让 sender 发送数据过来。

如果这个问题是由 Sender 端引起的,那么就会使用著名的Nagle’s algorithm。这个算法的思路也是延时处理,他有两个主要的条件:

要等到 Window Size >= MSS 或是 Data Size >= MSS
收到之前发送数据的 ack 回包,他才会发数据,否则就是在攒数据

@发送窗口的维护

1
2
3
4
5
6
7
8
9
10
                     +-------> sndWnd <-------+
| |
---------------------+-------------+----------+--------------------
| acked | * * * * * * | # # # # #| unable send
---------------------+-------------+----------+--------------------
^ ^
| |
sndUna sndNxt
***** in flight data
##### able send date

发送窗口主要维护这些变量,sndBufSize、sndBufUsed、sndUna、sndNxt 和 sndWnd。sndUna 表示是下一个未确认的序列号,sndNxt 是要发送的下一个段的序列号,sndWnd 是接受端通告的窗口大小。 首先是处理接收方的窗口通告,当收到报文时,一定会带接收窗口和确认号,此时先更新发送器的发送窗口大小为接收窗口大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Write writes data to the endpoint's peer.
// 接收上层的数,通过tcp连接发送到对端
func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, <-chan struct{}, *tcpip.Error) {
// Linux completely ignores any address passed to sendto(2) for TCP sockets
// (without the MSG_FASTOPEN flag). Corking is unimplemented, so opts.More
// and opts.EndOfRecord are also ignored.

e.mu.RLock()
defer e.mu.RUnlock()

// The endpoint cannot be written to if it's not connected.
// 判断tcp状态,必须已经建立了连接才能发送数据
if e.state != stateConnected {
switch e.state {
case stateError:
return 0, nil, e.hardError
default:
return 0, nil, tcpip.ErrClosedForSend
}
}

// Nothing to do if the buffer is empty.
// 检查负载的长度,如果为0,直接返回
if p.Size() == 0 {
return 0, nil, nil
}

e.sndBufMu.Lock()

// Check if the connection has already been closed for sends.
if e.sndClosed {
e.sndBufMu.Unlock()
return 0, nil, tcpip.ErrClosedForSend
}

// Check against the limit.
// tcp流量控制:未被占用发送缓存还剩多少,如果发送缓存已经被用光了,返回 ErrWouldBlock
avail := e.sndBufSize - e.sndBufUsed
if avail <= 0 {
e.sndBufMu.Unlock()
return 0, nil, tcpip.ErrWouldBlock
}

v, perr := p.Get(avail)
if perr != nil {
e.sndBufMu.Unlock()
return 0, nil, perr
}

var err *tcpip.Error
if p.Size() > avail {
err = tcpip.ErrWouldBlock
}
l := len(v)
s := newSegmentFromView(&e.route, e.id, v)

// Add data to the send queue.
// 插入发送队列
e.sndBufUsed += l
e.sndBufInQueue += seqnum.Size(l)
e.sndQueue.PushBack(s)

e.sndBufMu.Unlock()

// 发送数据,最终会调用 sender sendData 来发送数据。
if e.workMu.TryLock() {
// Do the work inline.
e.handleWrite()
e.workMu.Unlock()
} else {
// Let the protocol goroutine do the work.
e.sndWaker.Assert()
}
return uintptr(l), nil, err
}

// 收到tcp段时调用 handleRcvdSegment; 它负责更新与发送相关的状态。
func (s *sender) handleRcvdSegment(seg *segment) {
...

// 存放当前窗口大小。
s.sndWnd = seg.window

// 获取确认号
ack := seg.ackNumber
// 如果ack在最小未确认的seq和下一seg的seq之间
if (ack - 1).InRange(s.sndUna, s.sndNxt) {
...
// Remove all acknowledged data from the write list.
acked := s.sndUna.Size(ack)
s.sndUna = ack

ackLeft := acked
originalOutstanding := s.outstanding
for ackLeft > 0 {
// We use logicalLen here because we can have FIN
// segments (which are always at the end of list) that
// have no data, but do consume a sequence number.
seg := s.writeList.Front()
datalen := seg.logicalLen()

if datalen > ackLeft {
seg.data.TrimFront(int(ackLeft))
break
}

if s.writeNext == seg {
s.writeNext = seg.Next()
}
s.writeList.Remove(seg)
s.outstanding--
seg.decRef()
ackLeft -= datalen
}

// Update the send buffer usage and notify potential waiters.
s.ep.updateSndBufferUsage(int(acked))

...
}

...
}

@接受窗口的维护

接收窗口主要维护这几个变量,rcvBufSize、rcvBufUsed、rcvNxt 和 rcvAcc,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// tcp流量控制:计算未被占用的接收缓存大小
func (e *endpoint) receiveBufferAvailable() int {
e.rcvListMu.Lock()
size := e.rcvBufSize
used := e.rcvBufUsed
e.rcvListMu.Unlock()

// We may use more bytes than the buffer size when the receive buffer
// shrinks.
if used >= size {
return 0
}

return size - used
}

func (e *endpoint) receiveBufferSize() int {
e.rcvListMu.Lock()
size := e.rcvBufSize
e.rcvListMu.Unlock()

return size
}

// zeroReceiveWindow 根据可用缓冲区的数量和接收窗口缩放,检查现在要宣布的接收窗口是否为零。
func (e *endpoint) zeroReceiveWindow(scale uint8) bool {
if e.rcvBufUsed >= e.rcvBufSize {
return true
}

return ((e.rcvBufSize - e.rcvBufUsed) >> scale) == 0
}

// tcp流量控制:判断 segSeq 在窗口內
func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool {
rcvWnd := r.rcvNxt.Size(r.rcvAcc)
if rcvWnd == 0 {
return segLen == 0 && segSeq == r.rcvNxt
}

return segSeq.InWindow(r.rcvNxt, rcvWnd) ||
seqnum.Overlap(r.rcvNxt, rcvWnd, segSeq, segLen)
}

// tcp流量控制:当接收窗口从零增长到非零时,调用 nonZeroWindow;在这种情况下,
// 我们可能需要发送一个 ack,以便向对端表明它可以恢复发送数据。
func (r *receiver) nonZeroWindow() {
if (r.rcvAcc-r.rcvNxt)>>r.rcvWndScale != 0 {
// We never got around to announcing a zero window size, so we
// don't need to immediately announce a nonzero one.
return
}

// Immediately send an ack.
r.ep.snd.sendAck()
}

// 从tcp的接收队列中读取数据,并从接收队列中删除已读数据
func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) {
if e.rcvBufUsed == 0 {
if e.rcvClosed || e.state != stateConnected {
return buffer.View{}, tcpip.ErrClosedForReceive
}
return buffer.View{}, tcpip.ErrWouldBlock
}

s := e.rcvList.Front()
views := s.data.Views()
v := views[s.viewToDeliver]
s.viewToDeliver++

if s.viewToDeliver >= len(views) {
e.rcvList.Remove(s)
s.decRef()
}

scale := e.rcv.rcvWndScale
// tcp流量控制:检测接收窗口是否为0
wasZero := e.zeroReceiveWindow(scale)
e.rcvBufUsed -= len(v)
// 检测糊涂窗口,主动发送窗口不为0的通告给对方
if wasZero && !e.zeroReceiveWindow(scale) {
e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
}

return v, nil
}