Skip to content

Commit 129d68a

Browse files
committed
feat: add WithReadThreshold API
1 parent 9707178 commit 129d68a

20 files changed

+494
-104
lines changed

connection_errors.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ const (
3535
ErrEOF = syscall.Errno(0x106)
3636
// Write I/O buffer timeout, calling by Connection.Writer
3737
ErrWriteTimeout = syscall.Errno(0x107)
38+
// The wait read size large than read threshold
39+
ErrReadOutOfThreshold = syscall.Errno(0x108)
3840
)
3941

4042
const ErrnoMask = 0xFF
@@ -90,11 +92,12 @@ func (e *exception) Unwrap() error {
9092

9193
// Errors defined in netpoll
9294
var errnos = [...]string{
93-
ErrnoMask & ErrConnClosed: "connection has been closed",
94-
ErrnoMask & ErrReadTimeout: "connection read timeout",
95-
ErrnoMask & ErrDialTimeout: "dial wait timeout",
96-
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
97-
ErrnoMask & ErrUnsupported: "netpoll dose not support",
98-
ErrnoMask & ErrEOF: "EOF",
99-
ErrnoMask & ErrWriteTimeout: "connection write timeout",
95+
ErrnoMask & ErrConnClosed: "connection has been closed",
96+
ErrnoMask & ErrReadTimeout: "connection read timeout",
97+
ErrnoMask & ErrDialTimeout: "dial wait timeout",
98+
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
99+
ErrnoMask & ErrUnsupported: "netpoll dose not support",
100+
ErrnoMask & ErrEOF: "EOF",
101+
ErrnoMask & ErrWriteTimeout: "connection write timeout",
102+
ErrnoMask & ErrReadOutOfThreshold: "connection read size is out of threshold",
100103
}

connection_impl.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ type connection struct {
4545
outputBuffer *LinkBuffer
4646
outputBarrier *barrier
4747
supportZeroCopy bool
48-
maxSize int // The maximum size of data between two Release().
49-
bookSize int // The size of data that can be read at once.
48+
maxSize int // The maximum size of data between two Release().
49+
bookSize int // The size of data that can be read at once.
50+
readThreshold int64 // The readThreshold of connection max read.
5051
}
5152

5253
var (
@@ -94,6 +95,12 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
9495
return nil
9596
}
9697

98+
// SetReadThreshold implements Connection.
99+
func (c *connection) SetReadThreshold(readThreshold int64) error {
100+
c.readThreshold = readThreshold
101+
return nil
102+
}
103+
97104
// ------------------------------------------ implement zero-copy reader ------------------------------------------
98105

99106
// Next implements Connection.
@@ -394,28 +401,44 @@ func (c *connection) triggerWrite(err error) {
394401
// waitRead will wait full n bytes.
395402
func (c *connection) waitRead(n int) (err error) {
396403
if n <= c.inputBuffer.Len() {
397-
return nil
404+
goto CLEANUP
398405
}
406+
// cannot wait read with an out of threshold size
407+
if c.readThreshold > 0 && int64(n) > c.readThreshold {
408+
// just return error and dont do cleanup
409+
return Exception(ErrReadOutOfThreshold, "wait read")
410+
}
411+
399412
atomic.StoreInt64(&c.waitReadSize, int64(n))
400-
defer atomic.StoreInt64(&c.waitReadSize, 0)
401413
if c.readTimeout > 0 {
402-
return c.waitReadWithTimeout(n)
414+
err = c.waitReadWithTimeout(n)
415+
goto CLEANUP
403416
}
404417
// wait full n
405418
for c.inputBuffer.Len() < n {
406419
switch c.status(closing) {
407420
case poller:
408-
return Exception(ErrEOF, "wait read")
421+
err = Exception(ErrEOF, "wait read")
409422
case user:
410-
return Exception(ErrConnClosed, "wait read")
423+
err = Exception(ErrConnClosed, "wait read")
411424
default:
412425
err = <-c.readTrigger
413-
if err != nil {
414-
return err
415-
}
426+
}
427+
if err != nil {
428+
goto CLEANUP
416429
}
417430
}
418-
return nil
431+
CLEANUP:
432+
atomic.StoreInt64(&c.waitReadSize, 0)
433+
if c.readThreshold > 0 && err == nil {
434+
// only resume read when current read size could make newBufferSize < readThreshold
435+
bufferSize := int64(c.inputBuffer.Len())
436+
newBufferSize := bufferSize - int64(n)
437+
if bufferSize >= c.readThreshold && newBufferSize < c.readThreshold {
438+
c.resumeRead()
439+
}
440+
}
441+
return err
419442
}
420443

421444
// waitReadWithTimeout will wait full n bytes or until timeout.

connection_onevent.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
103103
c.SetReadTimeout(opts.readTimeout)
104104
c.SetWriteTimeout(opts.writeTimeout)
105105
c.SetIdleTimeout(opts.idleTimeout)
106+
c.SetReadThreshold(opts.readThreshold)
106107

107108
// calling prepare first and then register.
108109
if opts.onPrepare != nil {

connection_reactor.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ func (c *connection) inputAck(n int) (err error) {
104104
c.maxSize = mallocMax
105105
}
106106

107+
// trigger throttle
108+
if c.readThreshold > 0 && int64(length) >= c.readThreshold {
109+
c.pauseRead()
110+
}
111+
107112
var needTrigger = true
108113
if length == n { // first start onRequest
109114
needTrigger = c.onRequest()
@@ -138,6 +143,29 @@ func (c *connection) outputAck(n int) (err error) {
138143

139144
// rw2r removed the monitoring of write events.
140145
func (c *connection) rw2r() {
141-
c.operator.Control(PollRW2R)
146+
switch c.operator.getMode() {
147+
case opreadwrite:
148+
c.operator.Control(PollRW2R)
149+
case opwrite:
150+
c.operator.Control(PollW2RW)
151+
}
142152
c.triggerWrite(nil)
143153
}
154+
155+
func (c *connection) pauseRead() {
156+
switch c.operator.getMode() {
157+
case opread:
158+
c.operator.Control(PollR2Hup)
159+
case opreadwrite:
160+
c.operator.Control(PollRW2W)
161+
}
162+
}
163+
164+
func (c *connection) resumeRead() {
165+
switch c.operator.getMode() {
166+
case ophup:
167+
c.operator.Control(PollHup2R)
168+
case opwrite:
169+
c.operator.Control(PollW2RW)
170+
}
171+
}

connection_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,3 +675,95 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
675675
wg.Wait()
676676
}
677677
}
678+
679+
func TestConnectionReadOutOfThreshold(t *testing.T) {
680+
var readThreshold = 1024 * 100
681+
var readSize = readThreshold + 1
682+
var opts = &options{}
683+
var wg sync.WaitGroup
684+
wg.Add(1)
685+
opts.onRequest = func(ctx context.Context, connection Connection) error {
686+
if connection.Reader().Len() < readThreshold {
687+
return nil
688+
}
689+
defer wg.Done()
690+
// read throttled data
691+
_, err := connection.Reader().Next(readSize)
692+
Assert(t, errors.Is(err, ErrReadOutOfThreshold), err)
693+
connection.Close()
694+
return nil
695+
}
696+
697+
WithReadThreshold(int64(readThreshold)).f(opts)
698+
r, w := GetSysFdPairs()
699+
rconn, wconn := &connection{}, &connection{}
700+
rconn.init(&netFD{fd: r}, opts)
701+
wconn.init(&netFD{fd: w}, opts)
702+
703+
msg := make([]byte, readThreshold)
704+
_, err := wconn.Writer().WriteBinary(msg)
705+
MustNil(t, err)
706+
err = wconn.Writer().Flush()
707+
MustNil(t, err)
708+
wg.Wait()
709+
}
710+
711+
func TestConnectionReadThreshold(t *testing.T) {
712+
var readThreshold int64 = 1024 * 100
713+
var opts = &options{}
714+
var wg sync.WaitGroup
715+
var throttled int32
716+
wg.Add(1)
717+
opts.onRequest = func(ctx context.Context, connection Connection) error {
718+
if int64(connection.Reader().Len()) < readThreshold {
719+
return nil
720+
}
721+
defer wg.Done()
722+
723+
atomic.StoreInt32(&throttled, 1)
724+
// check if no more read data when throttled
725+
inbuffered := connection.Reader().Len()
726+
t.Logf("Inbuffered: %d", inbuffered)
727+
time.Sleep(time.Millisecond * 100)
728+
Equal(t, inbuffered, connection.Reader().Len())
729+
730+
// read non-throttled data
731+
buf, err := connection.Reader().Next(int(readThreshold))
732+
Equal(t, int64(len(buf)), readThreshold)
733+
MustNil(t, err)
734+
err = connection.Reader().Release()
735+
MustNil(t, err)
736+
t.Logf("read non-throttled data")
737+
738+
// continue read throttled data
739+
buf, err = connection.Reader().Next(5)
740+
MustNil(t, err)
741+
t.Logf("read throttled data: [%s]", buf)
742+
Equal(t, len(buf), 5)
743+
MustNil(t, err)
744+
err = connection.Reader().Release()
745+
MustNil(t, err)
746+
Equal(t, connection.Reader().Len(), 0)
747+
return nil
748+
}
749+
750+
WithReadThreshold(readThreshold).f(opts)
751+
r, w := GetSysFdPairs()
752+
rconn, wconn := &connection{}, &connection{}
753+
rconn.init(&netFD{fd: r}, opts)
754+
wconn.init(&netFD{fd: w}, opts)
755+
Assert(t, rconn.readThreshold == readThreshold)
756+
757+
msg := make([]byte, readThreshold)
758+
_, err := wconn.Writer().WriteBinary(msg)
759+
MustNil(t, err)
760+
err = wconn.Writer().Flush()
761+
MustNil(t, err)
762+
_, err = wconn.Writer().WriteString("hello")
763+
MustNil(t, err)
764+
err = wconn.Writer().Flush()
765+
MustNil(t, err)
766+
t.Logf("flush final msg")
767+
768+
wg.Wait()
769+
}

docs/guide/guide_cn.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,26 @@ func callback(connection netpoll.Connection) error {
519519
}
520520
```
521521

522+
## 8. 如何配置连接的读取阈值大小 ?
523+
524+
Netpoll 默认不会对端发送数据的读取速度有任何限制,每当连接有数据时,Netpoll 会尽可能快地将数据存放在自己的 buffer 中。但有时候可能用户不希望数据过快发送,或者是希望控制服务内存使用量,又或者业务 OnRequest 回调处理速度很慢需要限制发送方速度,此时可以使用 `WithReadThreshold` 来控制读取的最大阈值。
525+
526+
### Client 侧使用
527+
528+
```
529+
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
530+
conn, _ = dialer.DialConnection(network, address, timeout)
531+
```
532+
533+
### Server 侧使用
534+
535+
```
536+
eventLoop, _ := netpoll.NewEventLoop(
537+
handle,
538+
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
539+
)
540+
```
541+
522542
# 注意事项
523543

524544
## 1. 错误设置 NumLoops

docs/guide/guide_en.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,30 @@ func callback(connection netpoll.Connection) error {
558558
}
559559
```
560560

561+
## 8. How to configure the read threshold of the connection?
562+
563+
By default, Netpoll does not place any limit on the reading speed of data sent by the end.
564+
Whenever there have more data on the connection, Netpoll will read the data into its own buffer as quickly as possible.
565+
566+
But sometimes users may not want data to be read too quickly, or they want to control the service memory usage, or the user's OnRequest callback processing data very slowly and need to control the peer's send speed.
567+
In this case, you can use `WithReadThreshold` to control the maximum reading threshold.
568+
569+
### Client side use
570+
571+
```
572+
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
573+
conn, _ = dialer.DialConnection(network, address, timeout)
574+
```
575+
576+
### Server side use
577+
578+
```
579+
eventLoop, _ := netpoll.NewEventLoop(
580+
handle,
581+
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
582+
)
583+
```
584+
561585
# Attention
562586

563587
## 1. Wrong setting of NumLoops

eventloop.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,27 +54,27 @@ type OnPrepare func(connection Connection) context.Context
5454
//
5555
// An example usage in TCP Proxy scenario:
5656
//
57-
// func onConnect(ctx context.Context, upstream netpoll.Connection) context.Context {
58-
// downstream, _ := netpoll.DialConnection("tcp", downstreamAddr, time.Second)
59-
// return context.WithValue(ctx, downstreamKey, downstream)
60-
// }
61-
// func onRequest(ctx context.Context, upstream netpoll.Connection) error {
62-
// downstream := ctx.Value(downstreamKey).(netpoll.Connection)
63-
// }
57+
// func onConnect(ctx context.Context, upstream netpoll.Connection) context.Context {
58+
// downstream, _ := netpoll.DialConnection("tcp", downstreamAddr, time.Second)
59+
// return context.WithValue(ctx, downstreamKey, downstream)
60+
// }
61+
// func onRequest(ctx context.Context, upstream netpoll.Connection) error {
62+
// downstream := ctx.Value(downstreamKey).(netpoll.Connection)
63+
// }
6464
type OnConnect func(ctx context.Context, connection Connection) context.Context
6565

6666
// OnRequest defines the function for handling connection. When data is sent from the connection peer,
6767
// netpoll actively reads the data in LT mode and places it in the connection's input buffer.
6868
// Generally, OnRequest starts handling the data in the following way:
6969
//
70-
// func OnRequest(ctx context, connection Connection) error {
71-
// input := connection.Reader().Next(n)
72-
// handling input data...
73-
// send, _ := connection.Writer().Malloc(l)
74-
// copy(send, output)
75-
// connection.Flush()
76-
// return nil
77-
// }
70+
// func OnRequest(ctx context, connection Connection) error {
71+
// input := connection.Reader().Next(n)
72+
// handling input data...
73+
// send, _ := connection.Writer().Malloc(l)
74+
// copy(send, output)
75+
// connection.Flush()
76+
// return nil
77+
// }
7878
//
7979
// OnRequest will run in a separate goroutine and
8080
// it is guaranteed that there is one and only one OnRequest running at the same time.

0 commit comments

Comments
 (0)