Skip to content

Commit 7b509bc

Browse files
committed
fix: some error
1 parent e5bf0df commit 7b509bc

File tree

15 files changed

+460
-430
lines changed

15 files changed

+460
-430
lines changed

config.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
package raft
22

33
import (
4+
"github.com/bwmarrin/snowflake"
45
"google.golang.org/grpc"
56
"google.golang.org/grpc/credentials/insecure"
67
"time"
78
)
89

910
type Config struct {
11+
// MemberId 节点的id
12+
MemberId uint64
13+
14+
// Address 节点的监听地址
15+
Address string
16+
1017
// MaxLogEntriesPerRequest 每次最多请求的日志条目
1118
MaxLogEntriesPerRequest uint64
1219

@@ -16,9 +23,6 @@ type Config struct {
1623
// ElectionTimeout 选举超时
1724
ElectionTimeout time.Duration
1825

19-
// ElectionTimeoutThresholdPercent 选举超时的阈值
20-
ElectionTimeoutThresholdPercent float64
21-
2226
// DialTimeout 拨号超时时间
2327
DialTimeout time.Duration
2428

@@ -28,20 +32,29 @@ type Config struct {
2832
// ServerOptions 服务端参数设置
2933
ServerOptions []grpc.ServerOption
3034

35+
// SnapshotPath 快照存储路径
3136
SnapshotPath string
37+
38+
// LogPath 日志文件路径
39+
LogPath string
3240
}
3341

3442
func DefaultConfig() *Config {
43+
node, _ := snowflake.NewNode(time.Now().UnixNano())
3544
return &Config{
36-
HeartbeatInterval: time.Millisecond * 100,
37-
ElectionTimeout: time.Millisecond * 150,
38-
DialTimeout: time.Second,
45+
MemberId: uint64(node.Generate().Int64()),
46+
Address: "localhost:4399",
47+
MaxLogEntriesPerRequest: 40,
48+
HeartbeatInterval: time.Millisecond * 100,
49+
ElectionTimeout: time.Millisecond * 150,
50+
DialTimeout: time.Second,
3951
DialOptions: []grpc.DialOption{
4052
grpc.WithTransportCredentials(insecure.NewCredentials()),
4153
},
4254
ServerOptions: []grpc.ServerOption{
4355
grpc.Creds(insecure.NewCredentials()),
4456
},
4557
SnapshotPath: "./snapshot",
58+
LogPath: "./log",
4659
}
4760
}

event.go

Lines changed: 0 additions & 39 deletions
This file was deleted.

event_dispatcher.go

Lines changed: 0 additions & 57 deletions
This file was deleted.

event_dispatcher_test.go

Lines changed: 0 additions & 22 deletions
This file was deleted.

example/kv/main.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"flag"
6+
"fmt"
7+
"github.com/gin-gonic/gin"
8+
log "github.com/treeforest/logger"
9+
"github.com/treeforest/raft"
10+
"os"
11+
"os/signal"
12+
"strings"
13+
"sync"
14+
"time"
15+
)
16+
17+
type Command struct {
18+
Key string
19+
Value string
20+
}
21+
22+
type Server struct {
23+
raft.StateMachine
24+
peer raft.Raft
25+
r *gin.Engine
26+
state map[string]string
27+
locker sync.RWMutex
28+
pool *sync.Pool
29+
}
30+
31+
func New() *Server {
32+
return &Server{
33+
state: map[string]string{},
34+
locker: sync.RWMutex{},
35+
pool: &sync.Pool{
36+
New: func() interface{} {
37+
return new(Command)
38+
},
39+
},
40+
}
41+
}
42+
43+
// Save 读取状态机快照
44+
func (s *Server) Save() ([]byte, error) {
45+
s.locker.RLock()
46+
defer s.locker.RUnlock()
47+
return json.Marshal(s.state)
48+
}
49+
50+
// Recovery 从快照中恢复状态机状态
51+
func (s *Server) Recovery(state []byte) error {
52+
s.locker.Lock()
53+
defer s.locker.Unlock()
54+
return json.Unmarshal(state, &s.state)
55+
}
56+
57+
// Apply 状态机执行命令的回调函数
58+
func (s *Server) Apply(commandName string, command []byte) {
59+
c := s.pool.Get().(*Command)
60+
_ = json.Unmarshal(command, c)
61+
switch commandName {
62+
case "set":
63+
s.state[c.Key] = c.Value
64+
case "del":
65+
delete(s.state, c.Key)
66+
}
67+
s.pool.Put(c)
68+
}
69+
70+
func (s *Server) Serve(addr string) {
71+
r := gin.Default()
72+
r.POST("/set", func(c *gin.Context) {
73+
if !s.peer.IsLeader() {
74+
c.JSON(200, gin.H{"code": -1, "leader": s.peer.LeaderAddress()})
75+
}
76+
key := c.Query("key")
77+
val := c.Query("val")
78+
79+
})
80+
81+
s.r = r
82+
if err := s.r.Run(addr); err != nil {
83+
log.Error(err)
84+
}
85+
}
86+
87+
func main() {
88+
port := flag.Int("port", 0, "raft server port")
89+
addr := flag.String("addr", "localhost:0", "web address")
90+
existing := flag.String("existing", "", "existing raft member")
91+
flag.Parse()
92+
93+
s := New()
94+
95+
config := raft.DefaultConfig()
96+
config.Address = fmt.Sprintf("localhost:%d", *port)
97+
config.LogPath = fmt.Sprintf("%d", *port)
98+
99+
peer := raft.New(raft.DefaultConfig(), s)
100+
if err := peer.Start(); err != nil {
101+
log.Fatal(err)
102+
}
103+
104+
peer.Join(strings.Split(*existing, ","))
105+
106+
go func() {
107+
s.peer = peer
108+
s.Serve(*addr)
109+
}()
110+
111+
done := make(chan os.Signal, 1)
112+
signal.Notify(done, os.Interrupt, os.Kill)
113+
<-done
114+
115+
srv.Stop()
116+
time.Sleep(time.Millisecond * 500)
117+
}

go.mod

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,38 @@ module github.com/treeforest/raft
33
go 1.17
44

55
require (
6+
github.com/bwmarrin/snowflake v0.3.0
7+
github.com/gin-gonic/gin v1.7.7
68
github.com/gogo/protobuf v1.3.2
79
github.com/golang/protobuf v1.5.2
8-
github.com/stretchr/testify v1.7.0
910
github.com/syndtr/goleveldb v1.0.0
1011
github.com/treeforest/logger v0.1.0
1112
google.golang.org/grpc v1.45.0
1213
)
1314

1415
require (
15-
github.com/davecgh/go-spew v1.1.0 // indirect
16+
github.com/davecgh/go-spew v1.1.1 // indirect
1617
github.com/fatih/color v1.13.0 // indirect
18+
github.com/gin-contrib/sse v0.1.0 // indirect
19+
github.com/go-playground/locales v0.13.0 // indirect
20+
github.com/go-playground/universal-translator v0.17.0 // indirect
21+
github.com/go-playground/validator/v10 v10.4.1 // indirect
1722
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
23+
github.com/json-iterator/go v1.1.9 // indirect
24+
github.com/leodido/go-urn v1.2.0 // indirect
1825
github.com/mattn/go-colorable v0.1.9 // indirect
1926
github.com/mattn/go-isatty v0.0.14 // indirect
27+
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
28+
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
2029
github.com/pmezard/go-difflib v1.0.0 // indirect
30+
github.com/stretchr/testify v1.7.0 // indirect
31+
github.com/ugorji/go/codec v1.1.7 // indirect
32+
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
2133
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
2234
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
2335
golang.org/x/text v0.3.3 // indirect
2436
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
2537
google.golang.org/protobuf v1.26.0 // indirect
38+
gopkg.in/yaml.v2 v2.2.8 // indirect
2639
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
2740
)

0 commit comments

Comments
 (0)