Skip to content

Commit b0bf57d

Browse files
authored
feat: implement conn deadline (#396)
1 parent 3f9d740 commit b0bf57d

File tree

7 files changed

+289
-93
lines changed

7 files changed

+289
-93
lines changed

.github/workflows/pr-check.yml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,15 @@ jobs:
66
compatibility-test:
77
strategy:
88
matrix:
9-
go: [ 1.18, 1.23 ]
10-
os: [ X64, ARM64 ]
9+
go: [ 1.18, 1.24 ]
10+
os: [ ubuntu-latest, ubuntu-24.04-arm, macos-latest ]
1111
runs-on: ${{ matrix.os }}
1212
steps:
1313
- uses: actions/checkout@v4
1414
- name: Set up Go
1515
uses: actions/setup-go@v5
1616
with:
1717
go-version: ${{ matrix.go }}
18-
cache: false
1918
- name: Unit Test
2019
run: go test -timeout=2m -race ./...
2120
- name: Benchmark
@@ -46,7 +45,7 @@ jobs:
4645
uses: crate-ci/[email protected]
4746

4847
golangci-lint:
49-
runs-on: [ Linux, X64 ]
48+
runs-on: ubuntu-latest
5049
steps:
5150
- uses: actions/checkout@v4
5251
- name: Set up Go

connection_impl.go

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@ type connection struct {
3939
locker
4040
operator *FDOperator
4141
readTimeout time.Duration
42+
readDeadline int64 // UnixNano(). it overwrites readTimeout. 0 if not set.
4243
readTimer *time.Timer
4344
readTrigger chan error
4445
waitReadSize int64
4546
writeTimeout time.Duration
47+
writeDeadline int64 // UnixNano(). it overwrites writeTimeout. 0 if not set.
4648
writeTimer *time.Timer
4749
writeTrigger chan error
4850
inputBuffer *LinkBuffer
@@ -87,6 +89,7 @@ func (c *connection) SetReadTimeout(timeout time.Duration) error {
8789
if timeout >= 0 {
8890
c.readTimeout = timeout
8991
}
92+
c.readDeadline = 0
9093
return nil
9194
}
9295

@@ -95,6 +98,38 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
9598
if timeout >= 0 {
9699
c.writeTimeout = timeout
97100
}
101+
c.writeDeadline = 0
102+
return nil
103+
}
104+
105+
// SetDeadline implements net.Conn.SetDeadline
106+
func (c *connection) SetDeadline(t time.Time) error {
107+
v := int64(0)
108+
if !t.IsZero() {
109+
v = t.UnixNano()
110+
}
111+
c.readDeadline = v
112+
c.writeDeadline = v
113+
return nil
114+
}
115+
116+
// SetReadDeadline implements net.Conn.SetReadDeadline
117+
func (c *connection) SetReadDeadline(t time.Time) error {
118+
if t.IsZero() {
119+
c.readDeadline = 0
120+
} else {
121+
c.readDeadline = t.UnixNano()
122+
}
123+
return nil
124+
}
125+
126+
// SetWriteDeadline implements net.Conn.SetWriteDeadline
127+
func (c *connection) SetWriteDeadline(t time.Time) error {
128+
if t.IsZero() {
129+
c.writeDeadline = 0
130+
} else {
131+
c.writeDeadline = t.UnixNano()
132+
}
98133
return nil
99134
}
100135

@@ -408,8 +443,14 @@ func (c *connection) waitRead(n int) (err error) {
408443
}
409444
atomic.StoreInt64(&c.waitReadSize, int64(n))
410445
defer atomic.StoreInt64(&c.waitReadSize, 0)
411-
if c.readTimeout > 0 {
412-
return c.waitReadWithTimeout(n)
446+
if dl := c.readDeadline; dl > 0 {
447+
timeout := time.Duration(dl - time.Now().UnixNano())
448+
if timeout <= 0 {
449+
return Exception(ErrReadTimeout, c.remoteAddr.String())
450+
}
451+
return c.waitReadWithTimeout(n, timeout)
452+
} else if c.readTimeout > 0 {
453+
return c.waitReadWithTimeout(n, c.readTimeout)
413454
}
414455
// wait full n
415456
for c.inputBuffer.Len() < n {
@@ -429,12 +470,11 @@ func (c *connection) waitRead(n int) (err error) {
429470
}
430471

431472
// waitReadWithTimeout will wait full n bytes or until timeout.
432-
func (c *connection) waitReadWithTimeout(n int) (err error) {
433-
// set read timeout
473+
func (c *connection) waitReadWithTimeout(n int, timeout time.Duration) (err error) {
434474
if c.readTimer == nil {
435-
c.readTimer = time.NewTimer(c.readTimeout)
475+
c.readTimer = time.NewTimer(timeout)
436476
} else {
437-
c.readTimer.Reset(c.readTimeout)
477+
c.readTimer.Reset(timeout)
438478
}
439479

440480
for c.inputBuffer.Len() < n {
@@ -501,15 +541,22 @@ func (c *connection) flush() error {
501541
}
502542

503543
func (c *connection) waitFlush() (err error) {
504-
if c.writeTimeout == 0 {
544+
timeout := c.writeTimeout
545+
if dl := c.writeDeadline; dl > 0 {
546+
timeout = time.Duration(dl - time.Now().UnixNano())
547+
if timeout <= 0 {
548+
return Exception(ErrWriteTimeout, c.remoteAddr.String())
549+
}
550+
}
551+
if timeout == 0 {
505552
return <-c.writeTrigger
506553
}
507554

508555
// set write timeout
509556
if c.writeTimer == nil {
510-
c.writeTimer = time.NewTimer(c.writeTimeout)
557+
c.writeTimer = time.NewTimer(timeout)
511558
} else {
512-
c.writeTimer.Reset(c.writeTimeout)
559+
c.writeTimer.Reset(timeout)
513560
}
514561

515562
select {

0 commit comments

Comments
 (0)