Skip to content

Commit 7f1e8c3

Browse files
committed
fix: prevLogIndex error
1 parent 5b1d968 commit 7f1e8c3

File tree

4 files changed

+44
-17
lines changed

4 files changed

+44
-17
lines changed

config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ func DefaultConfig() *Config {
5454
Address: "localhost:4399",
5555
MaxLogEntriesPerRequest: 40,
5656
HeartbeatInterval: time.Millisecond * 100,
57-
HeartbeatTimeout: time.Millisecond * 300,
58-
ElectionTimeout: time.Millisecond * 200,
57+
HeartbeatTimeout: time.Millisecond * 250,
58+
ElectionTimeout: time.Millisecond * 300,
5959
DialTimeout: time.Millisecond * 300,
6060
DialOptions: []grpc.DialOption{
6161
grpc.WithTransportCredentials(insecure.NewCredentials()),

example/kv/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"net/http"
1111
"os"
1212
"os/signal"
13-
"strings"
1413
"sync"
1514
"time"
1615
)
@@ -161,7 +160,7 @@ func main() {
161160
}
162161

163162
if *existing != "" {
164-
peer.Join(strings.Split(*existing, ","))
163+
peer.Join(*existing)
165164
}
166165

167166
done := make(chan os.Signal, 1)

log.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]p
219219

220220
// Return an error if the index doesn't exist.
221221
if index > (uint64(len(l.entries)) + l.startIndex) {
222-
panic(fmt.Sprintf("raft: Index is beyond end of log: %v %v", len(l.entries), index))
222+
panic(fmt.Sprintf("index is beyond end of log: %v %v", len(l.entries), index))
223223
}
224224

225225
// If we're going from the beginning of the log then return the whole log.
@@ -325,38 +325,37 @@ func (l *Log) flushCommitIndex() {
325325
_ = l.levelDB.Put([]byte(commitIndexKey), uint64ToBytes(l.commitIndex), nil)
326326
}
327327

328-
// Truncates the log to the given index and term. This only works if the log
329-
// at the index has not been committed.
328+
// truncate 截断index之后的未提交的所有日志条目,如果已提交或超出日志范围,则返回错误
330329
func (l *Log) truncate(index uint64, term uint64) error {
331330
l.mutex.Lock()
332331
defer l.mutex.Unlock()
333332
log.Debug("log.truncate: ", index)
334333

335-
// Do not allow committed entries to be truncated.
334+
// 不允许截断已经提交的日志,即要求 index >= commitIndex
336335
if index < l.commitIndex {
337336
log.Debug("log.truncate.before")
338-
return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.commitIndex, index, term)
337+
return fmt.Errorf("index is already committed (%v): (IDX=%v, TERM=%v)", l.commitIndex, index, term)
339338
}
340339

341-
// Do not truncate past end of entries.
340+
// 要截断的日志超出当前日志范围,返回错误
342341
if index > l.startIndex+uint64(len(l.entries)) {
343342
log.Debug("log.truncate.after")
344-
return fmt.Errorf("raft.Log: Entry index does not exist (MAX=%v): (IDX=%v, TERM=%v)", len(l.entries), index, term)
343+
return fmt.Errorf("entry index does not exist (MAX=%v): (IDX=%v, TERM=%v)", len(l.entries), index, term)
345344
}
346345

347-
// If we're truncating everything then just clear the entries.
346+
// 开始截断操作
348347
if index == l.startIndex {
349348
log.Debug("log.truncate.clear")
350349
l.entries = []pb.LogEntry{}
351350
} else {
352-
// Do not truncate if the Entry at index does not have the matching term.
351+
// 相同索引,却是不同任期,则不进行截断,并返回错误
353352
entry := l.entries[index-l.startIndex-1]
354353
if len(l.entries) > 0 && entry.Term != term {
355354
log.Debug("log.truncate.termMismatch")
356355
return fmt.Errorf("entry at index does not have matching term (%v): (IDX=%v, TERM=%v)", entry.Term, index, term)
357356
}
358357

359-
// otherwise, truncate up to the desired Entry.
358+
// 截断所要求的条目
360359
if index < l.startIndex+uint64(len(l.entries)) {
361360
log.Debug("log.truncate.finish")
362361
for i := index - l.startIndex; i < uint64(len(l.entries)); i++ {

server.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,6 @@ func (s *server) AppendEntries(ctx context.Context, req *pb.AppendEntriesRequest
234234
log.Warn("stale term")
235235
return resp, nil
236236
} else if req.Term == currentTerm {
237-
// TODO: 相同任期,都是leader的情况
238237
if s.State() == pb.NodeState_Candidate {
239238
s.setState(pb.NodeState_Follower)
240239
}
@@ -251,7 +250,7 @@ func (s *server) AppendEntries(ctx context.Context, req *pb.AppendEntriesRequest
251250
// 2、reply false if log doesn't contain an Entry at prevLogIndex whose
252251
// term matches prevLogTerm
253252
if err = s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
254-
log.Warn(err)
253+
log.Debug(err)
255254
return resp, nil
256255
}
257256

@@ -341,6 +340,7 @@ func (s *server) Membership(ctx context.Context, req *pb.MembershipRequest) (
341340
continue
342341
}
343342
s.members.Store(m.Id, newMember(m, s))
343+
log.Infof("add member %d %s", m.Id, m.Address)
344344
}
345345
return &pb.MembershipResponse{Success: true}, nil
346346
}
@@ -383,6 +383,8 @@ func (s *server) Stop() {
383383
return
384384
}
385385

386+
s.leaveCluster()
387+
386388
// 结束运行的事件循环
387389
close(s.stopped)
388390

@@ -394,7 +396,31 @@ func (s *server) Stop() {
394396
s.log.close()
395397
s.setState(pb.NodeState_Stopped)
396398

397-
log.Info("state")
399+
log.Info("stopped")
400+
}
401+
402+
// leaveCluster 离开集群
403+
func (s *server) leaveCluster() {
404+
if s.IsLeader() {
405+
s.members.Range(func(key, value interface{}) bool {
406+
m := value.(*member)
407+
_ = m.sendRemoveMember(&pb.MemberRequest{Leader: true, Member: s.Self()})
408+
return true
409+
})
410+
return
411+
}
412+
413+
if s.State() == pb.NodeState_Candidate {
414+
return
415+
}
416+
417+
value, ok := s.members.Load(s.leaderId)
418+
if !ok {
419+
return
420+
}
421+
422+
leader := value.(*member)
423+
_ = leader.sendRemoveMember(&pb.MemberRequest{Leader: false, Member: s.Self()})
398424
}
399425

400426
func (s *server) IsLeader() bool {
@@ -649,6 +675,7 @@ func (s *server) leaderLoop() {
649675

650676
case resp := <-s.leaderRespChan:
651677
if resp.Term > s.CurrentTerm() {
678+
// 主动退位
652679
s.updateCurrentTerm(resp.Term, 0)
653680
break
654681
}
@@ -703,6 +730,7 @@ func (s *server) addMember(mem pb.Member) {
703730
m := newMember(mem, s)
704731

705732
if s.State() == pb.NodeState_Leader {
733+
m.setPrevLogIndex(s.log.CurrentIndex())
706734
m.startHeartbeat()
707735
}
708736

@@ -741,6 +769,7 @@ func (s *server) removeMember(id uint64) {
741769
if !loaded {
742770
return
743771
}
772+
log.Infof("remove member %d", id)
744773

745774
if s.IsLeader() {
746775
m := val.(*member)

0 commit comments

Comments
 (0)