19
19
import java .io .ByteArrayOutputStream ;
20
20
import java .io .IOException ;
21
21
import java .io .InputStream ;
22
+ import java .io .UncheckedIOException ;
22
23
import java .lang .reflect .Method ;
23
24
import java .net .ConnectException ;
24
25
import java .net .InetAddress ;
76
77
import org .springframework .messaging .MessagingException ;
77
78
import org .springframework .messaging .support .ErrorMessage ;
78
79
import org .springframework .scheduling .concurrent .ThreadPoolTaskExecutor ;
80
+ import org .springframework .util .ReflectionUtils ;
79
81
import org .springframework .util .StopWatch ;
80
82
81
83
import static org .assertj .core .api .Assertions .assertThat ;
84
+ import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
85
+ import static org .assertj .core .api .Assertions .assertThatIllegalArgumentException ;
82
86
import static org .assertj .core .api .Assertions .fail ;
83
87
import static org .awaitility .Awaitility .await ;
84
88
import static org .awaitility .Awaitility .with ;
@@ -125,10 +129,10 @@ public void testWriteTimeout(TestInfo testInfo) throws Exception {
125
129
s .close ();
126
130
}
127
131
catch (Exception e ) {
128
- e . printStackTrace ( );
132
+ ReflectionUtils . rethrowRuntimeException ( e );
129
133
}
130
134
});
131
- assertThat (latch .await (10000 , TimeUnit .MILLISECONDS )).isTrue ();
135
+ assertThat (latch .await (30 , TimeUnit .SECONDS )).isTrue ();
132
136
TcpNioClientConnectionFactory factory =
133
137
new TcpNioClientConnectionFactory ("localhost" , serverSocket .get ().getLocalPort ());
134
138
factory .setLookupHost (true );
@@ -174,10 +178,10 @@ public void testReadTimeout(TestInfo testInfo) throws Exception {
174
178
done .await (10 , TimeUnit .SECONDS );
175
179
}
176
180
catch (Exception e ) {
177
- e . printStackTrace ( );
181
+ ReflectionUtils . rethrowRuntimeException ( e );
178
182
}
179
183
});
180
- assertThat (latch .await (10000 , TimeUnit .MILLISECONDS )).isTrue ();
184
+ assertThat (latch .await (30 , TimeUnit .SECONDS )).isTrue ();
181
185
TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory ("localhost" ,
182
186
serverSocket .get ().getLocalPort ());
183
187
factory .setApplicationEventPublisher (nullPublisher );
@@ -215,10 +219,10 @@ public void testMemoryLeak(TestInfo testInfo) throws Exception {
215
219
readFully (socket .getInputStream (), b );
216
220
}
217
221
catch (Exception e ) {
218
- e . printStackTrace ( );
222
+ ReflectionUtils . rethrowRuntimeException ( e );
219
223
}
220
224
});
221
- assertThat (latch .await (10000 , TimeUnit .MILLISECONDS )).isTrue ();
225
+ assertThat (latch .await (30 , TimeUnit .SECONDS )).isTrue ();
222
226
TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory ("localhost" ,
223
227
serverSocket .get ().getLocalPort ());
224
228
factory .setApplicationEventPublisher (nullPublisher );
@@ -231,10 +235,9 @@ public void testMemoryLeak(TestInfo testInfo) throws Exception {
231
235
connection .close ();
232
236
assertThat (!connection .isOpen ()).isTrue ();
233
237
TestUtils .getPropertyValue (factory , "selector" , Selector .class ).wakeup ();
234
- await ().atMost (Duration .ofSeconds (10 )).until (() -> connections . size () == 0 );
238
+ await ().atMost (Duration .ofSeconds (10 )).until (connections :: isEmpty );
235
239
}
236
240
catch (Exception e ) {
237
- e .printStackTrace ();
238
241
fail ("Unexpected exception " + e );
239
242
}
240
243
factory .stop ();
@@ -330,16 +333,12 @@ public void testInsufficientThreads() throws Exception {
330
333
}
331
334
return null ;
332
335
});
333
- try {
334
- Object o = future .get (10 , TimeUnit .SECONDS );
335
- fail ("Expected exception, got " + o );
336
- }
337
- catch (ExecutionException e ) {
338
- assertThat (e .getCause ().getMessage ()).isEqualTo ("Timed out waiting for buffer space" );
339
- }
340
- finally {
341
- exec .shutdownNow ();
342
- }
336
+
337
+ assertThatExceptionOfType (ExecutionException .class )
338
+ .isThrownBy (() -> future .get (10 , TimeUnit .SECONDS ))
339
+ .withStackTraceContaining ("Timed out waiting for buffer space" );
340
+
341
+ exec .shutdownNow ();
343
342
}
344
343
345
344
@ Test
@@ -374,7 +373,6 @@ public void testSufficientThreads() throws Exception {
374
373
}
375
374
}
376
375
catch (Exception e ) {
377
- e .printStackTrace ();
378
376
throw (Exception ) e .getCause ();
379
377
}
380
378
return null ;
@@ -445,18 +443,13 @@ public void testByteArrayReadWithBadArgs() throws Exception {
445
443
.getPropertyValue ("channelInputStream" );
446
444
stream .write (ByteBuffer .wrap ("foo" .getBytes ()));
447
445
byte [] out = new byte [5 ];
448
- try {
449
- stream .read (out , 1 , 5 );
450
- fail ("Expected IndexOutOfBoundsException" );
451
- }
452
- catch (IndexOutOfBoundsException e ) {
453
- }
454
- try {
455
- stream .read (null , 1 , 5 );
456
- fail ("Expected IllegalArgumentException" );
457
- }
458
- catch (IllegalArgumentException e ) {
459
- }
446
+
447
+ assertThatExceptionOfType (IndexOutOfBoundsException .class )
448
+ .isThrownBy (() -> stream .read (out , 1 , 5 ));
449
+
450
+ assertThatIllegalArgumentException ()
451
+ .isThrownBy (() -> stream .read (null , 1 , 5 ));
452
+
460
453
assertThat (stream .read (out , 0 , 0 )).isEqualTo (0 );
461
454
assertThat (stream .read (out )).isEqualTo (3 );
462
455
}
@@ -476,7 +469,7 @@ public void testByteArrayBlocksForZeroRead() throws Exception {
476
469
stream .read (out );
477
470
}
478
471
catch (IOException e ) {
479
- e . printStackTrace ( );
472
+ throw new UncheckedIOException ( e );
480
473
}
481
474
latch .countDown ();
482
475
});
@@ -514,16 +507,12 @@ public Integer answer(InvocationOnMock invocation) {
514
507
SocketChannel outChannel = mock (SocketChannel .class );
515
508
when (outChannel .socket ()).thenReturn (outSocket );
516
509
TcpNioConnection outboundConnection = new TcpNioConnection (outChannel , true , false , nullPublisher , null );
517
- doAnswer (new Answer <Object >() {
518
-
519
- @ Override
520
- public Object answer (InvocationOnMock invocation ) throws Throwable {
521
- ByteBuffer buff = invocation .getArgument (0 );
522
- byte [] bytes = new byte [buff .limit ()];
523
- buff .get (bytes );
524
- written .write (bytes );
525
- return null ;
526
- }
510
+ doAnswer (invocation -> {
511
+ ByteBuffer buff = invocation .getArgument (0 );
512
+ byte [] bytes = new byte [buff .limit ()];
513
+ buff .get (bytes );
514
+ written .write (bytes );
515
+ return null ;
527
516
}).when (outChannel ).write (any (ByteBuffer .class ));
528
517
529
518
MapMessageConverter outConverter = new MapMessageConverter ();
@@ -539,14 +528,10 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
539
528
540
529
final AtomicReference <Message <?>> inboundMessage = new AtomicReference <Message <?>>();
541
530
final CountDownLatch latch = new CountDownLatch (1 );
542
- TcpListener listener = new TcpListener () {
543
-
544
- @ Override
545
- public boolean onMessage (Message <?> message ) {
546
- inboundMessage .set (message );
547
- latch .countDown ();
548
- return false ;
549
- }
531
+ TcpListener listener = message1 -> {
532
+ inboundMessage .set (message1 );
533
+ latch .countDown ();
534
+ return false ;
550
535
};
551
536
inboundConnection .registerListener (listener );
552
537
inboundConnection .readPacket ();
@@ -565,19 +550,14 @@ public void testAssemblerUsesSecondaryExecutor() throws Exception {
565
550
566
551
factory .setSoTimeout (1000 );
567
552
factory .setTaskExecutor (compositeExec );
568
- final AtomicReference <String > threadName = new AtomicReference <String >();
553
+ final AtomicReference <String > threadName = new AtomicReference <>();
569
554
final CountDownLatch latch = new CountDownLatch (1 );
570
- factory .registerListener (new TcpListener () {
571
-
572
- @ Override
573
- public boolean onMessage (Message <?> message ) {
574
- if (!(message instanceof ErrorMessage )) {
575
- threadName .set (Thread .currentThread ().getName ());
576
- latch .countDown ();
577
- }
578
- return false ;
555
+ factory .registerListener (message -> {
556
+ if (!(message instanceof ErrorMessage )) {
557
+ threadName .set (Thread .currentThread ().getName ());
558
+ latch .countDown ();
579
559
}
580
-
560
+ return false ;
581
561
});
582
562
factory .start ();
583
563
TestingUtilities .waitListening (factory , null );
@@ -598,7 +578,7 @@ public boolean onMessage(Message<?> message) {
598
578
socket .getOutputStream ().write ("foo\r \n " .getBytes ());
599
579
socket .close ();
600
580
601
- assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
581
+ assertThat (latch .await (30 , TimeUnit .SECONDS )).isTrue ();
602
582
assertThat (threadName .get ()).contains ("assembler" );
603
583
604
584
factory .stop ();
@@ -720,17 +700,12 @@ public void publishEvent(Object event) {
720
700
});
721
701
final CountDownLatch assemblerLatch = new CountDownLatch (1 );
722
702
final AtomicReference <Thread > assembler = new AtomicReference <Thread >();
723
- factory .registerListener (new TcpListener () {
724
-
725
- @ Override
726
- public boolean onMessage (Message <?> message ) {
727
- if (!(message instanceof ErrorMessage )) {
728
- assembler .set (Thread .currentThread ());
729
- assemblerLatch .countDown ();
730
- }
731
- return false ;
703
+ factory .registerListener (message -> {
704
+ if (!(message instanceof ErrorMessage )) {
705
+ assembler .set (Thread .currentThread ());
706
+ assemblerLatch .countDown ();
732
707
}
733
-
708
+ return false ;
734
709
});
735
710
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor ();
736
711
te .setCorePoolSize (3 ); // selector, reader, assembler
@@ -756,38 +731,26 @@ public boolean onMessage(Message<?> message) {
756
731
757
732
final CountDownLatch readerLatch = new CountDownLatch (4 ); // 3 dataAvailable, 1 continuing
758
733
final CountDownLatch readerFinishedLatch = new CountDownLatch (1 );
759
- doAnswer (new Answer <Void >() {
760
-
761
- @ Override
762
- public Void answer (InvocationOnMock invocation ) throws Throwable {
763
- invocation .callRealMethod ();
764
- // delay the reader thread resetting writingToPipe
765
- readerLatch .await (10 , TimeUnit .SECONDS );
766
- Thread .sleep (100 );
767
- readerFinishedLatch .countDown ();
768
- return null ;
769
- }
734
+ doAnswer (invocation -> {
735
+ invocation .callRealMethod ();
736
+ // delay the reader thread resetting writingToPipe
737
+ readerLatch .await (10 , TimeUnit .SECONDS );
738
+ Thread .sleep (100 );
739
+ readerFinishedLatch .countDown ();
740
+ return null ;
770
741
}).when (cis ).write (any (ByteBuffer .class ));
771
742
772
743
doReturn (true ).when (logger ).isTraceEnabled ();
773
- doAnswer (new Answer <Void >() {
774
-
775
- @ Override
776
- public Void answer (InvocationOnMock invocation ) throws Throwable {
777
- invocation .callRealMethod ();
778
- readerLatch .countDown ();
779
- return null ;
780
- }
744
+ doAnswer (invocation -> {
745
+ invocation .callRealMethod ();
746
+ readerLatch .countDown ();
747
+ return null ;
781
748
}).when (logger ).trace (contains ("checking data avail" ));
782
749
783
- doAnswer (new Answer <Void >() {
784
-
785
- @ Override
786
- public Void answer (InvocationOnMock invocation ) throws Throwable {
787
- invocation .callRealMethod ();
788
- readerLatch .countDown ();
789
- return null ;
790
- }
750
+ doAnswer (invocation -> {
751
+ invocation .callRealMethod ();
752
+ readerLatch .countDown ();
753
+ return null ;
791
754
}).when (logger ).trace (contains ("Nio assembler continuing" ));
792
755
793
756
socket .getOutputStream ().write ("foo\r \n " .getBytes ());
0 commit comments