Skip to content

Commit bec9318

Browse files
authored
Add concurrency to PulsarContainerProperties
This commit adds concurrency property to `PulsarContainerProperties` instead of managing it directly on `ConcurrentPulsarListenerContainerFactory`, which provides consistency with both reactive counterpart and container properties in general. Resolves: #820
1 parent 136d465 commit bec9318

File tree

3 files changed

+20
-10
lines changed

3 files changed

+20
-10
lines changed

spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
* @author Soby Chacko
3737
* @author Chris Bono
3838
* @author Alexander Preuß
39+
* @author Vedran Pavic
3940
*/
4041
public class ConcurrentPulsarListenerContainerFactory<T>
4142
extends AbstractPulsarListenerContainerFactory<ConcurrentPulsarMessageListenerContainer<T>, T> {
@@ -44,8 +45,6 @@ public class ConcurrentPulsarListenerContainerFactory<T>
4445

4546
private static final AtomicInteger COUNTER = new AtomicInteger();
4647

47-
private Integer concurrency;
48-
4948
public ConcurrentPulsarListenerContainerFactory(PulsarConsumerFactory<? super T> consumerFactory,
5049
PulsarContainerProperties containerProperties) {
5150
super(consumerFactory, containerProperties);
@@ -55,8 +54,9 @@ public ConcurrentPulsarListenerContainerFactory(PulsarConsumerFactory<? super T>
5554
* Specify the container concurrency.
5655
* @param concurrency the number of consumers to create.
5756
*/
57+
@Deprecated(since = "1.2.0", forRemoval = true)
5858
public void setConcurrency(Integer concurrency) {
59-
this.concurrency = concurrency;
59+
getContainerProperties().setConcurrency(concurrency);
6060
}
6161

6262
@Override
@@ -71,7 +71,6 @@ public Collection<String> getTopics() {
7171
};
7272
ConcurrentPulsarMessageListenerContainer<T> container = createContainerInstance(endpoint);
7373
initializeContainer(container, endpoint);
74-
// customizeContainer(container);
7574
return container;
7675
}
7776

@@ -130,8 +129,8 @@ protected void initializeContainer(ConcurrentPulsarMessageListenerContainer<T> i
130129
if (endpoint.getConcurrency() != null) {
131130
instance.setConcurrency(endpoint.getConcurrency());
132131
}
133-
else if (this.concurrency != null) {
134-
instance.setConcurrency(this.concurrency);
132+
else if (getContainerProperties().getConcurrency() > 0) {
133+
instance.setConcurrency(getContainerProperties().getConcurrency());
135134
}
136135
}
137136

spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -46,6 +46,7 @@
4646
* @author Soby Chacko
4747
* @author Alexander Preuß
4848
* @author Chris Bono
49+
* @author Vedran Pavic
4950
*/
5051
public class PulsarContainerProperties {
5152

@@ -77,6 +78,8 @@ public class PulsarContainerProperties {
7778

7879
private AsyncTaskExecutor consumerTaskExecutor;
7980

81+
private int concurrency = 1;
82+
8083
private int maxNumMessages = -1;
8184

8285
private int maxNumBytes = 10 * 1024 * 1024;
@@ -127,6 +130,14 @@ public void setConsumerTaskExecutor(AsyncTaskExecutor consumerExecutor) {
127130
this.consumerTaskExecutor = consumerExecutor;
128131
}
129132

133+
public int getConcurrency() {
134+
return this.concurrency;
135+
}
136+
137+
public void setConcurrency(int concurrency) {
138+
this.concurrency = concurrency;
139+
}
140+
130141
public SubscriptionType getSubscriptionType() {
131142
return this.subscriptionType;
132143
}

spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,9 +63,9 @@ void createConcurrentContainerFromFactoryAndVerifyBatchReceivePolicy() {
6363
containerProperties.setBatchTimeoutMillis(60_000);
6464
containerProperties.setMaxNumMessages(120);
6565
containerProperties.setMaxNumBytes(32000);
66+
containerProperties.setConcurrency(1);
6667
ConcurrentPulsarListenerContainerFactory<String> containerFactory = new ConcurrentPulsarListenerContainerFactory<>(
6768
consumerFactory, containerProperties);
68-
containerFactory.setConcurrency(1);
6969
PulsarListenerEndpoint pulsarListenerEndpoint = mock(PulsarListenerEndpoint.class);
7070
when(pulsarListenerEndpoint.getConcurrency()).thenReturn(1);
7171

0 commit comments

Comments
 (0)