Skip to content

Commit 913aba9

Browse files
committed
update: interface to uint64
1 parent ba15067 commit 913aba9

File tree

3 files changed

+30
-30
lines changed

3 files changed

+30
-30
lines changed

example/kv/main.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,19 @@ func (s *Server) Recovery(state []byte) error {
6464

6565
// Apply 状态机执行命令的回调函数
6666
func (s *Server) Apply(commandName string, command []byte) {
67-
cmd := s.cmdPool.Get().(*Command)
68-
_ = json.Unmarshal(command, &cmd)
69-
70-
switch commandName {
71-
case SET:
72-
s.state[cmd.Key] = cmd.Value
73-
case DEL:
74-
delete(s.state, cmd.Key)
75-
}
76-
77-
*cmd = Command{}
78-
s.cmdPool.Put(cmd)
67+
return
68+
//cmd := s.cmdPool.Get().(*Command)
69+
//_ = json.Unmarshal(command, &cmd)
70+
//
71+
//switch commandName {
72+
//case SET:
73+
// s.state[cmd.Key] = cmd.Value
74+
//case DEL:
75+
// delete(s.state, cmd.Key)
76+
//}
77+
//
78+
//*cmd = Command{}
79+
//s.cmdPool.Put(cmd)
7980
}
8081

8182
func (s *Server) Serve(addr string) {

log.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/treeforest/raft/pb"
99
"io"
1010
"os"
11-
"strconv"
1211
"sync"
1312
"time"
1413
)
@@ -53,11 +52,11 @@ func newLog(path string, applyFunc func(string, []byte)) *Log {
5352
}
5453

5554
func (l *Log) Subscribe(index uint64, ttl time.Duration) Subscription {
56-
return l.ps.Subscribe(strconv.FormatUint(index, 10), ttl)
55+
return l.ps.Subscribe(index, ttl)
5756
}
5857

5958
func (l *Log) publish(index uint64) {
60-
_ = l.ps.Publish(strconv.FormatUint(index, 10), struct{}{})
59+
_ = l.ps.Publish(index, struct{}{})
6160
}
6261

6362
// CommitIndex the last committed index in the log.

pubsub.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import (
88

99
type PubSub struct {
1010
sync.RWMutex
11-
subscriptions map[string]*Set // topic -> subscriptions
11+
subscriptions map[uint64]*Set // topic -> subscriptions
1212
}
1313

1414
type subscription struct {
15-
top string
15+
top uint64
1616
ttl time.Duration
1717
c chan interface{}
1818
}
@@ -33,24 +33,24 @@ func (s *subscription) Listen() (interface{}, error) {
3333

3434
func NewPubSub() *PubSub {
3535
return &PubSub{
36-
subscriptions: make(map[string]*Set),
36+
subscriptions: make(map[uint64]*Set),
3737
}
3838
}
3939

4040
const (
41-
subscriptionBuffSize = 50
41+
subscriptionBuffSize = 500
4242
)
4343

4444
// Publish 发布topic对应的item,监听topic的所有订阅者都将收到item
45-
func (ps *PubSub) Publish(topic string, item interface{}) error {
45+
func (ps *PubSub) Publish(topic uint64, item interface{}) error {
4646
ps.RLock()
4747
defer ps.RUnlock()
4848
s, subscribed := ps.subscriptions[topic]
4949
if !subscribed {
5050
return errors.New("no subscribers")
5151
}
5252
for _, sub := range s.ToArray() {
53-
c := sub.(*subscription).c
53+
c := sub.c
5454
// Not enough room in buffer, continue in order to not block publisher
5555
if len(c) == subscriptionBuffSize {
5656
continue
@@ -61,7 +61,7 @@ func (ps *PubSub) Publish(topic string, item interface{}) error {
6161
}
6262

6363
// Subscribe 订阅事件,返回一个Listen接口
64-
func (ps *PubSub) Subscribe(topic string, ttl time.Duration) Subscription {
64+
func (ps *PubSub) Subscribe(topic uint64, ttl time.Duration) Subscription {
6565
sub := &subscription{
6666
top: topic,
6767
ttl: ttl,
@@ -101,21 +101,21 @@ func (ps *PubSub) unSubscribe(sub *subscription) {
101101
}
102102

103103
type Set struct {
104-
items map[interface{}]struct{}
104+
items map[*subscription]struct{}
105105
lock *sync.RWMutex
106106
}
107107

108108
func NewSet() *Set {
109-
return &Set{lock: &sync.RWMutex{}, items: make(map[interface{}]struct{})}
109+
return &Set{lock: &sync.RWMutex{}, items: make(map[*subscription]struct{})}
110110
}
111111

112-
func (s *Set) Add(item interface{}) {
112+
func (s *Set) Add(item *subscription) {
113113
s.lock.Lock()
114114
defer s.lock.Unlock()
115115
s.items[item] = struct{}{}
116116
}
117117

118-
func (s *Set) Exists(item interface{}) bool {
118+
func (s *Set) Exists(item *subscription) bool {
119119
s.lock.RLock()
120120
defer s.lock.RUnlock()
121121
_, exists := s.items[item]
@@ -128,10 +128,10 @@ func (s *Set) Size() int {
128128
return len(s.items)
129129
}
130130

131-
func (s *Set) ToArray() []interface{} {
131+
func (s *Set) ToArray() []*subscription {
132132
s.lock.RLock()
133133
defer s.lock.RUnlock()
134-
a := make([]interface{}, len(s.items))
134+
a := make([]*subscription, len(s.items))
135135
i := 0
136136
for item := range s.items {
137137
a[i] = item
@@ -143,10 +143,10 @@ func (s *Set) ToArray() []interface{} {
143143
func (s *Set) Clear() {
144144
s.lock.Lock()
145145
defer s.lock.Unlock()
146-
s.items = make(map[interface{}]struct{})
146+
s.items = make(map[*subscription]struct{})
147147
}
148148

149-
func (s *Set) Remove(item interface{}) {
149+
func (s *Set) Remove(item *subscription) {
150150
s.lock.Lock()
151151
defer s.lock.Unlock()
152152
delete(s.items, item)

0 commit comments

Comments
 (0)