@@ -80,6 +80,12 @@ func (c *connection) closeBuffer() {
8080
8181// inputs implements FDOperator.
8282func (c * connection ) inputs (vs [][]byte ) (rs [][]byte ) {
83+ // trigger throttle
84+ if c .readBufferThreshold > 0 && int64 (c .inputBuffer .Len ()) >= c .readBufferThreshold {
85+ c .pauseRead ()
86+ return
87+ }
88+
8389 vs [0 ] = c .inputBuffer .book (c .bookSize , c .maxSize )
8490 return vs [:1 ]
8591}
@@ -123,6 +129,7 @@ func (c *connection) inputAck(n int) (err error) {
123129func (c * connection ) outputs (vs [][]byte ) (rs [][]byte , supportZeroCopy bool ) {
124130 if c .outputBuffer .IsEmpty () {
125131 c .pauseWrite ()
132+ c .triggerWrite (nil )
126133 return rs , c .supportZeroCopy
127134 }
128135 rs = c .outputBuffer .GetBytes (vs )
@@ -137,50 +144,47 @@ func (c *connection) outputAck(n int) (err error) {
137144 }
138145 if c .outputBuffer .IsEmpty () {
139146 c .pauseWrite ()
147+ c .triggerWrite (nil )
140148 }
141149 return nil
142150}
143151
152+ /* The race description of operator event monitoring
153+ - Pause operation will remove old event monitor of operator
154+ - Resume operation will add new event monitor of operator
155+ - Only poller could use Pause to remove event monitor, and poller already hold the op.do() locker
156+ - Only user could use Resume, and user's operation maybe compete with poller's operation
157+ - If competition happen, because of all resume operation will monitor all events, it's safe to do that with a race condition.
158+ * If resume first and pause latter, poller will monitor the accurate events it needs.
159+ * If pause first and resume latter, poller will monitor the duplicate events which will be removed after next poller triggered.
160+ And poller will ensure to remove the duplicate events.
161+ - If there is no readBufferThreshold option, the code path will be more simple and efficient.
162+ */
163+
144164// pauseWrite removed the monitoring of write events.
145165// pauseWrite used in poller
146166func (c * connection ) pauseWrite () {
147- switch c .operator .getMode () {
148- case opreadwrite :
149- c . operator . Control ( PollRW2R )
150- case opwrite :
151- c . operator . Control ( PollW2Hup )
152- }
153- c .triggerWrite ( nil )
167+ c .operator .Control ( PollRW2R )
168+ }
169+
170+ // resumeWrite add the monitoring of write events.
171+ // resumeWrite used by users
172+ func ( c * connection ) resumeWrite () {
173+ c .operator . Control ( PollR2RW )
154174}
155175
156176// pauseRead removed the monitoring of read events.
157177// pauseRead used in poller
158178func (c * connection ) pauseRead () {
159- // Note that the poller ensure that every fd should read all left data in socket buffer before detach it.
160- // So the operator mode should never be ophup.
161- var changeTo PollEvent
162- switch c .operator .getMode () {
163- case opread :
164- changeTo = PollR2Hup
165- case opreadwrite :
166- changeTo = PollRW2W
167- }
168- if changeTo > 0 && atomic .CompareAndSwapInt32 (& c .operator .throttled , 0 , 1 ) {
169- c .operator .Control (changeTo )
179+ if atomic .CompareAndSwapInt32 (& c .operator .throttled , 0 , 1 ) {
180+ c .operator .Control (PollRW2W )
170181 }
171182}
172183
173184// resumeRead add the monitoring of read events.
174185// resumeRead used by users
175186func (c * connection ) resumeRead () {
176- var changeTo PollEvent
177- switch c .operator .getMode () {
178- case ophup :
179- changeTo = PollHup2R
180- case opwrite :
181- changeTo = PollW2RW
182- }
183- if changeTo > 0 && atomic .CompareAndSwapInt32 (& c .operator .throttled , 1 , 0 ) {
184- c .operator .Control (changeTo )
187+ if atomic .CompareAndSwapInt32 (& c .operator .throttled , 1 , 0 ) {
188+ c .operator .Control (PollW2RW )
185189 }
186190}
0 commit comments