Skip to content

Commit 79a1268

Browse files
committed
spring-projectsGH-3444: Add Custom TTL support for RedisLock, and JdbcLock
Fixes: spring-projects#3444 * Add `DistributedLock` interface. * Modify `LockRegistry`, `ExpirableLockRegistry`, `RenewableLockRegistry` interfaces. * Modify implementation of `DefaultLockRepository, `JdbcLockRegistry`, `RedisLockRegistry` * Modify ddl of `INT_LOCK` table. * Maintain test cases and documents.
1 parent 4108ea5 commit 79a1268

File tree

37 files changed

+338
-149
lines changed

37 files changed

+338
-149
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -130,7 +130,7 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
130130

131131
private boolean sequenceAware;
132132

133-
private LockRegistry lockRegistry = new DefaultLockRegistry();
133+
private LockRegistry<?> lockRegistry = new DefaultLockRegistry();
134134

135135
private boolean lockRegistrySet = false;
136136

@@ -193,7 +193,7 @@ public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) {
193193
this(processor, new SimpleMessageStore(0), null, null);
194194
}
195195

196-
public void setLockRegistry(LockRegistry lockRegistry) {
196+
public void setLockRegistry(LockRegistry<?> lockRegistry) {
197197
Assert.isTrue(!this.lockRegistrySet, "'this.lockRegistry' can not be reset once its been set");
198198
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
199199
this.lockRegistry = lockRegistry;
@@ -499,7 +499,7 @@ protected boolean isSequenceAware() {
499499
return this.sequenceAware;
500500
}
501501

502-
protected LockRegistry getLockRegistry() {
502+
protected LockRegistry<?> getLockRegistry() {
503503
return this.lockRegistry;
504504
}
505505

spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,7 +65,7 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe
6565

6666
private String outputChannelName;
6767

68-
private LockRegistry lockRegistry;
68+
private LockRegistry<?> lockRegistry;
6969

7070
private MessageGroupStore messageStore;
7171

@@ -122,7 +122,7 @@ public void setOutputChannelName(String outputChannelName) {
122122
this.outputChannelName = outputChannelName;
123123
}
124124

125-
public void setLockRegistry(LockRegistry lockRegistry) {
125+
public void setLockRegistry(LockRegistry<?> lockRegistry) {
126126
this.lockRegistry = lockRegistry;
127127
}
128128

spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -299,7 +299,7 @@ public S forceReleaseAdvice(Advice... advice) {
299299
* @param lockRegistry the {@link LockRegistry} to use.
300300
* @return the endpoint spec.
301301
*/
302-
public S lockRegistry(LockRegistry lockRegistry) {
302+
public S lockRegistry(LockRegistry<?> lockRegistry) {
303303
Assert.notNull(lockRegistry, "'lockRegistry' must not be null.");
304304
this.handler.setLockRegistry(lockRegistry);
305305
return _this();

spring-integration-core/src/main/java/org/springframework/integration/metadata/PropertiesPersistingMetadataStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,7 +65,7 @@ public class PropertiesPersistingMetadataStore implements ConcurrentMetadataStor
6565

6666
private final DefaultPropertiesPersister persister = new DefaultPropertiesPersister();
6767

68-
private final LockRegistry lockRegistry = new DefaultLockRegistry();
68+
private final LockRegistry<Lock> lockRegistry = new DefaultLockRegistry();
6969

7070
private String baseDirectory = System.getProperty("java.io.tmpdir") + "/spring-integration/";
7171

spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -71,7 +71,7 @@ public class SimpleMessageStore extends AbstractMessageGroupStore
7171

7272
private final long upperBoundTimeout;
7373

74-
private LockRegistry lockRegistry;
74+
private LockRegistry<?> lockRegistry;
7575

7676
private boolean copyOnGet = false;
7777

@@ -111,7 +111,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperB
111111
* @param lockRegistry The lock registry.
112112
* @see #SimpleMessageStore(int, int, long, LockRegistry)
113113
*/
114-
public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry lockRegistry) {
114+
public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry<?> lockRegistry) {
115115
this(individualCapacity, groupCapacity, 0, lockRegistry);
116116
}
117117

@@ -126,7 +126,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistr
126126
* @since 4.3
127127
*/
128128
public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperBoundTimeout,
129-
LockRegistry lockRegistry) {
129+
LockRegistry<?> lockRegistry) {
130130

131131
super(false);
132132
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
@@ -162,7 +162,7 @@ public void setCopyOnGet(boolean copyOnGet) {
162162
this.copyOnGet = copyOnGet;
163163
}
164164

165-
public void setLockRegistry(LockRegistry lockRegistry) {
165+
public void setLockRegistry(LockRegistry<?> lockRegistry) {
166166
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
167167
Assert.isTrue(!(this.isUsed), "Cannot change the lock registry after the store has been used");
168168
this.lockRegistry = lockRegistry;

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -74,7 +74,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
7474
* A lock registry. The locks it manages should be global (whatever that means for the
7575
* system) and expiring, in case the holder dies without notifying anyone.
7676
*/
77-
private final LockRegistry locks;
77+
private final LockRegistry<?> locks;
7878

7979
/**
8080
* Candidate for leader election. User injects this to receive callbacks on leadership
@@ -162,7 +162,7 @@ public String getRole() {
162162
* candidate (which just logs the leadership events).
163163
* @param locks lock registry
164164
*/
165-
public LockRegistryLeaderInitiator(LockRegistry locks) {
165+
public LockRegistryLeaderInitiator(LockRegistry<?> locks) {
166166
this(locks, new DefaultCandidate());
167167
}
168168

@@ -172,7 +172,7 @@ public LockRegistryLeaderInitiator(LockRegistry locks) {
172172
* @param locks lock registry
173173
* @param candidate leadership election candidate
174174
*/
175-
public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
175+
public LockRegistryLeaderInitiator(LockRegistry<?> locks, Candidate candidate) {
176176
Assert.notNull(locks, "'locks' must not be null");
177177
Assert.notNull(candidate, "'candidate' must not be null");
178178
this.locks = locks;

spring-integration-core/src/main/java/org/springframework/integration/support/locks/DefaultLockRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@
3434
* @since 2.1.1
3535
*
3636
*/
37-
public final class DefaultLockRegistry implements LockRegistry {
37+
public final class DefaultLockRegistry implements LockRegistry<Lock> {
3838

3939
private final Lock[] lockTable;
4040

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.locks;
18+
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.locks.Lock;
21+
22+
/**
23+
* A {@link Lock} implementing for spring distributed locks
24+
*
25+
* @author Eddie Cho
26+
*
27+
* @since 6.3
28+
*/
29+
public interface DistributedLock extends Lock {
30+
31+
/**
32+
* Attempt to acquire a lock with a specific time-to-live
33+
* @param time the maximum time to wait for the lock unit
34+
* @param unit the time unit of the time argument
35+
* @param customTtl the specific time-to-live for the lock status data
36+
* @param customTtlUnit the time unit of the customTtl argument
37+
* @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired
38+
* @throws InterruptedException -
39+
* if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported)
40+
*/
41+
boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException;
42+
43+
/**
44+
* Attempt to acquire a lock with a specific time-to-live
45+
* @param customTtl the specific time-to-live for the lock status data
46+
* @param customTtlUnit the time unit of the customTtl argument
47+
*/
48+
void lock(long customTtl, TimeUnit customTtlUnit);
49+
}

spring-integration-core/src/main/java/org/springframework/integration/support/locks/ExpirableLockRegistry.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,18 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19+
import java.util.concurrent.locks.Lock;
20+
1921
/**
2022
* A {@link LockRegistry} implementing this interface supports the removal of aged locks
2123
* that are not currently locked.
24+
* @param <L> The expected class of the lock implementation
2225
*
2326
* @author Gary Russell
2427
* @since 4.2
2528
*
2629
*/
27-
public interface ExpirableLockRegistry extends LockRegistry {
30+
public interface ExpirableLockRegistry<L extends Lock> extends LockRegistry<L> {
2831

2932
/**
3033
* Remove locks last acquired more than 'age' ago that are not currently locked.

spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626

2727
/**
2828
* Strategy for maintaining a registry of shared locks.
29+
* @param <L> The expected class of the lock implementation
2930
*
3031
* @author Oleg Zhurakousky
3132
* @author Gary Russell
@@ -34,14 +35,14 @@
3435
* @since 2.1.1
3536
*/
3637
@FunctionalInterface
37-
public interface LockRegistry {
38+
public interface LockRegistry<L extends Lock> {
3839

3940
/**
4041
* Obtain the lock associated with the parameter object.
4142
* @param lockKey The object with which the lock is associated.
4243
* @return The associated lock.
4344
*/
44-
Lock obtain(Object lockKey);
45+
L obtain(Object lockKey);
4546

4647
/**
4748
* Perform the provided task when the lock for the key is locked.

spring-integration-core/src/main/java/org/springframework/integration/support/locks/PassThruLockRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@
3333
* @since 2.2
3434
*
3535
*/
36-
public final class PassThruLockRegistry implements LockRegistry {
36+
public final class PassThruLockRegistry implements LockRegistry<Lock> {
3737

3838
@Override
3939
public Lock obtain(Object lockKey) {

spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,16 +16,19 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19+
import java.util.concurrent.locks.Lock;
20+
1921
/**
2022
* A {@link LockRegistry} implementing this interface supports the renewal
2123
* of the time to live of a lock.
24+
* @param <L> The expected class of the lock implementation
2225
*
2326
* @author Alexandre Strubel
2427
* @author Artem Bilan
2528
*
2629
* @since 5.4
2730
*/
28-
public interface RenewableLockRegistry extends LockRegistry {
31+
public interface RenewableLockRegistry<L extends Lock> extends LockRegistry<L> {
2932

3033
/**
3134
* Renew the time to live of the lock is associated with the parameter object.

spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class LockRegistryLeaderInitiatorTests {
6161

6262
private CountDownLatch revoked = new CountDownLatch(1);
6363

64-
private final LockRegistry registry = new DefaultLockRegistry();
64+
private final LockRegistry<Lock> registry = new DefaultLockRegistry();
6565

6666
private final LockRegistryLeaderInitiator initiator =
6767
new LockRegistryLeaderInitiator(this.registry, new DefaultCandidate());
@@ -159,6 +159,7 @@ public void publishOnGranted(Object source, Context context, String role) {
159159
}
160160

161161
@Test
162+
@SuppressWarnings("rawtypes")
162163
public void competingWithLock() throws Exception {
163164
// switch used to toggle which registry obtains lock
164165
AtomicBoolean firstLocked = new AtomicBoolean(true);
@@ -220,6 +221,7 @@ public void competingWithLock() throws Exception {
220221
}
221222

222223
@Test
224+
@SuppressWarnings("rawtypes")
223225
public void testGracefulLeaderSelectorExit() throws Exception {
224226
AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
225227

@@ -275,7 +277,7 @@ public void testExceptionFromLock() throws Exception {
275277
}
276278
}).given(mockLock).tryLock(anyLong(), any(TimeUnit.class));
277279

278-
LockRegistry registry = lockKey -> mockLock;
280+
LockRegistry<Lock> registry = lockKey -> mockLock;
279281

280282
CountDownLatch onGranted = new CountDownLatch(1);
281283

0 commit comments

Comments
 (0)