Skip to content

Commit d5b0914

Browse files
committed
feat: add WithReadThreshold API
1 parent 9707178 commit d5b0914

23 files changed

+551
-164
lines changed

connection_errors.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ const (
3535
ErrEOF = syscall.Errno(0x106)
3636
// Write I/O buffer timeout, calling by Connection.Writer
3737
ErrWriteTimeout = syscall.Errno(0x107)
38+
// The wait read size large than read threshold
39+
ErrReadOutOfThreshold = syscall.Errno(0x108)
3840
)
3941

4042
const ErrnoMask = 0xFF
@@ -90,11 +92,12 @@ func (e *exception) Unwrap() error {
9092

9193
// Errors defined in netpoll
9294
var errnos = [...]string{
93-
ErrnoMask & ErrConnClosed: "connection has been closed",
94-
ErrnoMask & ErrReadTimeout: "connection read timeout",
95-
ErrnoMask & ErrDialTimeout: "dial wait timeout",
96-
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
97-
ErrnoMask & ErrUnsupported: "netpoll dose not support",
98-
ErrnoMask & ErrEOF: "EOF",
99-
ErrnoMask & ErrWriteTimeout: "connection write timeout",
95+
ErrnoMask & ErrConnClosed: "connection has been closed",
96+
ErrnoMask & ErrReadTimeout: "connection read timeout",
97+
ErrnoMask & ErrDialTimeout: "dial wait timeout",
98+
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
99+
ErrnoMask & ErrUnsupported: "netpoll dose not support",
100+
ErrnoMask & ErrEOF: "EOF",
101+
ErrnoMask & ErrWriteTimeout: "connection write timeout",
102+
ErrnoMask & ErrReadOutOfThreshold: "connection read size is out of threshold",
100103
}

connection_impl.go

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,21 @@ type connection struct {
3333
netFD
3434
onEvent
3535
locker
36-
operator *FDOperator
37-
readTimeout time.Duration
38-
readTimer *time.Timer
39-
readTrigger chan error
40-
waitReadSize int64
41-
writeTimeout time.Duration
42-
writeTimer *time.Timer
43-
writeTrigger chan error
44-
inputBuffer *LinkBuffer
45-
outputBuffer *LinkBuffer
46-
outputBarrier *barrier
47-
supportZeroCopy bool
48-
maxSize int // The maximum size of data between two Release().
49-
bookSize int // The size of data that can be read at once.
36+
operator *FDOperator
37+
readTimeout time.Duration
38+
readTimer *time.Timer
39+
readTrigger chan error
40+
waitReadSize int64
41+
writeTimeout time.Duration
42+
writeTimer *time.Timer
43+
writeTrigger chan error
44+
inputBuffer *LinkBuffer
45+
outputBuffer *LinkBuffer
46+
outputBarrier *barrier
47+
supportZeroCopy bool
48+
maxSize int // The maximum size of data between two Release().
49+
bookSize int // The size of data that can be read at once.
50+
readBufferThreshold int64 // The readBufferThreshold limit the size of connection inputBuffer. In bytes.
5051
}
5152

5253
var (
@@ -94,6 +95,12 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
9495
return nil
9596
}
9697

98+
// SetReadBufferThreshold implements Connection.
99+
func (c *connection) SetReadBufferThreshold(threshold int64) error {
100+
c.readBufferThreshold = threshold
101+
return nil
102+
}
103+
97104
// ------------------------------------------ implement zero-copy reader ------------------------------------------
98105

99106
// Next implements Connection.
@@ -394,28 +401,44 @@ func (c *connection) triggerWrite(err error) {
394401
// waitRead will wait full n bytes.
395402
func (c *connection) waitRead(n int) (err error) {
396403
if n <= c.inputBuffer.Len() {
397-
return nil
404+
goto CLEANUP
398405
}
406+
// cannot wait read with an out of threshold size
407+
if c.readBufferThreshold > 0 && int64(n) > c.readBufferThreshold {
408+
// just return error and dont do cleanup
409+
return Exception(ErrReadOutOfThreshold, "wait read")
410+
}
411+
399412
atomic.StoreInt64(&c.waitReadSize, int64(n))
400-
defer atomic.StoreInt64(&c.waitReadSize, 0)
401413
if c.readTimeout > 0 {
402-
return c.waitReadWithTimeout(n)
414+
err = c.waitReadWithTimeout(n)
415+
goto CLEANUP
403416
}
404417
// wait full n
405418
for c.inputBuffer.Len() < n {
406419
switch c.status(closing) {
407420
case poller:
408-
return Exception(ErrEOF, "wait read")
421+
err = Exception(ErrEOF, "wait read")
409422
case user:
410-
return Exception(ErrConnClosed, "wait read")
423+
err = Exception(ErrConnClosed, "wait read")
411424
default:
412425
err = <-c.readTrigger
413-
if err != nil {
414-
return err
415-
}
426+
}
427+
if err != nil {
428+
goto CLEANUP
416429
}
417430
}
418-
return nil
431+
CLEANUP:
432+
atomic.StoreInt64(&c.waitReadSize, 0)
433+
if c.readBufferThreshold > 0 && err == nil {
434+
// only resume read when current read size could make newBufferSize < readBufferThreshold
435+
bufferSize := int64(c.inputBuffer.Len())
436+
newBufferSize := bufferSize - int64(n)
437+
if bufferSize >= c.readBufferThreshold && newBufferSize < c.readBufferThreshold {
438+
c.resumeRead()
439+
}
440+
}
441+
return err
419442
}
420443

421444
// waitReadWithTimeout will wait full n bytes or until timeout.

connection_onevent.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
103103
c.SetReadTimeout(opts.readTimeout)
104104
c.SetWriteTimeout(opts.writeTimeout)
105105
c.SetIdleTimeout(opts.idleTimeout)
106+
c.SetReadBufferThreshold(opts.readBufferThreshold)
106107

107108
// calling prepare first and then register.
108109
if opts.onPrepare != nil {

connection_reactor.go

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ func (c *connection) inputAck(n int) (err error) {
104104
c.maxSize = mallocMax
105105
}
106106

107+
// trigger throttle
108+
if c.readBufferThreshold > 0 && int64(length) >= c.readBufferThreshold {
109+
c.pauseRead()
110+
}
111+
107112
var needTrigger = true
108113
if length == n { // first start onRequest
109114
needTrigger = c.onRequest()
@@ -117,7 +122,7 @@ func (c *connection) inputAck(n int) (err error) {
117122
// outputs implements FDOperator.
118123
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
119124
if c.outputBuffer.IsEmpty() {
120-
c.rw2r()
125+
c.pauseWrite()
121126
return rs, c.supportZeroCopy
122127
}
123128
rs = c.outputBuffer.GetBytes(vs)
@@ -131,13 +136,41 @@ func (c *connection) outputAck(n int) (err error) {
131136
c.outputBuffer.Release()
132137
}
133138
if c.outputBuffer.IsEmpty() {
134-
c.rw2r()
139+
c.pauseWrite()
135140
}
136141
return nil
137142
}
138143

139-
// rw2r removed the monitoring of write events.
140-
func (c *connection) rw2r() {
141-
c.operator.Control(PollRW2R)
144+
// pauseWrite removed the monitoring of write events.
145+
// pauseWrite used in poller
146+
func (c *connection) pauseWrite() {
147+
switch c.operator.getMode() {
148+
case opreadwrite:
149+
c.operator.Control(PollRW2R)
150+
case opwrite:
151+
c.operator.Control(PollW2Hup)
152+
}
142153
c.triggerWrite(nil)
143154
}
155+
156+
// pauseRead removed the monitoring of read events.
157+
// pauseRead used in poller
158+
func (c *connection) pauseRead() {
159+
switch c.operator.getMode() {
160+
case opread:
161+
c.operator.Control(PollR2Hup)
162+
case opreadwrite:
163+
c.operator.Control(PollRW2W)
164+
}
165+
}
166+
167+
// resumeRead add the monitoring of read events.
168+
// resumeRead used by users
169+
func (c *connection) resumeRead() {
170+
switch c.operator.getMode() {
171+
case ophup:
172+
c.operator.Control(PollHup2R)
173+
case opwrite:
174+
c.operator.Control(PollW2RW)
175+
}
176+
}

connection_test.go

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -499,18 +499,15 @@ func TestConnDetach(t *testing.T) {
499499
func TestParallelShortConnection(t *testing.T) {
500500
ln, err := createTestListener("tcp", ":12345")
501501
MustNil(t, err)
502-
defer ln.Close()
503-
504502
var received int64
505503
el, err := NewEventLoop(func(ctx context.Context, connection Connection) error {
506504
data, err := connection.Reader().Next(connection.Reader().Len())
507-
if err != nil {
508-
return err
509-
}
505+
Assert(t, err == nil || errors.Is(err, ErrEOF))
510506
atomic.AddInt64(&received, int64(len(data)))
511-
//t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
507+
t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
512508
return nil
513509
})
510+
defer el.Shutdown(context.Background())
514511
go func() {
515512
el.Serve(ln)
516513
}()
@@ -536,10 +533,11 @@ func TestParallelShortConnection(t *testing.T) {
536533
}
537534
wg.Wait()
538535

539-
for atomic.LoadInt64(&received) < int64(totalSize) {
540-
t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize)
536+
start := time.Now()
537+
for atomic.LoadInt64(&received) < int64(totalSize) && time.Now().Sub(start) < time.Second {
541538
time.Sleep(time.Millisecond * 100)
542539
}
540+
Equal(t, atomic.LoadInt64(&received), int64(totalSize))
543541
}
544542

545543
func TestConnectionServerClose(t *testing.T) {
@@ -643,8 +641,6 @@ func TestConnectionServerClose(t *testing.T) {
643641
func TestConnectionDailTimeoutAndClose(t *testing.T) {
644642
ln, err := createTestListener("tcp", ":12345")
645643
MustNil(t, err)
646-
defer ln.Close()
647-
648644
el, err := NewEventLoop(
649645
func(ctx context.Context, connection Connection) error {
650646
_, err = connection.Reader().Next(connection.Reader().Len())
@@ -668,10 +664,102 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
668664
go func() {
669665
defer wg.Done()
670666
conn, err := DialConnection("tcp", ":12345", time.Nanosecond)
671-
Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout"))
667+
Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout"), err)
672668
_ = conn
673669
}()
674670
}
675671
wg.Wait()
676672
}
677673
}
674+
675+
func TestConnectionReadOutOfThreshold(t *testing.T) {
676+
var readThreshold = 1024 * 100
677+
var readSize = readThreshold + 1
678+
var opts = &options{}
679+
var wg sync.WaitGroup
680+
wg.Add(1)
681+
opts.onRequest = func(ctx context.Context, connection Connection) error {
682+
if connection.Reader().Len() < readThreshold {
683+
return nil
684+
}
685+
defer wg.Done()
686+
// read throttled data
687+
_, err := connection.Reader().Next(readSize)
688+
Assert(t, errors.Is(err, ErrReadOutOfThreshold), err)
689+
connection.Close()
690+
return nil
691+
}
692+
693+
WithReadBufferThreshold(int64(readThreshold)).f(opts)
694+
r, w := GetSysFdPairs()
695+
rconn, wconn := &connection{}, &connection{}
696+
rconn.init(&netFD{fd: r}, opts)
697+
wconn.init(&netFD{fd: w}, opts)
698+
699+
msg := make([]byte, readThreshold)
700+
_, err := wconn.Writer().WriteBinary(msg)
701+
MustNil(t, err)
702+
err = wconn.Writer().Flush()
703+
MustNil(t, err)
704+
wg.Wait()
705+
}
706+
707+
func TestConnectionReadThreshold(t *testing.T) {
708+
var readThreshold int64 = 1024 * 100
709+
var opts = &options{}
710+
var wg sync.WaitGroup
711+
var throttled int32
712+
wg.Add(1)
713+
opts.onRequest = func(ctx context.Context, connection Connection) error {
714+
if int64(connection.Reader().Len()) < readThreshold {
715+
return nil
716+
}
717+
defer wg.Done()
718+
719+
atomic.StoreInt32(&throttled, 1)
720+
// check if no more read data when throttled
721+
inbuffered := connection.Reader().Len()
722+
t.Logf("Inbuffered: %d", inbuffered)
723+
time.Sleep(time.Millisecond * 100)
724+
Equal(t, inbuffered, connection.Reader().Len())
725+
726+
// read non-throttled data
727+
buf, err := connection.Reader().Next(int(readThreshold))
728+
Equal(t, int64(len(buf)), readThreshold)
729+
MustNil(t, err)
730+
err = connection.Reader().Release()
731+
MustNil(t, err)
732+
t.Logf("read non-throttled data")
733+
734+
// continue read throttled data
735+
buf, err = connection.Reader().Next(5)
736+
MustNil(t, err)
737+
t.Logf("read throttled data: [%s]", buf)
738+
Equal(t, len(buf), 5)
739+
MustNil(t, err)
740+
err = connection.Reader().Release()
741+
MustNil(t, err)
742+
Equal(t, connection.Reader().Len(), 0)
743+
return nil
744+
}
745+
746+
WithReadBufferThreshold(readThreshold).f(opts)
747+
r, w := GetSysFdPairs()
748+
rconn, wconn := &connection{}, &connection{}
749+
rconn.init(&netFD{fd: r}, opts)
750+
wconn.init(&netFD{fd: w}, opts)
751+
Assert(t, rconn.readBufferThreshold == readThreshold)
752+
753+
msg := make([]byte, readThreshold)
754+
_, err := wconn.Writer().WriteBinary(msg)
755+
MustNil(t, err)
756+
err = wconn.Writer().Flush()
757+
MustNil(t, err)
758+
_, err = wconn.Writer().WriteString("hello")
759+
MustNil(t, err)
760+
err = wconn.Writer().Flush()
761+
MustNil(t, err)
762+
t.Logf("flush final msg")
763+
764+
wg.Wait()
765+
}

docs/guide/guide_cn.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,26 @@ func callback(connection netpoll.Connection) error {
519519
}
520520
```
521521

522+
## 8. 如何配置连接的读取阈值大小 ?
523+
524+
Netpoll 默认不会对端发送数据的读取速度有任何限制,每当连接有数据时,Netpoll 会尽可能快地将数据存放在自己的 buffer 中。但有时候可能用户不希望数据过快发送,或者是希望控制服务内存使用量,又或者业务 OnRequest 回调处理速度很慢需要限制发送方速度,此时可以使用 `WithReadThreshold` 来控制读取的最大阈值。
525+
526+
### Client 侧使用
527+
528+
```
529+
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
530+
conn, _ = dialer.DialConnection(network, address, timeout)
531+
```
532+
533+
### Server 侧使用
534+
535+
```
536+
eventLoop, _ := netpoll.NewEventLoop(
537+
handle,
538+
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
539+
)
540+
```
541+
522542
# 注意事项
523543

524544
## 1. 错误设置 NumLoops

0 commit comments

Comments
 (0)