Skip to content

Commit cf0b58e

Browse files
committed
feat: pubsub and replication's type
1 parent 7f1e8c3 commit cf0b58e

File tree

7 files changed

+292
-58
lines changed

7 files changed

+292
-58
lines changed

config.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ type Config struct {
4444
// LogPath 日志文件路径
4545
LogPath string
4646

47-
// PullMembershipInterval 同步成员信息的间隔时间
48-
PullMembershipInterval time.Duration
47+
// SubscribeTTL 订阅事件的超时时间
48+
SubscribeTTL time.Duration
49+
50+
// ReplicationType 日志复制类型
51+
ReplicationType ReplicationType
4952
}
5053

5154
func DefaultConfig() *Config {
@@ -54,7 +57,7 @@ func DefaultConfig() *Config {
5457
Address: "localhost:4399",
5558
MaxLogEntriesPerRequest: 40,
5659
HeartbeatInterval: time.Millisecond * 100,
57-
HeartbeatTimeout: time.Millisecond * 250,
60+
HeartbeatTimeout: time.Millisecond * 300,
5861
ElectionTimeout: time.Millisecond * 300,
5962
DialTimeout: time.Millisecond * 300,
6063
DialOptions: []grpc.DialOption{
@@ -63,8 +66,9 @@ func DefaultConfig() *Config {
6366
ServerOptions: []grpc.ServerOption{
6467
grpc.Creds(insecure.NewCredentials()),
6568
},
66-
SnapshotPath: "./snapshot",
67-
LogPath: "./log",
68-
PullMembershipInterval: time.Second * 5,
69+
SnapshotPath: "./snapshot",
70+
LogPath: "./log",
71+
SubscribeTTL: time.Second * 3,
72+
ReplicationType: Asynchronous,
6973
}
7074
}

example/kv/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,13 @@ func (s *Server) Serve(addr string) {
8686
cmd.Value = c.Query("value")
8787
data, _ := json.Marshal(cmd)
8888

89-
index, err := s.peer.Do(SET, data)
89+
entry, err := s.peer.Do(SET, data)
9090
if err != nil {
9191
c.JSON(http.StatusOK, gin.H{"code": -1, "error": err.Error()})
9292
return
9393
}
9494

95-
log.Info("index=", index)
95+
log.Info("index=", entry.Index)
9696
c.JSON(http.StatusOK, gin.H{"code": 0})
9797
})
9898
r.POST("/del", func(c *gin.Context) {
@@ -105,13 +105,13 @@ func (s *Server) Serve(addr string) {
105105
cmd.Key = c.Query("key")
106106
data, _ := json.Marshal(cmd)
107107

108-
index, err := s.peer.Do(SET, data)
108+
entry, err := s.peer.Do(SET, data)
109109
if err != nil {
110110
c.JSON(http.StatusOK, gin.H{"code": -1, "error": err.Error()})
111111
return
112112
}
113113

114-
log.Info("index=", index)
114+
log.Info("index=", entry.Index)
115115
c.JSON(http.StatusOK, gin.H{"code": 0})
116116
})
117117
r.GET("/get", func(c *gin.Context) {

log.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
log "github.com/treeforest/logger"
99
"github.com/treeforest/raft/pb"
1010
"os"
11+
"strconv"
1112
"sync"
13+
"time"
1214
)
1315

1416
const (
@@ -19,6 +21,8 @@ const (
1921
currentTermKey = "__latest_term_key__"
2022
)
2123

24+
type notify []<-chan struct{}
25+
2226
// Log a log is a collection of log entries that are persisted to durable storage.
2327
type Log struct {
2428
ApplyFunc func(commandName string, command []byte)
@@ -31,17 +35,27 @@ type Log struct {
3135
startTerm uint64
3236
currentTerm uint64
3337
initialized bool
38+
pubsub *PubSub
3439
}
3540

3641
// newLog creates a new log.
3742
func newLog(path string, applyFunc func(string, []byte)) *Log {
3843
l := &Log{
3944
ApplyFunc: applyFunc,
4045
entries: make([]pb.LogEntry, 0),
46+
pubsub: NewPubSub(),
4147
}
4248
return l
4349
}
4450

51+
func (l *Log) Subscribe(index uint64, ttl time.Duration) Subscription {
52+
return l.pubsub.Subscribe(strconv.FormatUint(index, 10), ttl)
53+
}
54+
55+
func (l *Log) publish(index uint64) {
56+
_ = l.pubsub.Publish(strconv.FormatUint(index, 10), struct{}{})
57+
}
58+
4559
// CommitIndex the last committed index in the log.
4660
func (l *Log) CommitIndex() uint64 {
4761
l.mutex.RLock()
@@ -310,6 +324,9 @@ func (l *Log) setCommitIndex(index uint64) error {
310324
// Update commit index.
311325
l.commitIndex = entry.Index
312326
l.flushCommitIndex()
327+
log.Infof("update commitIndex: %d", l.commitIndex)
328+
329+
l.publish(entry.Index)
313330

314331
// Apply the changes to the state machine and store the error code.
315332
l.ApplyFunc(entry.CommandName, entry.Command)

pubsub.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package raft
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"time"
7+
)
8+
9+
type PubSub struct {
10+
sync.RWMutex
11+
subscriptions map[string]*Set // topic -> subscriptions
12+
}
13+
14+
type subscription struct {
15+
top string
16+
ttl time.Duration
17+
c chan interface{}
18+
}
19+
20+
type Subscription interface {
21+
Listen() (interface{}, error)
22+
}
23+
24+
// Listen 监听订阅的结果,超时返回timed out
25+
func (s *subscription) Listen() (interface{}, error) {
26+
select {
27+
case <-time.After(s.ttl):
28+
return nil, errors.New("timed out")
29+
case item := <-s.c:
30+
return item, nil
31+
}
32+
}
33+
34+
func NewPubSub() *PubSub {
35+
return &PubSub{
36+
subscriptions: make(map[string]*Set),
37+
}
38+
}
39+
40+
const (
41+
subscriptionBuffSize = 50
42+
)
43+
44+
// Publish 发布topic对应的item,监听topic的所有订阅者都将收到item
45+
func (ps *PubSub) Publish(topic string, item interface{}) error {
46+
ps.RLock()
47+
defer ps.RUnlock()
48+
s, subscribed := ps.subscriptions[topic]
49+
if !subscribed {
50+
return errors.New("no subscribers")
51+
}
52+
for _, sub := range s.ToArray() {
53+
c := sub.(*subscription).c
54+
// Not enough room in buffer, continue in order to not block publisher
55+
if len(c) == subscriptionBuffSize {
56+
continue
57+
}
58+
c <- item
59+
}
60+
return nil
61+
}
62+
63+
// Subscribe 订阅事件,返回一个Listen接口
64+
func (ps *PubSub) Subscribe(topic string, ttl time.Duration) Subscription {
65+
sub := &subscription{
66+
top: topic,
67+
ttl: ttl,
68+
c: make(chan interface{}, subscriptionBuffSize),
69+
}
70+
71+
ps.Lock()
72+
// Add subscription to subscriptions map
73+
s, exists := ps.subscriptions[topic]
74+
// If no subscription set for the topic exists, create one
75+
if !exists {
76+
s = NewSet()
77+
ps.subscriptions[topic] = s
78+
}
79+
ps.Unlock()
80+
81+
// Add the subscription
82+
s.Add(sub)
83+
84+
// When the timeout expires, remove the subscription
85+
time.AfterFunc(ttl, func() {
86+
ps.unSubscribe(sub)
87+
})
88+
return sub
89+
}
90+
91+
func (ps *PubSub) unSubscribe(sub *subscription) {
92+
ps.Lock()
93+
defer ps.Unlock()
94+
ps.subscriptions[sub.top].Remove(sub)
95+
if ps.subscriptions[sub.top].Size() != 0 {
96+
return
97+
}
98+
// Else, this is the last subscription for the topic.
99+
// Remove the set from the subscriptions map
100+
delete(ps.subscriptions, sub.top)
101+
}
102+
103+
type Set struct {
104+
items map[interface{}]struct{}
105+
lock *sync.RWMutex
106+
}
107+
108+
func NewSet() *Set {
109+
return &Set{lock: &sync.RWMutex{}, items: make(map[interface{}]struct{})}
110+
}
111+
112+
func (s *Set) Add(item interface{}) {
113+
s.lock.Lock()
114+
defer s.lock.Unlock()
115+
s.items[item] = struct{}{}
116+
}
117+
118+
func (s *Set) Exists(item interface{}) bool {
119+
s.lock.RLock()
120+
defer s.lock.RUnlock()
121+
_, exists := s.items[item]
122+
return exists
123+
}
124+
125+
func (s *Set) Size() int {
126+
s.lock.RLock()
127+
defer s.lock.RUnlock()
128+
return len(s.items)
129+
}
130+
131+
func (s *Set) ToArray() []interface{} {
132+
s.lock.RLock()
133+
defer s.lock.RUnlock()
134+
a := make([]interface{}, len(s.items))
135+
i := 0
136+
for item := range s.items {
137+
a[i] = item
138+
i++
139+
}
140+
return a
141+
}
142+
143+
func (s *Set) Clear() {
144+
s.lock.Lock()
145+
defer s.lock.Unlock()
146+
s.items = make(map[interface{}]struct{})
147+
}
148+
149+
func (s *Set) Remove(item interface{}) {
150+
s.lock.Lock()
151+
defer s.lock.Unlock()
152+
delete(s.items, item)
153+
}

raft.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package raft
22

3+
import "github.com/treeforest/raft/pb"
4+
35
type Raft interface {
46
Start() error
57
Stop()
68
Join(existing string)
7-
Do(commandName string, command []byte) (uint64, error)
9+
Do(commandName string, command []byte) (*pb.LogEntry, error)
810
Running() bool
911
IsLeader() bool
1012
LeaderId() uint64

replication_type.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package raft
2+
3+
// ReplicationType 日志复制类型
4+
type ReplicationType int
5+
6+
const (
7+
// Synchronous 同步复制。leader执行完一个写请求后,必须等待大于1/2的follower
8+
// 都执行完毕,并收到确认后,才回复客户端写入成功。
9+
Synchronous ReplicationType = iota
10+
11+
// Asynchronous 异步复制。leader执行完写请求后,会立即将结果返回给客户端,
12+
// 无须等待其它副本是否写入完成。
13+
Asynchronous
14+
15+
// Semisynchronous 半同步复制。介于同步复制与异步复制之间一种复制机制。leader只
16+
// 需要等待一个follower执行完毕并返回确认信息,不需要等待大于1/2的follower都完成。
17+
// 保证至少有两个节点拥有最新的数据副本。
18+
Semisynchronous
19+
)

0 commit comments

Comments
 (0)