Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math/rand/v2"
"net/netip"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -118,7 +119,7 @@ func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routin
}
for i, ep := range endpoints {
endpointStartIndex := hashesPerEndpoint * i
for j := 0; j < hashesPerEndpoint; j++ {
for j := range hashesPerEndpoint {
ch.hashRing[endpointStartIndex+j] = endpointHash{i, hash(fmt.Sprintf("%s-%d", ep, j))}
}
}
Expand All @@ -127,7 +128,8 @@ func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routin
}

func newConsistentHash(endpoints []string) routing.LBAlgorithm {
return newConsistentHashInternal(endpoints, 100)
n := 10000 / len(endpoints)
return newConsistentHashInternal(endpoints, n)
}

func hash(s string) uint64 {
Expand All @@ -136,6 +138,10 @@ func hash(s string) uint64 {

func skipEndpoint(c *routing.LBContext, index int) bool {
host := c.Route.LBEndpoints[index].Host
if len(c.LBEndpoints) > 300 { // 300 see https://github.com/zalando/skipper/pull/3918/
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to make this a constant with descriptive name

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes true

return !existsBinarySearch(host, c.LBEndpoints)
}
// linear scan for low numbers
for i := range c.LBEndpoints {
if c.LBEndpoints[i].Host == host {
return false
Expand All @@ -144,6 +150,30 @@ func skipEndpoint(c *routing.LBContext, index int) bool {
return true
}

func existsBinarySearch(target string, list []routing.LBEndpoint) bool {
targetAP, err := netip.ParseAddrPort(target)
if err != nil {
return false
}

idx := sort.Search(len(list), func(i int) bool {
Comment thread
MustafaSaber marked this conversation as resolved.
itemAP, err := netip.ParseAddrPort(list[i].Host)
if err != nil {
return false
}

if itemAP.Addr() != targetAP.Addr() {
return !itemAP.Addr().Less(targetAP.Addr())
}
return itemAP.Port() >= targetAP.Port()
})

if idx < len(list) && list[idx].Host == target {
return true
}
return false
}

// Returns index in hash ring with the closest hash to key's hash
func (ch *consistentHash) searchRing(key string, ctx *routing.LBContext) int {
h := hash(key)
Expand Down
110 changes: 98 additions & 12 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"math"
"net/http"
"net/netip"
"sort"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -229,10 +231,11 @@ func TestApply(t *testing.T) {
const R = 1000
const N = 10
eps := make([]string, 0, N)
for i := 0; i < N; i++ {
for i := range N {
ep := fmt.Sprintf("http://127.0.0.1:123%d/foo", i)
eps = append(eps, ep)
}
sortLBEndpoints(eps)

for _, tt := range []struct {
name string
Expand Down Expand Up @@ -283,7 +286,7 @@ func TestApply(t *testing.T) {
}

h := make(map[string]int)
for i := 0; i < R; i++ {
for range R {
lbe := tt.algorithm.Apply(lbctx)
h[lbe.Host] += 1
}
Expand Down Expand Up @@ -316,6 +319,7 @@ func TestConsistentHashSearch(t *testing.T) {
}

endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}
sortLBEndpoints(endpoints)
const key = "192.168.0.1"

ep := apply(key, endpoints)
Expand All @@ -339,6 +343,7 @@ func TestConsistentHashSearch(t *testing.T) {

func TestConsistentHashBoundedLoadSearch(t *testing.T) {
endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}
sortLBEndpoints(endpoints)
r, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
route := NewAlgorithmProvider().Do([]*routing.Route{{
Route: eskip.Route{
Expand All @@ -359,7 +364,7 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
defer endpointRegistry.Close()
endpointRegistry.Do([]*routing.Route{route})
noLoad := ch.Apply(ctx)
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}})
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]any{}})

if noLoad != nonBounded {
t.Error("When no endpoints are overloaded, the chosen endpoint should be the same as standard consistentHash")
Expand Down Expand Up @@ -388,6 +393,7 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {

func TestConsistentHashKey(t *testing.T) {
endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}
sortLBEndpoints(endpoints)
ch := newConsistentHash(endpoints)

r, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
Expand All @@ -401,16 +407,16 @@ func TestConsistentHashKey(t *testing.T) {
},
}})[0]

defaultEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: make(map[string]interface{})})
remoteHostEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: net.RemoteHost(r).String()}})
defaultEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: make(map[string]any)})
remoteHostEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: map[string]any{ConsistentHashKey: net.RemoteHost(r).String()}})

if defaultEndpoint != remoteHostEndpoint {
t.Error("remote host should be used as a default key")
}

for i, ep := range endpoints {
key := fmt.Sprintf("%s-%d", ep, 1) // "ep-0" to "ep-99" is the range of keys for this endpoint. If we use this as the hash key it should select endpoint ep.
selected := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}})
selected := ch.Apply(&routing.LBContext{Request: r, Route: rt, LBEndpoints: rt.LBEndpoints, Params: map[string]any{ConsistentHashKey: key}})
if selected != rt.LBEndpoints[i] {
t.Errorf("expected: %v, got %v", rt.LBEndpoints[i], selected)
}
Expand All @@ -419,6 +425,7 @@ func TestConsistentHashKey(t *testing.T) {

func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}
sortLBEndpoints(endpoints)
r, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
route := NewAlgorithmProvider().Do([]*routing.Route{{
Route: eskip.Route{
Expand All @@ -434,13 +441,13 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
Request: r,
Route: route,
LBEndpoints: route.LBEndpoints,
Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor},
Params: map[string]any{ConsistentHashBalanceFactor: balanceFactor},
}
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
endpointRegistry.Do([]*routing.Route{route})

for i := 0; i < 100; i++ {
for range 100 {
ep := ch.Apply(ctx)
ifr0 := route.LBEndpoints[0].Metrics.InflightRequests()
ifr1 := route.LBEndpoints[1].Metrics.InflightRequests()
Expand Down Expand Up @@ -475,7 +482,7 @@ func TestConsistentHashKeyDistribution(t *testing.T) {
}

func addInflightRequests(registry *routing.EndpointRegistry, endpoint routing.LBEndpoint, count int) {
for i := 0; i < count; i++ {
for range count {
endpoint.Metrics.IncInflightRequest()
registry.GetMetrics(endpoint.Host).IncInflightRequest()
}
Expand Down Expand Up @@ -545,9 +552,88 @@ func BenchmarkRandomAlgorithm(b *testing.B) {
LBEndpoints: lbeps,
}

b.ResetTimer()

for n := 0; n < b.N; n++ {
for b.Loop() {
alg.Apply(lbc)
}
}

// similar to datasource processRouteDef() use of sort.SliceStable
func sortLBEndpoints(lbEndpoints []string) {
sort.SliceStable(lbEndpoints, func(i, j int) bool {
apI, errI := netip.ParseAddrPort(lbEndpoints[i])
apJ, errJ := netip.ParseAddrPort(lbEndpoints[j])

if errI != nil || errJ != nil {
return errI == nil
}

ipI := apI.Addr()
ipJ := apJ.Addr()

if ipI != ipJ {
return ipI.Less(ipJ)
}

return apI.Port() < apJ.Port()
})

}

func BenchmarkConsistentHash(b *testing.B) {
for _, N := range []int{10, 100, 1000} {
eps := make([]string, 0, N)
var j int
for i := range N {
j = i / 255
ep := fmt.Sprintf("http://10.0.%d.%d:8080/", j, i%255)
eps = append(eps, ep)
}
sortLBEndpoints(eps)

req, err := http.NewRequest("GET", "http://consistent.bench.test", nil)
if err != nil {
b.Fatalf("Failed to create request: %v", err)
}

route := NewAlgorithmProvider().Do([]*routing.Route{{
Route: eskip.Route{
BackendType: eskip.LBBackend,
LBAlgorithm: ConsistentHash.String(),
LBEndpoints: eps,
},
}})[0]

alg := newConsistentHash(eps)
lbCtx := &routing.LBContext{
Request: req,
Route: route,
LBEndpoints: route.LBEndpoints,
Params: map[string]any{
ConsistentHashKey: "Foo",
ConsistentHashBalanceFactor: 0.2,
},
}

// populate metrics
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
endpointRegistry.Do([]*routing.Route{route})

// set Foo header value
headerValues := [10000]string{}
for i := range len(headerValues) {
headerValues[i] = fmt.Sprintf("foo-%d", i)
}

b.Run(fmt.Sprintf("%d endpoints", N), func(b *testing.B) {
var iter int64
b.ResetTimer()
b.ReportAllocs()
for b.Loop() {
iter++
req.Header.Set("Foo", headerValues[iter%10000])
alg.Apply(lbCtx)
}
})
}
}
35 changes: 35 additions & 0 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2607,3 +2607,38 @@ func benchmarkCopyStream(b *testing.B, size int64, cpStream func(to flushWriter,
}
}
}

func BenchmarkConsistentHashSelectEndpoint(b *testing.B) {
for _, N := range []int{10, 100, 250, 300, 400, 500, 1000, 5000} {
eps := make([]string, 0, N)
var j int
for i := range N {
j = i / 255
ep := fmt.Sprintf("http://10.0.%d.%d:8080", j, i%255)
eps = append(eps, ep)
}
req, err := http.NewRequest("GET", "http://consistent.bench.test", nil)
if err != nil {
b.Fatalf("Failed to create request: %v", err)
}
doc := fmt.Sprintf(`r: * -> <consistentHash, "%s">`, strings.Join(eps, `", "`))
tp, err := newTestProxyWithParams(doc, Params{
AccessLogDisabled: false,
})
if err != nil {
b.Error(err)
return
}
defer tp.close()
route, _ := tp.routing.Get().Do(req)
ctx := newContext(nil, req, tp.proxy)
ctx.route = route
b.Run(fmt.Sprintf("%d endpoints", N), func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for b.Loop() {
tp.proxy.selectEndpoint(ctx)
}
})
}
}
21 changes: 21 additions & 0 deletions routing/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package routing
import (
"errors"
"fmt"
"net/netip"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -534,6 +535,26 @@ func processRouteDef(o *Options, cpm map[string]PredicateSpec, def *eskip.Route)
}

r := &Route{Route: *def, Scheme: scheme, Host: host, Predicates: cps, Filters: fs, weight: weight}
if def.LBAlgorithm == "consistentHash" {
sort.SliceStable(def.LBEndpoints, func(i, j int) bool {
Comment thread
MustafaSaber marked this conversation as resolved.
apI, errI := netip.ParseAddrPort(def.LBEndpoints[i])
apJ, errJ := netip.ParseAddrPort(def.LBEndpoints[j])

if errI != nil || errJ != nil {
return errI == nil
}

ipI := apI.Addr()
ipJ := apJ.Addr()

if ipI != ipJ {
return ipI.Less(ipJ)
}

return apI.Port() < apJ.Port()
})
}

if err := processTreePredicates(r, def.Predicates); err != nil {
return nil, err
}
Expand Down
Loading