Skip to content

Commit b47f243

Browse files
committed
update netpoll
1 parent 851b6ee commit b47f243

File tree

1 file changed

+236
-0
lines changed

1 file changed

+236
-0
lines changed

netpoll.md

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,3 +861,239 @@ func dropg() {
861861
因为调度中有讲,这里就不赘述了。
862862

863863
### 读写超时
864+
865+
```go
866+
// SetReadDeadline sets the read deadline on the underlying connection.
867+
// A zero value for t means Read will not time out.
868+
func (c *Conn) SetReadDeadline(t time.Time) error {
869+
return c.conn.SetReadDeadline(t)
870+
}
871+
872+
// SetWriteDeadline sets the write deadline on the underlying connection.
873+
// A zero value for t means Write will not time out.
874+
// After a Write has timed out, the TLS state is corrupt and all future writes will return the same error.
875+
func (c *Conn) SetWriteDeadline(t time.Time) error {
876+
return c.conn.SetWriteDeadline(t)
877+
}
878+
```
879+
880+
```go
881+
// SetReadDeadline implements the Conn SetReadDeadline method.
882+
func (c *conn) SetReadDeadline(t time.Time) error {
883+
if !c.ok() {
884+
return syscall.EINVAL
885+
}
886+
if err := c.fd.pfd.SetReadDeadline(t); err != nil {
887+
return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
888+
}
889+
return nil
890+
}
891+
892+
// SetWriteDeadline implements the Conn SetWriteDeadline method.
893+
func (c *conn) SetWriteDeadline(t time.Time) error {
894+
if !c.ok() {
895+
return syscall.EINVAL
896+
}
897+
if err := c.fd.pfd.SetWriteDeadline(t); err != nil {
898+
return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
899+
}
900+
return nil
901+
}
902+
```
903+
904+
```go
905+
// SetReadDeadline sets the read deadline associated with fd.
906+
func (fd *FD) SetReadDeadline(t time.Time) error {
907+
return setDeadlineImpl(fd, t, 'r')
908+
}
909+
910+
// SetWriteDeadline sets the write deadline associated with fd.
911+
func (fd *FD) SetWriteDeadline(t time.Time) error {
912+
return setDeadlineImpl(fd, t, 'w')
913+
}
914+
```
915+
916+
```go
917+
func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
918+
diff := int64(time.Until(t))
919+
d := runtimeNano() + diff
920+
if d <= 0 && diff > 0 {
921+
// If the user has a deadline in the future, but the delay calculation
922+
// overflows, then set the deadline to the maximum possible value.
923+
d = 1<<63 - 1
924+
}
925+
if t.IsZero() {
926+
d = 0
927+
}
928+
if err := fd.incref(); err != nil {
929+
return err
930+
}
931+
defer fd.decref()
932+
if fd.pd.runtimeCtx == 0 {
933+
return ErrNoDeadline
934+
}
935+
runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
936+
return nil
937+
}
938+
```
939+
940+
```go
941+
//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
942+
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
943+
lock(&pd.lock)
944+
if pd.closing {
945+
unlock(&pd.lock)
946+
return
947+
}
948+
pd.seq++ // invalidate current timers
949+
// Reset current timers.
950+
if pd.rt.f != nil {
951+
deltimer(&pd.rt)
952+
pd.rt.f = nil
953+
}
954+
if pd.wt.f != nil {
955+
deltimer(&pd.wt)
956+
pd.wt.f = nil
957+
}
958+
// Setup new timers.
959+
if d != 0 && d <= nanotime() {
960+
d = -1
961+
}
962+
if mode == 'r' || mode == 'r'+'w' {
963+
pd.rd = d
964+
}
965+
if mode == 'w' || mode == 'r'+'w' {
966+
pd.wd = d
967+
}
968+
if pd.rd > 0 && pd.rd == pd.wd {
969+
pd.rt.f = netpollDeadline
970+
pd.rt.when = pd.rd
971+
// Copy current seq into the timer arg.
972+
// Timer func will check the seq against current descriptor seq,
973+
// if they differ the descriptor was reused or timers were reset.
974+
pd.rt.arg = pd
975+
pd.rt.seq = pd.seq
976+
addtimer(&pd.rt)
977+
} else {
978+
if pd.rd > 0 {
979+
pd.rt.f = netpollReadDeadline
980+
pd.rt.when = pd.rd
981+
pd.rt.arg = pd
982+
pd.rt.seq = pd.seq
983+
addtimer(&pd.rt)
984+
}
985+
if pd.wd > 0 {
986+
pd.wt.f = netpollWriteDeadline
987+
pd.wt.when = pd.wd
988+
pd.wt.arg = pd
989+
pd.wt.seq = pd.seq
990+
addtimer(&pd.wt)
991+
}
992+
}
993+
// If we set the new deadline in the past, unblock currently pending IO if any.
994+
var rg, wg *g
995+
atomicstorep(unsafe.Pointer(&wg), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
996+
if pd.rd < 0 {
997+
rg = netpollunblock(pd, 'r', false)
998+
}
999+
if pd.wd < 0 {
1000+
wg = netpollunblock(pd, 'w', false)
1001+
}
1002+
unlock(&pd.lock)
1003+
if rg != nil {
1004+
netpollgoready(rg, 3)
1005+
}
1006+
if wg != nil {
1007+
netpollgoready(wg, 3)
1008+
}
1009+
}
1010+
```
1011+
1012+
```go
1013+
func netpollDeadline(arg interface{}, seq uintptr) {
1014+
netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
1015+
}
1016+
1017+
func netpollReadDeadline(arg interface{}, seq uintptr) {
1018+
netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
1019+
}
1020+
```
1021+
1022+
```go
1023+
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
1024+
lock(&pd.lock)
1025+
// Seq arg is seq when the timer was set.
1026+
// If it's stale, ignore the timer event.
1027+
if seq != pd.seq {
1028+
// The descriptor was reused or timers were reset.
1029+
unlock(&pd.lock)
1030+
return
1031+
}
1032+
var rg *g
1033+
if read {
1034+
if pd.rd <= 0 || pd.rt.f == nil {
1035+
throw("runtime: inconsistent read deadline")
1036+
}
1037+
pd.rd = -1
1038+
atomicstorep(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
1039+
rg = netpollunblock(pd, 'r', false)
1040+
}
1041+
var wg *g
1042+
if write {
1043+
if pd.wd <= 0 || pd.wt.f == nil && !read {
1044+
throw("runtime: inconsistent write deadline")
1045+
}
1046+
pd.wd = -1
1047+
atomicstorep(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
1048+
wg = netpollunblock(pd, 'w', false)
1049+
}
1050+
unlock(&pd.lock)
1051+
if rg != nil {
1052+
netpollgoready(rg, 0)
1053+
}
1054+
if wg != nil {
1055+
netpollgoready(wg, 0)
1056+
}
1057+
}
1058+
```
1059+
1060+
```go
1061+
func netpollgoready(gp *g, traceskip int) {
1062+
atomic.Xadd(&netpollWaiters, -1)
1063+
goready(gp, traceskip+1)
1064+
}
1065+
1066+
func goready(gp *g, traceskip int) {
1067+
systemstack(func() {
1068+
ready(gp, traceskip, true)
1069+
})
1070+
}
1071+
1072+
// Mark gp ready to run.
1073+
func ready(gp *g, traceskip int, next bool) {
1074+
if trace.enabled {
1075+
traceGoUnpark(gp, traceskip)
1076+
}
1077+
1078+
status := readgstatus(gp)
1079+
1080+
// Mark runnable.
1081+
_g_ := getg()
1082+
_g_.m.locks++ // disable preemption because it can be holding p in a local var
1083+
if status&^_Gscan != _Gwaiting {
1084+
dumpgstatus(gp)
1085+
throw("bad g->status in ready")
1086+
}
1087+
1088+
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
1089+
casgstatus(gp, _Gwaiting, _Grunnable)
1090+
runqput(_g_.m.p.ptr(), gp, next)
1091+
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
1092+
wakep()
1093+
}
1094+
_g_.m.locks--
1095+
if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in Case we've cleared it in newstack
1096+
_g_.stackguard0 = stackPreempt
1097+
}
1098+
}
1099+
```

0 commit comments

Comments
 (0)