From 60769923e606373f305ef2c052a3332593ae4d7b Mon Sep 17 00:00:00 2001 From: qnnn <65326092+qnnn@users.noreply.github.com> Date: Thu, 7 Sep 2023 15:45:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BD=93=E4=B8=80=E8=87=B4=E6=80=A7=E5=93=88?= =?UTF-8?q?=E5=B8=8C=E7=8E=AF=E4=B8=8A=E4=B8=8D=E5=AD=98=E5=9C=A8=E5=BD=93?= =?UTF-8?q?=E5=89=8Dserver=E8=8A=82=E7=82=B9=E6=97=B6=EF=BC=8C=E8=B7=B3?= =?UTF-8?q?=E8=BF=87=E9=81=8D=E5=8E=86=E5=AE=9E=E4=BE=8B=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 当一致性哈希环上不存在当前server节点时,跳过遍历实例逻辑 * fix:Referenced from [Pull Requests 387], in order to improve the processing of service discovery QPS when using api-http server * fix:Referenced from [Pull Requests 387], in order to improve the processing of service discovery QPS when using api-http server * fix:Referenced from [Pull Requests 387], in order to improve the processing of service discovery QPS when using api-http server * fix:Referenced from [Pull Requests 387], in order to improve the processing of service discovery QPS when using api-http server * 当前server节点不在哈希环上时,跳过遍历实例逻辑 * 当前server节点不在哈希环上时,跳过遍历实例逻辑 * 当一致性哈希环上不存在当前server节点时,跳过遍历实例逻辑 --- common/hash/ketama.go | 20 +++++++++++++++++--- service/healthcheck/dispatch.go | 4 ++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/common/hash/ketama.go b/common/hash/ketama.go index 479714b88..a30949b05 100644 --- a/common/hash/ketama.go +++ b/common/hash/ketama.go @@ -36,7 +36,8 @@ type continuumPoint struct { // Continuum consistent hash ring type Continuum struct { - ring points + ring points + hosts map[string]struct{} } type points []continuumPoint @@ -70,6 +71,7 @@ func New(buckets map[Bucket]bool) *Continuum { } ring := make(points, 0, numBuckets*160) + hosts := make(map[string]struct{}, numBuckets) var totalWeight uint32 for bucket := range buckets { @@ -81,7 +83,10 @@ func New(buckets map[Bucket]bool) *Continuum { // this is the equivalent of C's promotion rules, but in Go, to maintain exact compatibility with the C library limit := int(pct * 40.0 * float64(numBuckets)) - + // 跳过权重为0节点 + if limit != 0 { + hosts[bucket.Host] = struct{}{} + } for k := 0; k < limit; k++ { /* 40 hashes, 4 numbers per hash = 160 points per bucket */ ss := fmt.Sprintf("%s-%d", bucket.Host, k) @@ -100,7 +105,8 @@ func New(buckets map[Bucket]bool) *Continuum { sort.Sort(ring) return &Continuum{ - ring: ring, + ring: ring, + hosts: hosts, } } @@ -119,3 +125,11 @@ func (c *Continuum) Hash(h uint) string { return c.ring[i].bucket.Host } + +func (c *Continuum) ContainsHost(host string) bool { + if len(c.hosts) == 0 { + return false + } + _, ok := c.hosts[host] + return ok +} diff --git a/service/healthcheck/dispatch.go b/service/healthcheck/dispatch.go index 99fa57816..d6cfa5f0b 100644 --- a/service/healthcheck/dispatch.go +++ b/service/healthcheck/dispatch.go @@ -148,7 +148,7 @@ func (d *Dispatcher) reloadSelfContinuum() bool { func (d *Dispatcher) reloadManagedClients() { nextClients := make(map[string]*ClientWithChecker) - if d.continuum != nil { + if d.continuum != nil && d.continuum.ContainsHost(d.svr.localHost) { d.svr.cacheProvider.RangeHealthCheckClients(func(itemChecker ItemWithChecker, client *model.Client) { clientId := client.Proto().GetId().GetValue() host := d.continuum.Hash(itemChecker.GetHashValue()) @@ -187,7 +187,7 @@ func (d *Dispatcher) reloadManagedClients() { func (d *Dispatcher) reloadManagedInstances() { nextInstances := make(map[string]*InstanceWithChecker) - if d.continuum != nil { + if d.continuum != nil && d.continuum.ContainsHost(d.svr.localHost) { d.svr.cacheProvider.RangeHealthCheckInstances(func(itemChecker ItemWithChecker, instance *model.Instance) { instanceId := instance.ID() host := d.continuum.Hash(itemChecker.GetHashValue())