@@ -36,46 +36,80 @@ public static void EfficientCopyTo(this Stream input, Stream output)
3636 }
3737 }
3838
39- public static async Task < int > ReadAsync ( this Stream stream , byte [ ] buffer , int offset , int count , TimeSpan timeout , CancellationToken cancellationToken )
39+ public static int Read ( this Stream stream , byte [ ] buffer , int offset , int count , TimeSpan timeout , CancellationToken cancellationToken )
4040 {
41- var state = 1 ; // 1 == reading, 2 == done reading, 3 == timedout, 4 == cancelled
42-
43- var bytesRead = 0 ;
44- using ( new Timer ( _ => ChangeState ( 3 ) , null , timeout , Timeout . InfiniteTimeSpan ) )
45- using ( cancellationToken . Register ( ( ) => ChangeState ( 4 ) ) )
41+ try
4642 {
47- try
48- {
49- bytesRead = await stream . ReadAsync ( buffer , offset , count , cancellationToken ) . ConfigureAwait ( false ) ;
50- ChangeState ( 2 ) ; // note: might not actually go to state 2 if already in state 3 or 4
51- }
52- catch when ( state == 1 )
53- {
54- try { stream . Dispose ( ) ; } catch { }
55- throw ;
56- }
57- catch when ( state >= 3 )
43+ using var manualResetEvent = new ManualResetEventSlim ( ) ;
44+ var readOperation = stream . BeginRead (
45+ buffer ,
46+ offset ,
47+ count ,
48+ state => ( ( ManualResetEventSlim ) state . AsyncState ) . Set ( ) ,
49+ manualResetEvent ) ;
50+
51+ if ( readOperation . IsCompleted || manualResetEvent . Wait ( timeout , cancellationToken ) )
5852 {
59- // a timeout or operation cancelled exception will be thrown instead
53+ return stream . EndRead ( readOperation ) ;
6054 }
55+ }
56+ catch ( OperationCanceledException )
57+ {
58+ // Have to suppress OperationCanceledException here, it will be thrown after the stream will be disposed.
59+ }
60+ catch ( ObjectDisposedException )
61+ {
62+ throw new IOException ( ) ;
63+ }
6164
62- if ( state == 3 ) { throw new TimeoutException ( ) ; }
63- if ( state == 4 ) { throw new OperationCanceledException ( ) ; }
65+ try
66+ {
67+ stream . Dispose ( ) ;
68+ }
69+ catch
70+ {
71+ // Ignore any exceptions
6472 }
6573
66- return bytesRead ;
74+ cancellationToken . ThrowIfCancellationRequested ( ) ;
75+ throw new TimeoutException ( ) ;
76+ }
6777
68- void ChangeState ( int to )
78+ public static async Task < int > ReadAsync ( this Stream stream , byte [ ] buffer , int offset , int count , TimeSpan timeout , CancellationToken cancellationToken )
79+ {
80+ Task < int > readTask = null ;
81+ try
82+ {
83+ readTask = stream . ReadAsync ( buffer , offset , count ) ;
84+ return await readTask . WaitAsync ( timeout , cancellationToken ) . ConfigureAwait ( false ) ;
85+ }
86+ catch ( ObjectDisposedException )
6987 {
70- var from = Interlocked . CompareExchange ( ref state , to , 1 ) ;
71- if ( from == 1 && to >= 3 )
88+ // It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true.
89+ throw new IOException ( ) ;
90+ }
91+ catch ( Exception ex ) when ( ex is OperationCanceledException or TimeoutException )
92+ {
93+ // await Task.WaitAsync() throws OperationCanceledException in case of cancellation and TimeoutException in case of timeout
94+ try
7295 {
73- try { stream . Dispose ( ) ; } catch { } // disposing the stream aborts the read attempt
96+ stream . Dispose ( ) ;
97+ if ( readTask != null )
98+ {
99+ // Should await on the task to avoid UnobservedTaskException
100+ await readTask . ConfigureAwait ( false ) ;
101+ }
74102 }
103+ catch
104+ {
105+ // Ignore any exceptions
106+ }
107+
108+ throw ;
75109 }
76110 }
77111
78- public static void ReadBytes ( this Stream stream , byte [ ] buffer , int offset , int count , CancellationToken cancellationToken )
112+ public static void ReadBytes ( this Stream stream , byte [ ] buffer , int offset , int count , TimeSpan timeout , CancellationToken cancellationToken )
79113 {
80114 Ensure . IsNotNull ( stream , nameof ( stream ) ) ;
81115 Ensure . IsNotNull ( buffer , nameof ( buffer ) ) ;
@@ -84,7 +118,7 @@ public static void ReadBytes(this Stream stream, byte[] buffer, int offset, int
84118
85119 while ( count > 0 )
86120 {
87- var bytesRead = stream . Read ( buffer , offset , count ) ; // TODO: honor cancellationToken?
121+ var bytesRead = stream . Read ( buffer , offset , count , timeout , cancellationToken ) ;
88122 if ( bytesRead == 0 )
89123 {
90124 throw new EndOfStreamException ( ) ;
@@ -94,7 +128,7 @@ public static void ReadBytes(this Stream stream, byte[] buffer, int offset, int
94128 }
95129 }
96130
97- public static void ReadBytes ( this Stream stream , IByteBuffer buffer , int offset , int count , CancellationToken cancellationToken )
131+ public static void ReadBytes ( this Stream stream , IByteBuffer buffer , int offset , int count , TimeSpan timeout , CancellationToken cancellationToken )
98132 {
99133 Ensure . IsNotNull ( stream , nameof ( stream ) ) ;
100134 Ensure . IsNotNull ( buffer , nameof ( buffer ) ) ;
@@ -105,7 +139,7 @@ public static void ReadBytes(this Stream stream, IByteBuffer buffer, int offset,
105139 {
106140 var backingBytes = buffer . AccessBackingBytes ( offset ) ;
107141 var bytesToRead = Math . Min ( count , backingBytes . Count ) ;
108- var bytesRead = stream . Read ( backingBytes . Array , backingBytes . Offset , bytesToRead ) ; // TODO: honor cancellationToken?
142+ var bytesRead = stream . Read ( backingBytes . Array , backingBytes . Offset , bytesToRead , timeout , cancellationToken ) ;
109143 if ( bytesRead == 0 )
110144 {
111145 throw new EndOfStreamException ( ) ;
@@ -155,44 +189,82 @@ public static async Task ReadBytesAsync(this Stream stream, IByteBuffer buffer,
155189 }
156190 }
157191
158-
159- public static async Task WriteAsync ( this Stream stream , byte [ ] buffer , int offset , int count , TimeSpan timeout , CancellationToken cancellationToken )
192+ public static void Write ( this Stream stream , byte [ ] buffer , int offset , int count , TimeSpan timeout , CancellationToken cancellationToken )
160193 {
161- var state = 1 ; // 1 == writing, 2 == done writing, 3 == timedout, 4 == cancelled
162-
163- using ( new Timer ( _ => ChangeState ( 3 ) , null , timeout , Timeout . InfiniteTimeSpan ) )
164- using ( cancellationToken . Register ( ( ) => ChangeState ( 4 ) ) )
194+ try
165195 {
166- try
167- {
168- await stream . WriteAsync ( buffer , offset , count , cancellationToken ) . ConfigureAwait ( false ) ;
169- ChangeState ( 2 ) ; // note: might not actually go to state 2 if already in state 3 or 4
170- }
171- catch when ( state == 1 )
172- {
173- try { stream . Dispose ( ) ; } catch { }
174- throw ;
175- }
176- catch when ( state >= 3 )
196+ using var manualResetEvent = new ManualResetEventSlim ( ) ;
197+ var writeOperation = stream . BeginWrite (
198+ buffer ,
199+ offset ,
200+ count ,
201+ state => ( ( ManualResetEventSlim ) state . AsyncState ) . Set ( ) ,
202+ manualResetEvent ) ;
203+
204+ if ( writeOperation . IsCompleted || manualResetEvent . Wait ( timeout , cancellationToken ) )
177205 {
178- // a timeout or operation cancelled exception will be thrown instead
206+ stream . EndWrite ( writeOperation ) ;
207+ return ;
179208 }
209+ }
210+ catch ( OperationCanceledException )
211+ {
212+ // Have to suppress OperationCanceledException here, it will be thrown after the stream will be disposed.
213+ }
214+ catch ( ObjectDisposedException )
215+ {
216+ // It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true.
217+ throw new IOException ( ) ;
218+ }
180219
181- if ( state == 3 ) { throw new TimeoutException ( ) ; }
182- if ( state == 4 ) { throw new OperationCanceledException ( ) ; }
220+ try
221+ {
222+ stream . Dispose ( ) ;
183223 }
224+ catch
225+ {
226+ // Ignore any exceptions
227+ }
228+
229+ cancellationToken . ThrowIfCancellationRequested ( ) ;
230+ throw new TimeoutException ( ) ;
231+ }
184232
185- void ChangeState ( int to )
233+ public static async Task WriteAsync ( this Stream stream , byte [ ] buffer , int offset , int count , TimeSpan timeout , CancellationToken cancellationToken )
234+ {
235+ Task writeTask = null ;
236+ try
237+ {
238+ writeTask = stream . WriteAsync ( buffer , offset , count ) ;
239+ await writeTask . WaitAsync ( timeout , cancellationToken ) . ConfigureAwait ( false ) ;
240+ }
241+ catch ( ObjectDisposedException )
242+ {
243+ // It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true.
244+ throw new IOException ( ) ;
245+ }
246+ catch ( Exception ex ) when ( ex is OperationCanceledException or TimeoutException )
186247 {
187- var from = Interlocked . CompareExchange ( ref state , to , 1 ) ;
188- if ( from == 1 && to >= 3 )
248+ // await Task.WaitAsync() throws OperationCanceledException in case of cancellation and TimeoutException in case of timeout
249+ try
189250 {
190- try { stream . Dispose ( ) ; } catch { } // disposing the stream aborts the write attempt
251+ stream . Dispose ( ) ;
252+ // Should await on the task to avoid UnobservedTaskException
253+ if ( writeTask != null )
254+ {
255+ await writeTask . ConfigureAwait ( false ) ;
256+ }
191257 }
258+ catch
259+ {
260+ // Ignore any exceptions
261+ }
262+
263+ throw ;
192264 }
193265 }
194266
195- public static void WriteBytes ( this Stream stream , IByteBuffer buffer , int offset , int count , CancellationToken cancellationToken )
267+ public static void WriteBytes ( this Stream stream , IByteBuffer buffer , int offset , int count , TimeSpan timeout , CancellationToken cancellationToken )
196268 {
197269 Ensure . IsNotNull ( stream , nameof ( stream ) ) ;
198270 Ensure . IsNotNull ( buffer , nameof ( buffer ) ) ;
@@ -204,7 +276,7 @@ public static void WriteBytes(this Stream stream, IByteBuffer buffer, int offset
204276 cancellationToken . ThrowIfCancellationRequested ( ) ;
205277 var backingBytes = buffer . AccessBackingBytes ( offset ) ;
206278 var bytesToWrite = Math . Min ( count , backingBytes . Count ) ;
207- stream . Write ( backingBytes . Array , backingBytes . Offset , bytesToWrite ) ; // TODO: honor cancellationToken?
279+ stream . Write ( backingBytes . Array , backingBytes . Offset , bytesToWrite , timeout , cancellationToken ) ;
208280 offset += bytesToWrite ;
209281 count -= bytesToWrite ;
210282 }
0 commit comments