Skip to content

Commit a165247

Browse files
committed
Revert "Replace ListenableFuture with CompletableFuture"
This reverts commit a2cf74a as the change was made after spring-kafka 3.0.0-M5 was released. This version is required to release spring-batch 5.0.0-M5 This change will be put back after releasing 5.0.0-M5.
1 parent b524e32 commit a165247

File tree

3 files changed

+24
-23
lines changed

3 files changed

+24
-23
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,10 +21,10 @@
2121
import org.springframework.kafka.core.KafkaTemplate;
2222
import org.springframework.kafka.support.SendResult;
2323
import org.springframework.util.Assert;
24+
import org.springframework.util.concurrent.ListenableFuture;
2425

2526
import java.util.ArrayList;
2627
import java.util.List;
27-
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeUnit;
2929

3030
/**
@@ -42,32 +42,32 @@ public class KafkaItemWriter<K, T> extends KeyValueItemWriter<K, T> {
4242

4343
protected KafkaTemplate<K, T> kafkaTemplate;
4444

45-
private final List<CompletableFuture<SendResult<K, T>>> completableFutures = new ArrayList<>();
45+
private final List<ListenableFuture<SendResult<K, T>>> listenableFutures = new ArrayList<>();
4646

4747
private long timeout = -1;
4848

4949
@Override
5050
protected void writeKeyValue(K key, T value) {
5151
if (this.delete) {
52-
this.completableFutures.add(this.kafkaTemplate.sendDefault(key, null));
52+
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, null));
5353
}
5454
else {
55-
this.completableFutures.add(this.kafkaTemplate.sendDefault(key, value));
55+
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, value));
5656
}
5757
}
5858

5959
@Override
6060
protected void flush() throws Exception {
6161
this.kafkaTemplate.flush();
62-
for (var future : this.completableFutures) {
62+
for (ListenableFuture<SendResult<K, T>> future : this.listenableFutures) {
6363
if (this.timeout >= 0) {
6464
future.get(this.timeout, TimeUnit.MILLISECONDS);
6565
}
6666
else {
6767
future.get();
6868
}
6969
}
70-
this.completableFutures.clear();
70+
this.listenableFutures.clear();
7171
}
7272

7373
@Override

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Properties;
25-
import java.util.concurrent.CompletableFuture;
2625
import java.util.concurrent.ExecutionException;
2726

2827
import org.apache.kafka.clients.admin.NewTopic;
@@ -40,10 +39,12 @@
4039
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4140
import org.springframework.kafka.core.KafkaTemplate;
4241
import org.springframework.kafka.core.ProducerFactory;
42+
import org.springframework.kafka.support.SendResult;
4343
import org.springframework.kafka.test.EmbeddedKafkaBroker;
4444
import org.springframework.kafka.test.context.EmbeddedKafka;
4545
import org.springframework.kafka.test.utils.KafkaTestUtils;
4646
import org.springframework.test.context.junit.jupiter.SpringExtension;
47+
import org.springframework.util.concurrent.ListenableFuture;
4748

4849
import static org.hamcrest.MatcherAssert.assertThat;
4950
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -142,12 +143,12 @@ void testValidation() {
142143
@Test
143144
void testReadFromSinglePartition() throws ExecutionException, InterruptedException {
144145
this.template.setDefaultTopic("topic1");
145-
var futures = new ArrayList<CompletableFuture<?>>();
146+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
146147
futures.add(this.template.sendDefault("val0"));
147148
futures.add(this.template.sendDefault("val1"));
148149
futures.add(this.template.sendDefault("val2"));
149150
futures.add(this.template.sendDefault("val3"));
150-
for (var future : futures) {
151+
for (ListenableFuture<SendResult<String, String>> future : futures) {
151152
future.get();
152153
}
153154

@@ -176,12 +177,12 @@ void testReadFromSinglePartition() throws ExecutionException, InterruptedExcepti
176177
@Test
177178
void testReadFromSinglePartitionFromCustomOffset() throws ExecutionException, InterruptedException {
178179
this.template.setDefaultTopic("topic5");
179-
var futures = new ArrayList<CompletableFuture<?>>();
180+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
180181
futures.add(this.template.sendDefault("val0")); // <-- offset 0
181182
futures.add(this.template.sendDefault("val1")); // <-- offset 1
182183
futures.add(this.template.sendDefault("val2")); // <-- offset 2
183184
futures.add(this.template.sendDefault("val3")); // <-- offset 3
184-
for (var future : futures) {
185+
for (ListenableFuture<SendResult<String, String>> future : futures) {
185186
future.get();
186187
}
187188

@@ -212,10 +213,10 @@ void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Exception {
212213
// first run: read a topic from the beginning
213214

214215
this.template.setDefaultTopic("topic6");
215-
var futures = new ArrayList<CompletableFuture<?>>();
216+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
216217
futures.add(this.template.sendDefault("val0")); // <-- offset 0
217218
futures.add(this.template.sendDefault("val1")); // <-- offset 1
218-
for (var future : futures) {
219+
for (ListenableFuture<SendResult<String, String>> future : futures) {
219220
future.get();
220221
}
221222
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic6", 0);
@@ -266,12 +267,12 @@ void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Exception {
266267
@Test
267268
void testReadFromMultiplePartitions() throws ExecutionException, InterruptedException {
268269
this.template.setDefaultTopic("topic2");
269-
var futures = new ArrayList<CompletableFuture<?>>();
270+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
270271
futures.add(this.template.sendDefault("val0"));
271272
futures.add(this.template.sendDefault("val1"));
272273
futures.add(this.template.sendDefault("val2"));
273274
futures.add(this.template.sendDefault("val3"));
274-
for (var future : futures) {
275+
for (ListenableFuture<SendResult<String, String>> future : futures) {
275276
future.get();
276277
}
277278

@@ -294,13 +295,13 @@ void testReadFromMultiplePartitions() throws ExecutionException, InterruptedExce
294295
@Test
295296
void testReadFromSinglePartitionAfterRestart() throws ExecutionException, InterruptedException {
296297
this.template.setDefaultTopic("topic3");
297-
var futures = new ArrayList<CompletableFuture<?>>();
298+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
298299
futures.add(this.template.sendDefault("val0"));
299300
futures.add(this.template.sendDefault("val1"));
300301
futures.add(this.template.sendDefault("val2"));
301302
futures.add(this.template.sendDefault("val3"));
302303
futures.add(this.template.sendDefault("val4"));
303-
for (var future : futures) {
304+
for (ListenableFuture<SendResult<String, String>> future : futures) {
304305
future.get();
305306
}
306307
ExecutionContext executionContext = new ExecutionContext();
@@ -330,7 +331,7 @@ void testReadFromSinglePartitionAfterRestart() throws ExecutionException, Interr
330331

331332
@Test
332333
void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, InterruptedException {
333-
var futures = new ArrayList<CompletableFuture<?>>();
334+
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
334335
futures.add(this.template.send("topic4", 0, null, "val0"));
335336
futures.add(this.template.send("topic4", 0, null, "val2"));
336337
futures.add(this.template.send("topic4", 0, null, "val4"));
@@ -340,7 +341,7 @@ void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, Int
340341
futures.add(this.template.send("topic4", 1, null, "val5"));
341342
futures.add(this.template.send("topic4", 1, null, "val7"));
342343

343-
for (var future : futures) {
344+
for (ListenableFuture<?> future : futures) {
344345
future.get();
345346
}
346347

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717

1818
import java.util.Arrays;
1919
import java.util.List;
20-
import java.util.concurrent.CompletableFuture;
2120
import java.util.concurrent.TimeUnit;
2221

2322
import org.junit.jupiter.api.BeforeEach;
@@ -30,6 +29,7 @@
3029
import org.springframework.core.convert.converter.Converter;
3130
import org.springframework.kafka.core.KafkaTemplate;
3231
import org.springframework.kafka.support.SendResult;
32+
import org.springframework.util.concurrent.ListenableFuture;
3333

3434
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3535
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,7 +46,7 @@ class KafkaItemWriterTests {
4646
private KafkaTemplate<String, String> kafkaTemplate;
4747

4848
@Mock
49-
private CompletableFuture<SendResult<String, String>> future;
49+
private ListenableFuture<SendResult<String, String>> future;
5050

5151
private KafkaItemKeyMapper itemKeyMapper;
5252

0 commit comments

Comments
 (0)