Skip to content

Commit ee56b98

Browse files
committed
fix(zero): make zero shutdown cleanly
1 parent bfe6f0c commit ee56b98

File tree

3 files changed

+36
-11
lines changed

3 files changed

+36
-11
lines changed

conn/pool.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,18 @@ func (p *Pools) GetAll() []*Pool {
9191
return pool
9292
}
9393

94+
// RemoveAll removes all pool entries.
95+
func (p *Pools) RemoveAll() {
96+
p.Lock()
97+
defer p.Unlock()
98+
99+
for k, pool := range p.all {
100+
glog.Warningf("CONN: Disconnecting from %s\n", k)
101+
delete(p.all, k)
102+
pool.shutdown()
103+
}
104+
}
105+
94106
// RemoveInvalid removes invalid nodes from the list of pools.
95107
func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
96108
// Keeps track of valid IP addresses, assigned to active nodes. We do this
@@ -277,11 +289,20 @@ func (p *Pool) MonitorHealth() {
277289

278290
// We might have lost connection to the destination. In that case, re-dial
279291
// the connection.
280-
reconnect := func() {
292+
// Returns true, if reconnection was successful
293+
reconnect := func() bool {
294+
reconnectionTicker := time.NewTicker(time.Second)
295+
defer reconnectionTicker.Stop()
281296
for {
282-
time.Sleep(time.Second)
297+
select {
298+
case <-p.closer.HasBeenClosed():
299+
glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
300+
return false
301+
case <-reconnectionTicker.C:
302+
}
303+
283304
if err := p.closer.Ctx().Err(); err != nil {
284-
return
305+
return false
285306
}
286307
ctx, cancel := context.WithTimeout(p.closer.Ctx(), 10*time.Second)
287308
conn, err := grpc.NewClient(p.Addr, p.dialOpts...)
@@ -298,7 +319,7 @@ func (p *Pool) MonitorHealth() {
298319
}
299320
p.conn = conn
300321
p.Unlock()
301-
return
322+
return true
302323
}
303324
glog.Errorf("CONN: Unable to connect with %s : %s\n", p.Addr, err)
304325
if conn != nil {
@@ -309,19 +330,21 @@ func (p *Pool) MonitorHealth() {
309330
}
310331
}
311332

333+
ticker := time.NewTicker(time.Second)
334+
defer ticker.Stop()
312335
for {
313336
select {
314337
case <-p.closer.HasBeenClosed():
315338
glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
316339
return
317-
default:
318-
err := p.listenToHeartbeat()
319-
if err != nil {
320-
reconnect()
340+
case <-ticker.C:
341+
}
342+
343+
err := p.listenToHeartbeat()
344+
if err != nil {
345+
if reconnect() {
321346
glog.Infof("CONN: Re-established connection with %s.\n", p.Addr)
322347
}
323-
// Sleep for a bit before retrying.
324-
time.Sleep(echoDuration)
325348
}
326349
}
327350
}

dgraph/cmd/zero/raft.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ func (n *node) Run() {
879879
// snapshot can cause select loop to block while deleting entries, so run
880880
// it in goroutine
881881
readStateCh := make(chan raft.ReadState, 100)
882-
closer := z.NewCloser(5)
882+
closer := z.NewCloser(4)
883883
defer func() {
884884
closer.SignalAndWait()
885885
n.closer.Done()

dgraph/cmd/zero/run.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,8 @@ func run() {
346346
st.node.closer.SignalAndWait()
347347
// Stop all internal requests.
348348
_ = grpcListener.Close()
349+
// Stop all pools
350+
conn.GetPools().RemoveAll()
349351
}()
350352

351353
st.zero.closer.AddRunning(2)

0 commit comments

Comments (0)