1
1
/*
2
- * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
2
+ * Copyright (c) 2011-2025 Contributors to the Eclipse Foundation
3
3
*
4
4
* This program and the accompanying materials are made available under the
5
5
* terms of the Eclipse Public License 2.0 which is available at
22
22
import io .vertx .core .json .JsonObject ;
23
23
import io .vertx .core .net .NetClient ;
24
24
import io .vertx .core .net .NetServer ;
25
+ import io .vertx .core .net .NetSocket ;
25
26
import io .vertx .core .net .SocketAddress ;
26
27
import io .vertx .core .spi .VertxMetricsFactory ;
27
28
import io .vertx .core .spi .VertxTracerFactory ;
28
29
import io .vertx .core .spi .cluster .ClusterManager ;
29
30
import io .vertx .core .transport .Transport ;
30
31
31
32
import java .util .Arrays ;
33
+ import java .util .Objects ;
32
34
import java .util .concurrent .TimeUnit ;
35
+ import java .util .stream .Collectors ;
33
36
34
37
/**
35
38
* Created by tim on 08/01/15.
@@ -73,19 +76,17 @@ public void example7(Vertx vertx) {
73
76
vertx .executeBlocking (() -> {
74
77
// Call some blocking API that takes a significant amount of time to return
75
78
return someAPI .blockingMethod ("hello" );
76
- }).onComplete (res -> {
77
- System .out .println ("The result is: " + res .result ());
78
- });
79
+ })
80
+ .onSuccess (result -> System .out .println ("The result is: " + result ));
79
81
}
80
82
81
83
public void workerExecutor1 (Vertx vertx ) {
82
84
WorkerExecutor executor = vertx .createSharedWorkerExecutor ("my-worker-pool" );
83
85
executor .executeBlocking (() -> {
84
86
// Call some blocking API that takes a significant amount of time to return
85
87
return someAPI .blockingMethod ("hello" );
86
- }).onComplete (res -> {
87
- System .out .println ("The result is: " + res .result ());
88
- });
88
+ })
89
+ .onSuccess (result -> System .out .println ("The result is: " + result ));
89
90
}
90
91
91
92
public void workerExecutor2 (WorkerExecutor executor ) {
@@ -127,7 +128,7 @@ public void clusteredVertxBuilder(VertxOptions options, ClusterManager clusterMa
127
128
.buildClustered ();
128
129
}
129
130
130
- public void exampleFuture1 (Vertx vertx , Handler < HttpServerRequest > requestHandler ) {
131
+ public void exampleFuture1 (Vertx vertx ) {
131
132
FileSystem fs = vertx .fileSystem ();
132
133
133
134
Future <FileProps > future = fs .props ("/my_file.txt" );
@@ -142,6 +143,36 @@ public void exampleFuture1(Vertx vertx, Handler<HttpServerRequest> requestHandle
142
143
});
143
144
}
144
145
146
+ public void exampleFuture2 (Vertx vertx ) {
147
+ FileSystem fs = vertx .fileSystem ();
148
+
149
+ Future <FileProps > future = fs .props ("/my_file.txt" );
150
+
151
+ future
152
+ .onSuccess ((FileProps fileProps ) -> {
153
+ System .out .println ("File size = " + fileProps .size ());
154
+ })
155
+ .onFailure ((Throwable e ) -> {
156
+ System .out .println ("Failure: " + e .getMessage ());
157
+ });
158
+ }
159
+
160
+ public void exampleFuture3 (Vertx vertx ) {
161
+ FileSystem fs = vertx .fileSystem ();
162
+
163
+ fs .props ("/my_file.txt" )
164
+ .onSuccess (fileProps -> System .out .println ("File size = " + fileProps .size ()))
165
+ .onFailure (e -> System .out .println ("Failure: " + e .getMessage ()));
166
+ }
167
+
168
+ public void exampleFuture4 (Vertx vertx ) {
169
+ FileSystem fs = vertx .fileSystem ();
170
+
171
+ fs .props ("/my_file.txt" )
172
+ .onComplete (fileProps -> System .out .println ("File size = " + fileProps .size ()),
173
+ e -> System .out .println ("Failure: " + e .getMessage ()));
174
+ }
175
+
145
176
public void promiseCallbackOrder (Future <Void > future ) {
146
177
future .onComplete (ar -> {
147
178
// Do something
@@ -170,61 +201,55 @@ public void exampleFutureComposition1(Vertx vertx) {
170
201
});
171
202
}
172
203
173
- public void exampleFuture2 (Vertx vertx , Handler <HttpServerRequest > requestHandler ) {
174
- FileSystem fs = vertx .fileSystem ();
204
+ public void exampleFutureComposition2 (Vertx vertx ) {
175
205
176
- Future < FileProps > future = fs . props ( "/my_file.txt" );
206
+ FileSystem fs = vertx . fileSystem ( );
177
207
178
- future .onComplete ((AsyncResult <FileProps > ar ) -> {
179
- if (ar .succeeded ()) {
180
- FileProps props = ar .result ();
181
- System .out .println ("File size = " + props .size ());
182
- } else {
183
- System .out .println ("Failure: " + ar .cause ().getMessage ());
184
- }
185
- });
208
+ Future <Void > future = fs
209
+ .createFile ("/foo" )
210
+ // When the file is created (fut1), execute this:
211
+ .compose (v -> fs .writeFile ("/foo" , Buffer .buffer ()))
212
+ // When the file is written (fut2), execute this:
213
+ .compose (v -> fs .move ("/foo" , "/bar" ));
186
214
}
187
215
188
- public void exampleFutureAll1 (HttpServer httpServer , NetServer netServer ) {
216
+ public Future <?> exampleFutureAll1 (HttpServer httpServer , NetServer netServer ) {
189
217
Future <HttpServer > httpServerFuture = httpServer .listen ();
190
218
191
219
Future <NetServer > netServerFuture = netServer .listen ();
192
220
193
- Future .all (httpServerFuture , netServerFuture ).onComplete (ar -> {
194
- if (ar .succeeded ()) {
195
- // All servers started
196
- } else {
221
+ return Future .all (httpServerFuture , netServerFuture )
222
+ .onFailure (e -> {
197
223
// At least one server failed
198
- }
199
- });
224
+ });
200
225
}
201
226
202
227
public void exampleFutureAll2 (Future <?> future1 , Future <?> future2 , Future <?> future3 ) {
203
228
Future .all (Arrays .asList (future1 , future2 , future3 ));
204
229
}
205
230
206
- public void exampleFutureAny1 (Future <String > future1 , Future <String > future2 ) {
207
- Future .any (future1 , future2 ).onComplete (ar -> {
208
- if (ar .succeeded ()) {
209
- // At least one is succeeded
210
- } else {
231
+ public Future <String > exampleFutureAny1 (Future <String > future1 , Future <String > future2 ) {
232
+ return Future .any (future1 , future2 )
233
+ .map (result -> future1 .succeeded () ? future1 : future2 ) // At least one is succeeded
234
+ .onFailure (e -> {
211
235
// All failed
212
- }
213
- });
236
+ });
214
237
}
215
238
216
239
public void exampleFutureAny2 (Future <?> f1 , Future <?> f2 , Future <?> f3 ) {
217
240
Future .any (Arrays .asList (f1 , f2 , f3 ));
218
241
}
219
242
220
- public void exampleFutureJoin1 (Future <?> future1 , Future <?> future2 , Future <?> future3 ) {
221
- Future .join (future1 , future2 , future3 ).onComplete (ar -> {
222
- if (ar .succeeded ()) {
223
- // All succeeded
224
- } else {
225
- // All completed and at least one failed
226
- }
227
- });
243
+ public Future <String > exampleFutureJoin1 (Future <String > future1 , Future <String > future2 , Future <String > future3 ) {
244
+ CompositeFuture compositeFuture = Future .join (future1 , future2 , future3 );
245
+
246
+ return compositeFuture
247
+ .map (x -> {
248
+ // All completed, each is either succeeded or failed
249
+ return compositeFuture .<String >list ().stream ()
250
+ .filter (Objects ::nonNull ) // failed have null
251
+ .collect (Collectors .joining (", " ));
252
+ });
228
253
}
229
254
230
255
public void exampleFutureJoin2 (Future <?> future1 , Future <?> future2 , Future <?> future3 ) {
@@ -294,6 +319,16 @@ public void start(Promise<Void> startPromise) throws Exception {
294
319
}
295
320
});
296
321
322
+ // Or
323
+ future
324
+ .onSuccess (x -> startPromise .complete ())
325
+ .onFailure (startPromise ::fail );
326
+
327
+ // Or
328
+ future
329
+ .onComplete (x -> startPromise .complete (),
330
+ startPromise ::fail );
331
+
297
332
// Or
298
333
future
299
334
.<Void >mapEmpty ()
@@ -334,28 +369,14 @@ public void example9(Vertx vertx) {
334
369
335
370
}
336
371
337
- public void example10 (Vertx vertx ) {
338
- vertx
372
+ public Future < String > example10 (Vertx vertx ) {
373
+ return vertx
339
374
.deployVerticle (new MyOrderProcessorVerticle ())
340
- .onComplete (res -> {
341
- if (res .succeeded ()) {
342
- System .out .println ("Deployment id is: " + res .result ());
343
- } else {
344
- System .out .println ("Deployment failed!" );
345
- }
346
- });
375
+ .onSuccess (deploymentID -> System .out .println ("Deployment id is: " + deploymentID ));
347
376
}
348
377
349
378
public void example11 (Vertx vertx , String deploymentID ) {
350
- vertx
351
- .undeploy (deploymentID )
352
- .onComplete (res -> {
353
- if (res .succeeded ()) {
354
- System .out .println ("Undeployed ok" );
355
- } else {
356
- System .out .println ("Undeploy failed!" );
357
- }
358
- });
379
+ vertx .undeploy (deploymentID );
359
380
}
360
381
361
382
public void example12 (Vertx vertx ) {
@@ -516,83 +537,61 @@ public void configureBSDOptions(Vertx vertx, boolean reusePort) {
516
537
vertx .createHttpServer (new HttpServerOptions ().setReusePort (reusePort ));
517
538
}
518
539
519
- public void tcpServerWithDomainSockets (Vertx vertx ) {
540
+ public Future < NetServer > tcpServerWithDomainSockets (Vertx vertx ) {
520
541
NetServer netServer = vertx .createNetServer ();
521
542
522
543
// Only available when running on JDK16+, or using a native transport
523
544
SocketAddress address = SocketAddress .domainSocketAddress ("/var/tmp/myservice.sock" );
524
545
525
- netServer
546
+ return netServer
526
547
.connectHandler (so -> {
527
- // Handle application
548
+ // Handle application
528
549
})
529
- .listen (address )
530
- .onComplete (ar -> {
531
- if (ar .succeeded ()) {
532
- // Bound to socket
533
- } else {
534
- // Handle failure
535
- }
536
- });
550
+ .listen (address );
537
551
}
538
552
539
- public void httpServerWithDomainSockets (Vertx vertx ) {
553
+ public Future < HttpServer > httpServerWithDomainSockets (Vertx vertx ) {
540
554
HttpServer httpServer = vertx .createHttpServer ();
541
555
542
556
// Only available when running on JDK16+, or using a native transport
543
557
SocketAddress address = SocketAddress .domainSocketAddress ("/var/tmp/myservice.sock" );
544
558
545
- httpServer
559
+ return httpServer
546
560
.requestHandler (req -> {
547
561
// Handle application
548
562
})
549
- .listen (address )
550
- .onComplete (ar -> {
551
- if (ar .succeeded ()) {
552
- // Bound to socket
553
- } else {
554
- // Handle failure
555
- }
556
- });
563
+ .listen (address );
557
564
}
558
565
559
- public void tcpClientWithDomainSockets (Vertx vertx ) {
566
+ public Future < NetSocket > tcpClientWithDomainSockets (Vertx vertx ) {
560
567
NetClient netClient = vertx .createNetClient ();
561
568
562
569
// Only available when running on JDK16+, or using a native transport
563
570
SocketAddress addr = SocketAddress .domainSocketAddress ("/var/tmp/myservice.sock" );
564
571
565
572
// Connect to the server
566
- netClient
567
- .connect (addr )
568
- .onComplete (ar -> {
569
- if (ar .succeeded ()) {
570
- // Connected
571
- } else {
572
- // Handle failure
573
- }
574
- });
573
+ return netClient
574
+ .connect (addr );
575
575
}
576
576
577
- public void httpClientWithDomainSockets (Vertx vertx ) {
577
+ private Future <Void > process (Buffer buffer , String parameter ) {
578
+ return Future .succeededFuture ();
579
+ }
580
+
581
+ public Future <Void > httpClientWithDomainSockets (Vertx vertx ) {
578
582
HttpClient httpClient = vertx .createHttpClient ();
579
583
580
584
// Only available when running on JDK16+, or using a native transport
581
585
SocketAddress addr = SocketAddress .domainSocketAddress ("/var/tmp/myservice.sock" );
582
586
583
587
// Send request to the server
584
- httpClient .request (new RequestOptions ()
588
+ return httpClient .request (new RequestOptions ()
585
589
.setServer (addr )
586
590
.setHost ("localhost" )
587
591
.setPort (8080 )
588
592
.setURI ("/" ))
589
- .compose (request -> request .send ().compose (HttpClientResponse ::body ))
590
- .onComplete (ar -> {
591
- if (ar .succeeded ()) {
592
- // Process response
593
- } else {
594
- // Handle failure
595
- }
596
- });
593
+ .compose (request -> request .send ())
594
+ .compose (HttpClientResponse ::body )
595
+ .compose (buffer -> process (buffer , "some parameter" )); // Process response
597
596
}
598
597
}
0 commit comments