Skip to content

Commit 0a22629

Browse files
committed
busy-waiting loops replaced with event driven programming
1 parent f0611bc commit 0a22629

File tree

5 files changed

+166
-93
lines changed

5 files changed

+166
-93
lines changed

queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,20 @@
2626

2727
import lombok.extern.slf4j.Slf4j;
2828

29+
import java.util.concurrent.BlockingQueue;
30+
2931
/**
30-
* ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and
32+
* ServiceExecuotr class. This class will pick up Messages one by one from the
33+
* Blocking Queue and
3134
* process them.
3235
*/
3336
@Slf4j
37+
3438
public class ServiceExecutor implements Runnable {
3539

36-
private final MessageQueue msgQueue;
40+
private final BlockingQueue<Message> msgQueue;
3741

38-
public ServiceExecutor(MessageQueue msgQueue) {
42+
public ServiceExecutor(BlockingQueue<Message> msgQueue) {
3943
this.msgQueue = msgQueue;
4044
}
4145

@@ -44,19 +48,14 @@ public ServiceExecutor(MessageQueue msgQueue) {
4448
*/
4549
public void run() {
4650
try {
47-
while (!Thread.currentThread().isInterrupted()) {
48-
var msg = msgQueue.retrieveMsg();
49-
50-
if (null != msg) {
51-
LOGGER.info(msg + " is served.");
52-
} else {
53-
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
54-
}
51+
while (true) {
52+
Message msg = msgQueue.take(); // This will block until a message is available
5553

56-
Thread.sleep(1000);
54+
LOGGER.info(msg + " is served.");
5755
}
58-
} catch (Exception e) {
59-
LOGGER.error(e.getMessage());
56+
} catch (InterruptedException e) {
57+
LOGGER.error("ServiceExecutor interrupted", e);
58+
Thread.currentThread().interrupt();
6059
}
6160
}
6261
}

retry/src/main/java/com/iluwatar/retry/BusinessException.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@
2727
import java.io.Serial;
2828

2929
/**
30-
* The top-most type in our exception hierarchy that signifies that an unexpected error condition
31-
* occurred. Its use is reserved as a "catch-all" for cases where no other subtype captures the
32-
* specificity of the error condition in question. Calling code is not expected to be able to handle
30+
* The top-most type in our exception hierarchy that signifies that an
31+
* unexpected error condition
32+
* occurred. Its use is reserved as a "catch-all" for cases where no other
33+
* subtype captures the
34+
* specificity of the error condition in question. Calling code is not expected
35+
* to be able to handle
3336
* this error and should be reported to the maintainers immediately.
3437
*
3538
*/
@@ -38,11 +41,19 @@ public class BusinessException extends Exception {
3841
private static final long serialVersionUID = 6235833142062144336L;
3942

4043
/**
41-
* Ctor.
44+
* Ctor .
4245
*
4346
* @param message the error message
4447
*/
4548
public BusinessException(String message) {
4649
super(message);
4750
}
51+
52+
public BusinessException(String message, Throwable cause) {
53+
super(message, cause);
54+
}
55+
56+
public BusinessException(Throwable cause) {
57+
super(cause);
58+
}
4859
}

retry/src/main/java/com/iluwatar/retry/Retry.java

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@
2828
import java.util.Arrays;
2929
import java.util.Collections;
3030
import java.util.List;
31+
import java.util.concurrent.CompletableFuture;
3132
import java.util.concurrent.atomic.AtomicInteger;
3233
import java.util.function.Predicate;
3334

3435
/**
35-
* Decorates {@link BusinessOperation business operation} with "retry" capabilities.
36+
* Decorates {@link BusinessOperation business operation} with "retry"
37+
* capabilities.
3638
*
3739
* @param <T> the remote op's return type
3840
*/
@@ -43,23 +45,24 @@ public final class Retry<T> implements BusinessOperation<T> {
4345
private final AtomicInteger attempts;
4446
private final Predicate<Exception> test;
4547
private final List<Exception> errors;
48+
private int attemptsCount;
4649

4750
/**
4851
* Ctor.
4952
*
5053
* @param op the {@link BusinessOperation} to retry
5154
* @param maxAttempts number of times to retry
5255
* @param delay delay (in milliseconds) between attempts
53-
* @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions
56+
* @param ignoreTests tests to check whether the remote exception can be
57+
* ignored. No exceptions
5458
* will be ignored if no tests are given
5559
*/
5660
@SafeVarargs
5761
public Retry(
5862
BusinessOperation<T> op,
5963
int maxAttempts,
6064
long delay,
61-
Predicate<Exception>... ignoreTests
62-
) {
65+
Predicate<Exception>... ignoreTests) {
6366
this.op = op;
6467
this.maxAttempts = maxAttempts;
6568
this.delay = delay;
@@ -88,22 +91,42 @@ public int attempts() {
8891

8992
@Override
9093
public T perform() throws BusinessException {
91-
do {
94+
CompletableFuture<T> future = new CompletableFuture<>();
95+
retryAttempt(future, 1);
96+
try {
97+
return future.get();
98+
} catch (Exception e) {
99+
throw new BusinessException("Max retry attempts exceeded.", e);
100+
}
101+
}
102+
103+
private void retryAttempt(CompletableFuture<T> future, int attempt) {
104+
if (attempt > maxAttempts) {
105+
future.completeExceptionally(new BusinessException("Max retry attempts exceeded."));
106+
return;
107+
}
108+
109+
CompletableFuture.runAsync(() -> {
92110
try {
93-
return this.op.perform();
111+
T result = op.perform();
112+
future.complete(result);
94113
} catch (BusinessException e) {
95-
this.errors.add(e);
96-
97-
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
98-
throw e;
114+
errors.add(e);
115+
attemptsCount++;
116+
if (test.test(e)) {
117+
retryAttempt(future, attempt + 1);
118+
} else {
119+
future.completeExceptionally(e);
99120
}
121+
}
122+
});
100123

101-
try {
102-
Thread.sleep(this.delay);
103-
} catch (InterruptedException f) {
104-
//ignore
105-
}
124+
if (attempt < maxAttempts) {
125+
try {
126+
Thread.sleep(delay);
127+
} catch (InterruptedException ignored) {
128+
Thread.currentThread().interrupt();
106129
}
107-
} while (true);
130+
}
108131
}
109132
}

retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java

Lines changed: 61 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
import java.util.Collections;
3030
import java.util.List;
3131
import java.util.Random;
32+
import java.util.concurrent.CompletableFuture;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334
import java.util.function.Predicate;
3435

3536
/**
36-
* Decorates {@link BusinessOperation business operation} with "retry" capabilities.
37+
* Decorates {@link BusinessOperation business operation} with "retry"
38+
* capabilities using exponential backoff strategy.
3739
*
3840
* @param <T> the remote op's return type
3941
*/
@@ -42,25 +44,23 @@ public final class RetryExponentialBackoff<T> implements BusinessOperation<T> {
4244
private final BusinessOperation<T> op;
4345
private final int maxAttempts;
4446
private final long maxDelay;
45-
private final AtomicInteger attempts;
4647
private final Predicate<Exception> test;
4748
private final List<Exception> errors;
49+
private final AtomicInteger attempts;
4850

4951
/**
50-
* Ctor.
52+
* Constructor.
5153
*
5254
* @param op the {@link BusinessOperation} to retry
5355
* @param maxAttempts number of times to retry
54-
* @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions
56+
* @param maxDelay the maximum delay between retries (in milliseconds)
57+
* @param ignoreTests tests to check whether the remote exception can be
58+
* ignored. No exceptions
5559
* will be ignored if no tests are given
5660
*/
5761
@SafeVarargs
58-
public RetryExponentialBackoff(
59-
BusinessOperation<T> op,
60-
int maxAttempts,
61-
long maxDelay,
62-
Predicate<Exception>... ignoreTests
63-
) {
62+
public RetryExponentialBackoff(BusinessOperation<T> op, int maxAttempts, long maxDelay,
63+
Predicate<Exception>... ignoreTests) {
6464
this.op = op;
6565
this.maxAttempts = maxAttempts;
6666
this.maxDelay = maxDelay;
@@ -70,7 +70,7 @@ public RetryExponentialBackoff(
7070
}
7171

7272
/**
73-
* The errors encountered while retrying, in the encounter order.
73+
* The errors encountered while retrying, in the order of occurrence.
7474
*
7575
* @return the errors encountered while retrying
7676
*/
@@ -89,24 +89,59 @@ public int attempts() {
8989

9090
@Override
9191
public T perform() throws BusinessException {
92-
do {
93-
try {
94-
return this.op.perform();
95-
} catch (BusinessException e) {
96-
this.errors.add(e);
92+
CompletableFuture<T> future = new CompletableFuture<>();
93+
retryAttempt(future, 1);
94+
try {
95+
return future.get();
96+
} catch (Exception e) {
97+
throw new BusinessException("Max retry attempts exceeded.", e);
98+
}
99+
}
97100

98-
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
99-
throw e;
100-
}
101+
/**
102+
* Recursive method to perform retry attempts asynchronously using
103+
* CompletableFuture.
104+
*
105+
* @param future CompletableFuture to handle the result of the retry attempts
106+
* @param attempt current attempt number
107+
*/
108+
private void retryAttempt(CompletableFuture<T> future, int attempt) {
109+
if (attempt > maxAttempts) {
110+
future.completeExceptionally(new BusinessException("Max retry attempts exceeded."));
111+
return;
112+
}
101113

102-
try {
103-
var testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000);
104-
var delay = Math.min(testDelay, this.maxDelay);
105-
Thread.sleep(delay);
106-
} catch (InterruptedException f) {
107-
//ignore
114+
CompletableFuture.runAsync(() -> {
115+
try {
116+
T result = op.perform();
117+
future.complete(result);
118+
} catch (BusinessException e) {
119+
errors.add(e);
120+
attempts.incrementAndGet();
121+
if (attempts.get() >= maxAttempts || !test.test(e)) {
122+
future.completeExceptionally(e);
123+
} else {
124+
long delay = calculateDelay(attempts.get());
125+
try {
126+
Thread.sleep(delay);
127+
} catch (InterruptedException ignored) {
128+
Thread.currentThread().interrupt();
129+
}
130+
retryAttempt(future, attempt + 1);
108131
}
109132
}
110-
} while (true);
133+
});
134+
}
135+
136+
/**
137+
* Calculates the delay for exponential backoff based on the current attempt
138+
* number.
139+
*
140+
* @param attempt the current attempt number
141+
* @return the calculated delay (in milliseconds)
142+
*/
143+
private long calculateDelay(int attempt) {
144+
long testDelay = (long) Math.pow(2, attempt) * 1000 + RANDOM.nextInt(1000);
145+
return Math.min(testDelay, maxDelay);
111146
}
112147
}

0 commit comments

Comments
 (0)