Skip to content

Commit f8d84b6

Browse files
killme2008 authored and fengjiachun committed
(fix) Iterator#setErrorAndRollback may break ReadIndex promise, #317 (#361)
* (fix) Iterator#setErrorAndRollback may break ReadIndex promise, #317
* (feat) Adds comment
* fix hang with CI
* (fix) FSMCallerTest#testOnCommittedError
* feat/release 1.3.0 (#363)
* minor change
* release 1.3.0
* release 1.3.0
* fix hang with ut
1 parent 8fdde14 commit f8d84b6

File tree

16 files changed

+219
-45
lines changed

16 files changed

+219
-45
lines changed

jraft-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>jraft-parent</artifactId>
77
<groupId>com.alipay.sofa</groupId>
8-
<version>1.2.6</version>
8+
<version>1.3.0</version>
99
</parent>
1010
<artifactId>jraft-core</artifactId>
1111
<packaging>jar</packaging>

jraft-core/src/main/java/com/alipay/sofa/jraft/Iterator.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -67,8 +67,9 @@ public interface Iterator extends java.util.Iterator<ByteBuffer> {
6767
* |ntail| tasks (starting from the last iterated one) as not applied. After
6868
* this point, no further changes on the StateMachine as well as the Node
6969
* would be allowed and you should try to repair this replica or just drop it.
70-
*
71-
* If |statInfo| is not NULL, it should describe the detail of the error.
70+
*
71+
* @param ntail the number of tasks (starting from the last iterated one) considered as not to be applied.
72+
* @param st Status to describe the detail of the error.
7273
*/
7374
void setErrorAndRollback(final long ntail, final Status st);
7475
}

jraft-core/src/main/java/com/alipay/sofa/jraft/ReadOnlyService.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
package com.alipay.sofa.jraft;
1818

1919
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
20+
import com.alipay.sofa.jraft.error.RaftException;
2021
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;
2122

2223
/**
@@ -43,4 +44,10 @@ public interface ReadOnlyService extends Lifecycle<ReadOnlyServiceOptions> {
4344
*/
4445
void join() throws InterruptedException;
4546

47+
/**
48+
* Called when the node is turned into error state.
49+
* @param error error with raft info
50+
*/
51+
void setError(final RaftException error);
52+
4653
}

jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -515,10 +515,10 @@ private void doCommitted(final long committedIndex) {
515515
final long lastIndex = iterImpl.getIndex() - 1;
516516
final long lastTerm = this.logManager.getTerm(lastIndex);
517517
final LogId lastAppliedId = new LogId(lastIndex, lastTerm);
518-
this.lastAppliedIndex.set(committedIndex);
518+
this.lastAppliedIndex.set(lastIndex);
519519
this.lastAppliedTerm = lastTerm;
520520
this.logManager.setAppliedId(lastAppliedId);
521-
notifyLastAppliedIndexUpdated(committedIndex);
521+
notifyLastAppliedIndexUpdated(lastIndex);
522522
} finally {
523523
this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - startMs);
524524
}

jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2373,6 +2373,9 @@ public void onError(final RaftException error) {
23732373
// onError of fsmCaller is guaranteed to be executed once.
23742374
this.fsmCaller.onError(error);
23752375
}
2376+
if (this.readOnlyService != null) {
2377+
this.readOnlyService.setError(error);
2378+
}
23762379
this.writeLock.lock();
23772380
try {
23782381
// If it is leader, need to wake up a new one;

jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java

Lines changed: 49 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
import com.alipay.sofa.jraft.entity.ReadIndexState;
4040
import com.alipay.sofa.jraft.entity.ReadIndexStatus;
4141
import com.alipay.sofa.jraft.error.RaftError;
42+
import com.alipay.sofa.jraft.error.RaftException;
4243
import com.alipay.sofa.jraft.option.RaftOptions;
4344
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;
4445
import com.alipay.sofa.jraft.rpc.RpcRequests.ReadIndexRequest;
@@ -82,6 +83,8 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
8283

8384
private NodeMetrics nodeMetrics;
8485

86+
private volatile RaftException error;
87+
8588
// <logIndex, statusList>
8689
private final TreeMap<Long, List<ReadIndexStatus>> pendingNotifyStatus = new TreeMap<>();
8790

@@ -217,6 +220,20 @@ private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
217220
this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
218221
}
219222

223+
private void resetPendingStatusError(final Status st) {
224+
this.lock.lock();
225+
try {
226+
for (final List<ReadIndexStatus> statuses : this.pendingNotifyStatus.values()) {
227+
for (final ReadIndexStatus status : statuses) {
228+
reportError(status, st);
229+
}
230+
}
231+
this.pendingNotifyStatus.clear();
232+
} finally {
233+
this.lock.unlock();
234+
}
235+
}
236+
220237
@Override
221238
public boolean init(final ReadOnlyServiceOptions opts) {
222239
this.node = opts.getNode();
@@ -250,6 +267,13 @@ public boolean init(final ReadOnlyServiceOptions opts) {
250267
return true;
251268
}
252269

270+
@Override
271+
public synchronized void setError(final RaftException error) {
272+
if (this.error == null) {
273+
this.error = error;
274+
}
275+
}
276+
253277
@Override
254278
public synchronized void shutdown() {
255279
if (this.shutdownLatch != null) {
@@ -267,6 +291,7 @@ public void join() throws InterruptedException {
267291
this.shutdownLatch.await();
268292
}
269293
this.readIndexDisruptor.shutdown();
294+
resetPendingStatusError(new Status(RaftError.ESTOP, "Node is quit."));
270295
this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
271296
}
272297

@@ -331,6 +356,16 @@ public void onApplied(final long appliedIndex) {
331356
}
332357

333358
}
359+
360+
/*
361+
* Remaining pending statuses are notified by error if it is presented.
362+
* When the node is in error state, consider following situations:
363+
* 1. If commitIndex > appliedIndex, then all pending statuses should be notified by error status.
364+
* 2. When commitIndex == appliedIndex, there will be no more pending statuses.
365+
*/
366+
if (this.error != null) {
367+
resetPendingStatusError(this.error.getStatus());
368+
}
334369
} finally {
335370
this.lock.unlock();
336371
if (pendingStatuses != null && !pendingStatuses.isEmpty()) {
@@ -356,6 +391,20 @@ TreeMap<Long, List<ReadIndexStatus>> getPendingNotifyStatus() {
356391
return this.pendingNotifyStatus;
357392
}
358393

394+
private void reportError(final ReadIndexStatus status, final Status st) {
395+
final long nowMs = Utils.monotonicMs();
396+
final List<ReadIndexState> states = status.getStates();
397+
final int taskCount = states.size();
398+
for (int i = 0; i < taskCount; i++) {
399+
final ReadIndexState task = states.get(i);
400+
final ReadIndexClosure done = task.getDone();
401+
if (done != null) {
402+
this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
403+
done.run(st);
404+
}
405+
}
406+
}
407+
359408
private void notifySuccess(final ReadIndexStatus status) {
360409
final long nowMs = Utils.monotonicMs();
361410
final List<ReadIndexState> states = status.getStates();

jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -753,7 +753,7 @@ boolean prepareEntry(final long nextSendingIndex, final int offset, final RaftOu
753753
}
754754
emb.setTerm(entry.getId().getTerm());
755755
if (entry.hasChecksum()) {
756-
emb.setChecksum(entry.getChecksum()); //since 1.2.6
756+
emb.setChecksum(entry.getChecksum()); // since 1.2.6
757757
}
758758
emb.setType(entry.getType());
759759
if (entry.getPeers() != null) {

jraft-core/src/main/java/com/alipay/sofa/jraft/entity/ReadIndexStatus.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -27,9 +27,9 @@
2727
*/
2828
public class ReadIndexStatus {
2929

30-
private final ReadIndexRequest request; //raw request
31-
private final List<ReadIndexState> states; //read index requests in batch.
32-
private final long index; //committed log index.
30+
private final ReadIndexRequest request; // raw request
31+
private final List<ReadIndexState> states; // read index requests in batch.
32+
private final long index; // committed log index.
3333

3434
public ReadIndexStatus(List<ReadIndexState> states, ReadIndexRequest request, long index) {
3535
super();

jraft-core/src/test/java/com/alipay/sofa/jraft/core/FSMCallerTest.java

Lines changed: 19 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,10 @@
1616
*/
1717
package com.alipay.sofa.jraft.core;
1818

19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertFalse;
21+
import static org.junit.Assert.assertTrue;
22+
1923
import java.util.concurrent.CountDownLatch;
2024

2125
import org.junit.After;
@@ -47,10 +51,6 @@
4751
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
4852
import com.alipay.sofa.jraft.test.TestUtils;
4953

50-
import static org.junit.Assert.assertEquals;
51-
import static org.junit.Assert.assertFalse;
52-
import static org.junit.Assert.assertTrue;
53-
5454
@RunWith(value = MockitoJUnitRunner.class)
5555
public class FSMCallerTest {
5656
private FSMCallerImpl fsmCaller;
@@ -67,9 +67,9 @@ public void setup() {
6767
this.fsmCaller = new FSMCallerImpl();
6868
this.closureQueue = new ClosureQueueImpl();
6969
final FSMCallerOptions opts = new FSMCallerOptions();
70-
Mockito.when(node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
71-
opts.setNode(node);
72-
opts.setFsm(fsm);
70+
Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
71+
opts.setNode(this.node);
72+
opts.setFsm(this.fsm);
7373
opts.setLogManager(this.logManager);
7474
opts.setBootstrapId(new LogId(10, 1));
7575
opts.setClosureQueue(this.closureQueue);
@@ -93,13 +93,13 @@ public void testShutdownJoin() throws Exception {
9393

9494
@Test
9595
public void testOnCommittedError() throws Exception {
96-
Mockito.when(logManager.getTerm(10)).thenReturn(1L);
97-
Mockito.when(logManager.getEntry(11)).thenReturn(null);
96+
Mockito.when(this.logManager.getTerm(10)).thenReturn(1L);
97+
Mockito.when(this.logManager.getEntry(11)).thenReturn(null);
9898

9999
assertTrue(this.fsmCaller.onCommitted(11));
100100

101101
this.fsmCaller.flush();
102-
assertEquals(this.fsmCaller.getLastAppliedIndex(), 11);
102+
assertEquals(this.fsmCaller.getLastAppliedIndex(), 10);
103103
Mockito.verify(this.logManager).setAppliedId(new LogId(10, 1));
104104
assertFalse(this.fsmCaller.getError().getStatus().isOk());
105105
assertEquals("Fail to get entry at index=11 while committed_index=11", this.fsmCaller.getError().getStatus()
@@ -111,8 +111,8 @@ public void testOnCommitted() throws Exception {
111111
final LogEntry log = new LogEntry(EntryType.ENTRY_TYPE_DATA);
112112
log.getId().setIndex(11);
113113
log.getId().setTerm(1);
114-
Mockito.when(logManager.getTerm(11)).thenReturn(1L);
115-
Mockito.when(logManager.getEntry(11)).thenReturn(log);
114+
Mockito.when(this.logManager.getTerm(11)).thenReturn(1L);
115+
Mockito.when(this.logManager.getEntry(11)).thenReturn(log);
116116
final ArgumentCaptor<Iterator> itArg = ArgumentCaptor.forClass(Iterator.class);
117117

118118
assertTrue(this.fsmCaller.onCommitted(11));
@@ -138,7 +138,7 @@ public void testOnSnapshotLoad() throws Exception {
138138
this.fsmCaller.onSnapshotLoad(new LoadSnapshotClosure() {
139139

140140
@Override
141-
public void run(Status status) {
141+
public void run(final Status status) {
142142
assertTrue(status.isOk());
143143
latch.countDown();
144144
}
@@ -164,7 +164,7 @@ public void testOnSnapshotLoadFSMError() throws Exception {
164164
this.fsmCaller.onSnapshotLoad(new LoadSnapshotClosure() {
165165

166166
@Override
167-
public void run(Status status) {
167+
public void run(final Status status) {
168168
assertFalse(status.isOk());
169169
assertEquals(-1, status.getCode());
170170
assertEquals("StateMachine onSnapshotLoad failed", status.getErrorMsg());
@@ -186,14 +186,14 @@ public void testOnSnapshotSaveEmptyConf() throws Exception {
186186
this.fsmCaller.onSnapshotSave(new SaveSnapshotClosure() {
187187

188188
@Override
189-
public void run(Status status) {
189+
public void run(final Status status) {
190190
assertFalse(status.isOk());
191191
assertEquals("Empty conf entry for lastAppliedIndex=10", status.getErrorMsg());
192192
latch.countDown();
193193
}
194194

195195
@Override
196-
public SnapshotWriter start(SnapshotMeta meta) {
196+
public SnapshotWriter start(final SnapshotMeta meta) {
197197
// TODO Auto-generated method stub
198198
return null;
199199
}
@@ -209,12 +209,12 @@ public void testOnSnapshotSave() throws Exception {
209209
final SaveSnapshotClosure done = new SaveSnapshotClosure() {
210210

211211
@Override
212-
public void run(Status status) {
212+
public void run(final Status status) {
213213

214214
}
215215

216216
@Override
217-
public SnapshotWriter start(SnapshotMeta meta) {
217+
public SnapshotWriter start(final SnapshotMeta meta) {
218218
assertEquals(10, meta.getLastIncludedIndex());
219219
return writer;
220220
}
@@ -269,7 +269,7 @@ public void testOnSnapshotLoadStale() throws Exception {
269269
this.fsmCaller.onSnapshotLoad(new LoadSnapshotClosure() {
270270

271271
@Override
272-
public void run(Status status) {
272+
public void run(final Status status) {
273273
assertFalse(status.isOk());
274274
assertEquals(RaftError.ESTALE, status.getRaftError());
275275
latch.countDown();

0 commit comments

Comments
 (0)