Skip to content

Commit 03b9570

Browse files
committed
spring-projectsGH-3444: Add Custom TTL support for RedisLock, and JdbcLock
Fixes: spring-projects#3444 * Add `DistributedLock` interface which is implemented by `RedisLock` and `JdbcLock`. * Modify `LockRegistry`, `ExpirableLockRegistry`, `RenewableLockRegistry` interfaces. * Modify implementation of `DefaultLockRepository, `JdbcLockRegistry`, `RedisLockRegistry` * Modify ddl of `INT_LOCK` table. * Maintain test cases and documents. Signed-off-by: Eddie Cho <[email protected]>
1 parent f4e7763 commit 03b9570

File tree

41 files changed

+577
-166
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+577
-166
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
135135

136136
private boolean sequenceAware;
137137

138-
private LockRegistry lockRegistry = new DefaultLockRegistry();
138+
private LockRegistry<?> lockRegistry = new DefaultLockRegistry();
139139

140140
private boolean lockRegistrySet = false;
141141

@@ -198,7 +198,7 @@ public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) {
198198
this(processor, new SimpleMessageStore(0), null, null);
199199
}
200200

201-
public void setLockRegistry(LockRegistry lockRegistry) {
201+
public void setLockRegistry(LockRegistry<?> lockRegistry) {
202202
Assert.isTrue(!this.lockRegistrySet, "'this.lockRegistry' can not be reset once its been set");
203203
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
204204
this.lockRegistry = lockRegistry;
@@ -516,7 +516,7 @@ protected boolean isSequenceAware() {
516516
return this.sequenceAware;
517517
}
518518

519-
protected LockRegistry getLockRegistry() {
519+
protected LockRegistry<?> getLockRegistry() {
520520
return this.lockRegistry;
521521
}
522522

spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe
6666

6767
private String outputChannelName;
6868

69-
private LockRegistry lockRegistry;
69+
private LockRegistry<?> lockRegistry;
7070

7171
private MessageGroupStore messageStore;
7272

@@ -125,7 +125,7 @@ public void setOutputChannelName(String outputChannelName) {
125125
this.outputChannelName = outputChannelName;
126126
}
127127

128-
public void setLockRegistry(LockRegistry lockRegistry) {
128+
public void setLockRegistry(LockRegistry<?> lockRegistry) {
129129
this.lockRegistry = lockRegistry;
130130
}
131131

spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ public S forceReleaseAdvice(Advice... advice) {
312312
* @param lockRegistry the {@link LockRegistry} to use.
313313
* @return the endpoint spec.
314314
*/
315-
public S lockRegistry(LockRegistry lockRegistry) {
315+
public S lockRegistry(LockRegistry<?> lockRegistry) {
316316
Assert.notNull(lockRegistry, "'lockRegistry' must not be null.");
317317
this.handler.setLockRegistry(lockRegistry);
318318
return _this();

spring-integration-core/src/main/java/org/springframework/integration/handler/advice/LockRequestHandlerAdvice.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
*/
4949
public class LockRequestHandlerAdvice extends AbstractRequestHandlerAdvice {
5050

51-
private final LockRegistry lockRegistry;
51+
private final LockRegistry<?> lockRegistry;
5252

5353
private final Expression lockKeyExpression;
5454

@@ -65,7 +65,7 @@ public class LockRequestHandlerAdvice extends AbstractRequestHandlerAdvice {
6565
* @param lockRegistry the {@link LockRegistry} to use.
6666
* @param lockKey the static (shared) lock key for all the calls.
6767
*/
68-
public LockRequestHandlerAdvice(LockRegistry lockRegistry, Object lockKey) {
68+
public LockRequestHandlerAdvice(LockRegistry<?> lockRegistry, Object lockKey) {
6969
this(lockRegistry, new ValueExpression<>(lockKey));
7070
}
7171

@@ -75,7 +75,7 @@ public LockRequestHandlerAdvice(LockRegistry lockRegistry, Object lockKey) {
7575
* @param lockRegistry the {@link LockRegistry} to use.
7676
* @param lockKeyExpression the SpEL expression to evaluate a lock key against request message.
7777
*/
78-
public LockRequestHandlerAdvice(LockRegistry lockRegistry, Expression lockKeyExpression) {
78+
public LockRequestHandlerAdvice(LockRegistry<?> lockRegistry, Expression lockKeyExpression) {
7979
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
8080
Assert.notNull(lockKeyExpression, "'lockKeyExpression' must not be null");
8181
this.lockRegistry = lockRegistry;
@@ -88,7 +88,7 @@ public LockRequestHandlerAdvice(LockRegistry lockRegistry, Expression lockKeyExp
8888
* @param lockRegistry the {@link LockRegistry} to use.
8989
* @param lockKeyFunction the function to evaluate a lock key against request message.
9090
*/
91-
public LockRequestHandlerAdvice(LockRegistry lockRegistry, Function<Message<?>, Object> lockKeyFunction) {
91+
public LockRequestHandlerAdvice(LockRegistry<?> lockRegistry, Function<Message<?>, Object> lockKeyFunction) {
9292
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
9393
Assert.notNull(lockKeyFunction, "'lockKeyFunction' must not be null");
9494
this.lockRegistry = lockRegistry;

spring-integration-core/src/main/java/org/springframework/integration/metadata/PropertiesPersistingMetadataStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,7 +65,7 @@ public class PropertiesPersistingMetadataStore implements ConcurrentMetadataStor
6565

6666
private final DefaultPropertiesPersister persister = new DefaultPropertiesPersister();
6767

68-
private final LockRegistry lockRegistry = new DefaultLockRegistry();
68+
private final LockRegistry<Lock> lockRegistry = new DefaultLockRegistry();
6969

7070
private String baseDirectory = System.getProperty("java.io.tmpdir") + "/spring-integration/";
7171

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG
6767

6868
private boolean timeoutOnIdle;
6969

70-
private LockRegistry lockRegistry = new DefaultLockRegistry();
70+
private LockRegistry<?> lockRegistry = new DefaultLockRegistry();
7171

7272
protected AbstractMessageGroupStore() {
7373
}
@@ -127,12 +127,12 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) {
127127
* @param lockRegistry lockRegistryType
128128
* @since 6.5
129129
*/
130-
public final void setLockRegistry(LockRegistry lockRegistry) {
130+
public final void setLockRegistry(LockRegistry<?> lockRegistry) {
131131
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
132132
this.lockRegistry = lockRegistry;
133133
}
134134

135-
protected LockRegistry getLockRegistry() {
135+
protected LockRegistry<?> getLockRegistry() {
136136
return this.lockRegistry;
137137
}
138138

spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperB
106106
* @param lockRegistry The lock registry.
107107
* @see #SimpleMessageStore(int, int, long, LockRegistry)
108108
*/
109-
public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry lockRegistry) {
109+
public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry<?> lockRegistry) {
110110
this(individualCapacity, groupCapacity, 0, lockRegistry);
111111
}
112112

@@ -122,7 +122,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistr
122122
*/
123123
@SuppressWarnings("this-escape")
124124
public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperBoundTimeout,
125-
LockRegistry lockRegistry) {
125+
LockRegistry<?> lockRegistry) {
126126

127127
super(false);
128128
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -74,7 +74,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
7474
* A lock registry. The locks it manages should be global (whatever that means for the
7575
* system) and expiring, in case the holder dies without notifying anyone.
7676
*/
77-
private final LockRegistry locks;
77+
private final LockRegistry<?> locks;
7878

7979
/**
8080
* Candidate for leader election. User injects this to receive callbacks on leadership
@@ -162,7 +162,7 @@ public String getRole() {
162162
* candidate (which just logs the leadership events).
163163
* @param locks lock registry
164164
*/
165-
public LockRegistryLeaderInitiator(LockRegistry locks) {
165+
public LockRegistryLeaderInitiator(LockRegistry<?> locks) {
166166
this(locks, new DefaultCandidate());
167167
}
168168

@@ -172,7 +172,7 @@ public LockRegistryLeaderInitiator(LockRegistry locks) {
172172
* @param locks lock registry
173173
* @param candidate leadership election candidate
174174
*/
175-
public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
175+
public LockRegistryLeaderInitiator(LockRegistry<?> locks, Candidate candidate) {
176176
Assert.notNull(locks, "'locks' must not be null");
177177
Assert.notNull(candidate, "'candidate' must not be null");
178178
this.locks = locks;

spring-integration-core/src/main/java/org/springframework/integration/support/locks/DefaultLockRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@
3434
* @since 2.1.1
3535
*
3636
*/
37-
public final class DefaultLockRegistry implements LockRegistry {
37+
public final class DefaultLockRegistry implements LockRegistry<Lock> {
3838

3939
private final Lock[] lockTable;
4040

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.locks;
18+
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.locks.Lock;
21+
22+
/**
23+
* A {@link Lock} implementing for spring distributed locks
24+
*
25+
* @author Eddie Cho
26+
*
27+
* @since 7.0
28+
*/
29+
public interface DistributedLock extends Lock {
30+
31+
/**
32+
* Attempt to acquire a lock with a specific time-to-live
33+
* @param customTtl the specific time-to-live for the lock status data
34+
* @param customTtlTimeUnit the time unit of the {@code customTtl} argument
35+
*/
36+
void lock(long customTtl, TimeUnit customTtlTimeUnit);
37+
38+
/**
39+
* Acquires the lock with a specific time-to-live if it is free within the
40+
* given waiting time and the current thread has not been {@linkplain Thread#interrupt interrupted}.
41+
* @param time the maximum time to wait for the lock
42+
* @param unit the time unit of the {@code time} argument
43+
* @param customTtl the specific time-to-live for the lock status data
44+
* @param customTtlTimeUnit the time unit of the customTtl argument
45+
* @return {@code true} if the lock was acquired and {@code false}
46+
* if the waiting time elapsed before the lock was acquired
47+
*
48+
* @throws InterruptedException if the current thread is interrupted
49+
* while acquiring the lock (and interruption of lock
50+
* acquisition is supported)
51+
*/
52+
boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlTimeUnit) throws InterruptedException;
53+
}

spring-integration-core/src/main/java/org/springframework/integration/support/locks/ExpirableLockRegistry.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,18 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19+
import java.util.concurrent.locks.Lock;
20+
1921
/**
2022
* A {@link LockRegistry} implementing this interface supports the removal of aged locks
2123
* that are not currently locked.
24+
* @param <L> The expected class of the lock implementation
2225
*
2326
* @author Gary Russell
2427
* @since 4.2
2528
*
2629
*/
27-
public interface ExpirableLockRegistry extends LockRegistry {
30+
public interface ExpirableLockRegistry<L extends Lock> extends LockRegistry<L> {
2831

2932
/**
3033
* Remove locks last acquired more than 'age' ago that are not currently locked.

spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626

2727
/**
2828
* Strategy for maintaining a registry of shared locks.
29+
* @param <L> The expected class of the lock implementation
2930
*
3031
* @author Oleg Zhurakousky
3132
* @author Gary Russell
@@ -34,14 +35,14 @@
3435
* @since 2.1.1
3536
*/
3637
@FunctionalInterface
37-
public interface LockRegistry {
38+
public interface LockRegistry<L extends Lock> {
3839

3940
/**
4041
* Obtain the lock associated with the parameter object.
4142
* @param lockKey The object with which the lock is associated.
4243
* @return The associated lock.
4344
*/
44-
Lock obtain(Object lockKey);
45+
L obtain(Object lockKey);
4546

4647
/**
4748
* Perform the provided task when the lock for the key is locked.

spring-integration-core/src/main/java/org/springframework/integration/support/locks/PassThruLockRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@
3333
* @since 2.2
3434
*
3535
*/
36-
public final class PassThruLockRegistry implements LockRegistry {
36+
public final class PassThruLockRegistry implements LockRegistry<Lock> {
3737

3838
@Override
3939
public Lock obtain(Object lockKey) {

spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,19 +16,23 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.locks.Lock;
21+
1922
import org.springframework.scheduling.TaskScheduler;
2023

2124
/**
2225
* A {@link LockRegistry} implementing this interface supports the renewal
2326
* of the time to live of a lock.
27+
* @param <L> The expected class of the lock implementation
2428
*
2529
* @author Alexandre Strubel
2630
* @author Artem Bilan
2731
* @author Youbin Wu
2832
*
2933
* @since 5.4
3034
*/
31-
public interface RenewableLockRegistry extends LockRegistry {
35+
public interface RenewableLockRegistry<L extends Lock> extends LockRegistry<L> {
3236

3337
/**
3438
* Renew the time to live of the lock is associated with the parameter object.
@@ -37,6 +41,16 @@ public interface RenewableLockRegistry extends LockRegistry {
3741
*/
3842
void renewLock(Object lockKey);
3943

44+
/**
45+
* Renew the time to live of the lock is associated with the parameter object with a specific value.
46+
* The lock must be held by the current thread
47+
* @param lockKey The object with which the lock is associated.
48+
* @param customTtl the specific time-to-live for the lock status data
49+
* @param customTtlTimeUnit the time unit of the {@code customTtl} argument
50+
*
51+
*/
52+
void renewLock(Object lockKey, long customTtl, TimeUnit customTtlTimeUnit);
53+
4054
/**
4155
* Set the {@link TaskScheduler} to use for the renewal task.
4256
* When renewalTaskScheduler is set, it will be used to periodically renew the lock to ensure that

0 commit comments

Comments
 (0)