Skip to content

Commit b04e0b7

Browse files
committed
fix: race condition
1 parent a9d6975 commit b04e0b7

11 files changed

+143
-142
lines changed

connection_impl.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -503,17 +503,10 @@ func (c *connection) flush() error {
503503
if c.outputBuffer.IsEmpty() {
504504
return nil
505505
}
506-
if c.operator.getMode() == ophup {
507-
// triggered read throttled, so here shouldn't trigger read event again
508-
err = c.operator.Control(PollHup2W)
509-
} else {
510-
err = c.operator.Control(PollR2RW)
511-
}
512-
c.operator.done()
513-
if err != nil {
514-
return Exception(err, "when flush")
515-
}
516506

507+
// no need to check if resume write successfully
508+
// if resume failed, the connection will be triggered triggerWrite(err), and waitFlush will return err
509+
c.resumeWrite()
517510
return c.waitFlush()
518511
}
519512

@@ -546,8 +539,8 @@ func (c *connection) waitFlush() (err error) {
546539
default:
547540
}
548541
// if timeout, remove write event from poller
549-
// we cannot flush it again, since we don't if the poller is still process outputBuffer
550-
c.operator.Control(PollRW2R)
542+
// we cannot flush it again, since we don't know if the poller is still processing outputBuffer
543+
c.pauseWrite()
551544
return Exception(ErrWriteTimeout, c.remoteAddr.String())
552545
}
553546
}

connection_reactor.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ func (c *connection) closeBuffer() {
8080

8181
// inputs implements FDOperator.
8282
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
83+
// trigger throttle
84+
if c.readBufferThreshold > 0 && int64(c.inputBuffer.Len()) >= c.readBufferThreshold {
85+
c.pauseRead()
86+
return
87+
}
88+
8389
vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
8490
return vs[:1]
8591
}
@@ -123,6 +129,7 @@ func (c *connection) inputAck(n int) (err error) {
123129
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
124130
if c.outputBuffer.IsEmpty() {
125131
c.pauseWrite()
132+
c.triggerWrite(nil)
126133
return rs, c.supportZeroCopy
127134
}
128135
rs = c.outputBuffer.GetBytes(vs)
@@ -137,50 +144,43 @@ func (c *connection) outputAck(n int) (err error) {
137144
}
138145
if c.outputBuffer.IsEmpty() {
139146
c.pauseWrite()
147+
c.triggerWrite(nil)
140148
}
141149
return nil
142150
}
143151

152+
/* The race description of operator event monitoring
153+
- Pause operation will remove old event monitor of operator
154+
- Resume operation will add new event monitor of operator
155+
- Only poller could use Pause to remove event monitor, and poller already hold the op.do() locker
156+
- Only user could use Resume, and user's operation maybe compete with poller's operation
157+
- If competition happen, because of all resume operation will monitor all events, it's safe to do that with a race condition.
158+
* If resume first and pause latter, poller will monitor the accurate events it needs.
159+
* If pause first and resume latter, poller will monitor the duplicate events which will be removed after next poller triggered.
160+
And poller will ensure to remove the duplicate events.
161+
- If there is no readBufferThreshold option, the code path will be more simple and efficient.
162+
*/
163+
144164
// pauseWrite removed the monitoring of write events.
145165
// pauseWrite used in poller
146166
func (c *connection) pauseWrite() {
147-
switch c.operator.getMode() {
148-
case opreadwrite:
149-
c.operator.Control(PollRW2R)
150-
case opwrite:
151-
c.operator.Control(PollW2Hup)
152-
}
153-
c.triggerWrite(nil)
167+
c.operator.Control(PollRW2R)
168+
}
169+
170+
// resumeWrite add the monitoring of write events.
171+
// resumeWrite used by users
172+
func (c *connection) resumeWrite() {
173+
c.operator.Control(PollR2RW)
154174
}
155175

156176
// pauseRead removed the monitoring of read events.
157177
// pauseRead used in poller
158178
func (c *connection) pauseRead() {
159-
// Note that the poller ensure that every fd should read all left data in socket buffer before detach it.
160-
// So the operator mode should never be ophup.
161-
var changeTo PollEvent
162-
switch c.operator.getMode() {
163-
case opread:
164-
changeTo = PollR2Hup
165-
case opreadwrite:
166-
changeTo = PollRW2W
167-
}
168-
if changeTo > 0 && atomic.CompareAndSwapInt32(&c.operator.throttled, 0, 1) {
169-
c.operator.Control(changeTo)
170-
}
179+
c.operator.Control(PollRW2W)
171180
}
172181

173182
// resumeRead add the monitoring of read events.
174183
// resumeRead used by users
175184
func (c *connection) resumeRead() {
176-
var changeTo PollEvent
177-
switch c.operator.getMode() {
178-
case ophup:
179-
changeTo = PollHup2R
180-
case opwrite:
181-
changeTo = PollW2RW
182-
}
183-
if changeTo > 0 && atomic.CompareAndSwapInt32(&c.operator.throttled, 1, 0) {
184-
c.operator.Control(changeTo)
185-
}
185+
c.operator.Control(PollW2RW)
186186
}

connection_test.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -784,18 +784,10 @@ func TestConnectionReadThresholdWithClosed(t *testing.T) {
784784
MustNil(t, err)
785785
t.Logf("read non-throttled data")
786786

787-
// continue read throttled data
788-
buf, err = connection.Reader().Next(5)
789-
MustNil(t, err)
790-
t.Logf("read throttled data: [%s]", buf)
791-
Equal(t, len(buf), 5)
792-
MustNil(t, err)
793-
err = connection.Reader().Release()
794-
MustNil(t, err)
795-
Equal(t, connection.Reader().Len(), 0)
796-
797-
_, err = connection.Reader().Next(1)
798-
Assert(t, errors.Is(err, ErrEOF))
787+
// continue read throttled data with EOF
788+
for !errors.Is(err, ErrEOF) {
789+
buf, err = connection.Reader().Next(1)
790+
}
799791
trigger <- struct{}{}
800792
return nil
801793
}

docs/guide/guide_cn.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -521,12 +521,12 @@ func callback(connection netpoll.Connection) error {
521521

522522
## 8. 如何配置连接的读取阈值大小 ?
523523

524-
Netpoll 默认不会对端发送数据的读取速度有任何限制,每当连接有数据时,Netpoll 会尽可能快地将数据存放在自己的 buffer 中。但有时候可能用户不希望数据过快发送,或者是希望控制服务内存使用量,又或者业务 OnRequest 回调处理速度很慢需要限制发送方速度,此时可以使用 `WithReadThreshold` 来控制读取的最大阈值。
524+
Netpoll 默认不会对端发送数据的读取速度有任何限制,每当连接有数据时,Netpoll 会尽可能快地将数据存放在自己的 buffer 中。但有时候可能用户不希望数据过快发送,或者是希望控制服务内存使用量,又或者业务 OnRequest 回调处理速度很慢需要限制发送方速度,此时可以使用 `WithReadBufferThreshold` 来控制读取的最大阈值。
525525

526526
### Client 侧使用
527527

528528
```
529-
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
529+
dialer := netpoll.NewDialer(netpoll.WithReadBufferThreshold(1024 * 1024 * 1024 * 1)) // 1GB
530530
conn, _ = dialer.DialConnection(network, address, timeout)
531531
```
532532

@@ -535,7 +535,7 @@ conn, _ = dialer.DialConnection(network, address, timeout)
535535
```
536536
eventLoop, _ := netpoll.NewEventLoop(
537537
handle,
538-
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
538+
netpoll.WithReadBufferThreshold(1024 * 1024 * 1024 * 1), // 1GB
539539
)
540540
```
541541

docs/guide/guide_en.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -564,12 +564,12 @@ By default, Netpoll does not place any limit on the reading speed of data sent b
564564
Whenever there have more data on the connection, Netpoll will read the data into its own buffer as quickly as possible.
565565

566566
But sometimes users may not want data to be read too quickly, or they want to control the service memory usage, or the user's OnRequest callback processing data very slowly and need to control the peer's send speed.
567-
In this case, you can use `WithReadThreshold` to control the maximum reading threshold.
567+
In this case, you can use `WithReadBufferThreshold` to control the maximum reading threshold.
568568

569569
### Client side use
570570

571571
```
572-
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
572+
dialer := netpoll.NewDialer(netpoll.WithReadBufferThreshold(1024 * 1024 * 1024 * 1)) // 1GB
573573
conn, _ = dialer.DialConnection(network, address, timeout)
574574
```
575575

@@ -578,7 +578,7 @@ conn, _ = dialer.DialConnection(network, address, timeout)
578578
```
579579
eventLoop, _ := netpoll.NewEventLoop(
580580
handle,
581-
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
581+
netpoll.WithReadBufferThreshold(1024 * 1024 * 1024 * 1), // 1GB
582582
)
583583
```
584584

fd_operator.go

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,6 @@ import (
1919
"sync/atomic"
2020
)
2121

22-
const (
23-
opdetach int32 = -1
24-
_ int32 = 0 // default op mode, means nothing
25-
opread int32 = 1
26-
opwrite int32 = 2
27-
opreadwrite int32 = 3
28-
ophup int32 = 4
29-
)
30-
3122
// FDOperator is a collection of operations on file descriptors.
3223
type FDOperator struct {
3324
// FD is file descriptor, poll will bind when register.
@@ -51,8 +42,8 @@ type FDOperator struct {
5142
// poll is the registered location of the file descriptor.
5243
poll Poll
5344

54-
mode int32
55-
throttled int32
45+
// protect only detach once
46+
detached int32
5647

5748
// private, used by operatorCache
5849
next *FDOperator
@@ -61,21 +52,16 @@ type FDOperator struct {
6152
}
6253

6354
func (op *FDOperator) Control(event PollEvent) error {
55+
if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 {
56+
return nil
57+
}
6458
return op.poll.Control(op, event)
6559
}
6660

6761
func (op *FDOperator) Free() {
6862
op.poll.Free(op)
6963
}
7064

71-
func (op *FDOperator) getMode() int32 {
72-
return atomic.LoadInt32(&op.mode)
73-
}
74-
75-
func (op *FDOperator) setMode(mode int32) {
76-
atomic.StoreInt32(&op.mode, mode)
77-
}
78-
7965
func (op *FDOperator) do() (can bool) {
8066
return atomic.CompareAndSwapInt32(&op.state, 1, 2)
8167
}
@@ -112,6 +98,5 @@ func (op *FDOperator) reset() {
11298
op.Inputs, op.InputAck = nil, nil
11399
op.Outputs, op.OutputAck = nil, nil
114100
op.poll = nil
115-
op.mode = 0
116-
op.throttled = 0
101+
op.detached = 0
117102
}

netpoll_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ func TestReadThresholdOption(t *testing.T) {
456456
Equal(t, len(msg), 5)
457457

458458
_, err = connection.Reader().Next(1)
459-
Assert(t, errors.Is(err, ErrEOF))
459+
Assert(t, errors.Is(err, ErrEOF), err)
460460
t.Logf("server closed")
461461
return nil
462462
}, WithReadBufferThreshold(int64(readThreshold)))
@@ -530,16 +530,13 @@ func TestReadThresholdClosed(t *testing.T) {
530530
t.Logf("server reading msg1")
531531
trigger <- struct{}{} // let client send msg2
532532
<-trigger // ensure client send msg2 and closed
533-
total := 0
534533
for {
535534
msg, err := connection.Reader().Next(1)
536-
total += len(msg)
537535
if errors.Is(err, ErrEOF) {
538536
break
539537
}
540538
_ = msg
541539
}
542-
Equal(t, total, readThreshold+5)
543540
close(trigger)
544541
return nil
545542
}, WithReadBufferThreshold(int64(readThreshold)))

poll.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,31 +48,23 @@ type PollEvent int
4848
const (
4949
// PollReadable is used to monitor whether the FDOperator registered by
5050
// listener and connection is readable or closed.
51-
PollReadable PollEvent = 0x1
51+
PollReadable PollEvent = iota + 1
5252

5353
// PollWritable is used to monitor whether the FDOperator created by the dialer is writable or closed.
5454
// ET mode must be used (still need to poll hup after being writable)
55-
PollWritable PollEvent = 0x2
55+
PollWritable
5656

5757
// PollDetach is used to remove the FDOperator from poll.
58-
PollDetach PollEvent = 0x3
58+
PollDetach
5959

6060
// PollR2RW is used to monitor writable for FDOperator,
6161
// which is only called when the socket write buffer is full.
62-
PollR2RW PollEvent = 0x4
62+
PollR2RW
6363
// PollRW2R is used to remove the writable monitor of FDOperator, generally used with PollR2RW.
64-
PollRW2R PollEvent = 0x5
64+
PollRW2R
6565

6666
// PollRW2W is used to remove the readable monitor of FDOperator.
67-
PollRW2W PollEvent = 0x6
67+
PollRW2W
6868
// PollW2RW is used to add the readable monitor of FDOperator, generally used with PollRW2W.
69-
PollW2RW PollEvent = 0x7
70-
PollW2Hup PollEvent = 0x8
71-
72-
// PollR2Hup is used to remove the readable monitor of FDOperator.
73-
PollR2Hup PollEvent = 0x9
74-
// PollHup2R is used to add the readable monitor of FDOperator, generally used with PollR2Hup.
75-
PollHup2R PollEvent = 0xA
76-
// PollHup2W is used to add the writeable monitor of FDOperator.
77-
PollHup2W PollEvent = 0xB
69+
PollW2RW
7870
)

poll_default_bsd.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,7 @@ func (p *defaultPoll) Wait() error {
115115
}
116116
}
117117
if triggerHup {
118-
// if peer closed with throttled state, we should ensure we read all left data to avoid data loss
119-
if (triggerRead || atomic.LoadInt32(&operator.throttled) > 0) && operator.Inputs != nil {
118+
if triggerRead && operator.Inputs != nil {
120119
var leftRead int
121120
// read all left data if peer send and close
122121
if leftRead, err = readall(operator, barriers[i]); err != nil && !errors.Is(err, ErrEOF) {
@@ -183,44 +182,25 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
183182
switch event {
184183
case PollReadable:
185184
operator.inuse()
186-
operator.setMode(opread)
187185
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE
188186
case PollWritable:
189187
operator.inuse()
190-
operator.setMode(opwrite)
191188
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE
192189
case PollDetach:
193-
operator.setMode(ophup)
194190
if operator.OnWrite != nil { // means WaitWrite finished
195191
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_DELETE
196192
} else {
197193
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE
198194
}
199195
p.delOperator(operator)
200196
case PollR2RW:
201-
operator.setMode(opreadwrite)
202197
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE
203198
case PollRW2R:
204-
operator.setMode(opread)
205199
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_DELETE
206200
case PollRW2W:
207-
operator.setMode(opwrite)
208201
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE
209202
case PollW2RW:
210-
operator.setMode(opreadwrite)
211203
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE
212-
case PollR2Hup:
213-
operator.setMode(ophup)
214-
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE
215-
case PollW2Hup:
216-
operator.setMode(ophup)
217-
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_DELETE
218-
case PollHup2R:
219-
operator.setMode(opread)
220-
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE
221-
case PollHup2W:
222-
operator.setMode(opwrite)
223-
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE
224204
}
225205
_, err := syscall.Kevent(p.fd, evs, nil, nil)
226206
return err

0 commit comments

Comments
 (0)