diff --git a/common/hash/ketama.go b/common/hash/ketama.go index 479714b88..a30949b05 100644 --- a/common/hash/ketama.go +++ b/common/hash/ketama.go @@ -36,7 +36,8 @@ type continuumPoint struct { // Continuum consistent hash ring type Continuum struct { - ring points + ring points + hosts map[string]struct{} } type points []continuumPoint @@ -70,6 +71,7 @@ func New(buckets map[Bucket]bool) *Continuum { } ring := make(points, 0, numBuckets*160) + hosts := make(map[string]struct{}, numBuckets) var totalWeight uint32 for bucket := range buckets { @@ -81,7 +83,10 @@ func New(buckets map[Bucket]bool) *Continuum { // this is the equivalent of C's promotion rules, but in Go, to maintain exact compatibility with the C library limit := int(pct * 40.0 * float64(numBuckets)) - + // 跳过权重为0节点 + if limit != 0 { + hosts[bucket.Host] = struct{}{} + } for k := 0; k < limit; k++ { /* 40 hashes, 4 numbers per hash = 160 points per bucket */ ss := fmt.Sprintf("%s-%d", bucket.Host, k) @@ -100,7 +105,8 @@ func New(buckets map[Bucket]bool) *Continuum { sort.Sort(ring) return &Continuum{ - ring: ring, + ring: ring, + hosts: hosts, } } @@ -119,3 +125,11 @@ func (c *Continuum) Hash(h uint) string { return c.ring[i].bucket.Host } + +func (c *Continuum) ContainsHost(host string) bool { + if len(c.hosts) == 0 { + return false + } + _, ok := c.hosts[host] + return ok +} diff --git a/service/healthcheck/dispatch.go b/service/healthcheck/dispatch.go index 99fa57816..d6cfa5f0b 100644 --- a/service/healthcheck/dispatch.go +++ b/service/healthcheck/dispatch.go @@ -148,7 +148,7 @@ func (d *Dispatcher) reloadSelfContinuum() bool { func (d *Dispatcher) reloadManagedClients() { nextClients := make(map[string]*ClientWithChecker) - if d.continuum != nil { + if d.continuum != nil && d.continuum.ContainsHost(d.svr.localHost) { d.svr.cacheProvider.RangeHealthCheckClients(func(itemChecker ItemWithChecker, client *model.Client) { clientId := client.Proto().GetId().GetValue() host := d.continuum.Hash(itemChecker.GetHashValue()) @@ -187,7 +187,7 @@ func (d *Dispatcher) reloadManagedClients() { func (d *Dispatcher) reloadManagedInstances() { nextInstances := make(map[string]*InstanceWithChecker) - if d.continuum != nil { + if d.continuum != nil && d.continuum.ContainsHost(d.svr.localHost) { d.svr.cacheProvider.RangeHealthCheckInstances(func(itemChecker ItemWithChecker, instance *model.Instance) { instanceId := instance.ID() host := d.continuum.Hash(itemChecker.GetHashValue())