Skip to content

Commit 05e6036

Browse files
tengu-altjoao-r-reis
authored andcommitted
Endless query execution fix
Fix for the endless query execution when HostSelectionPolicy returns the same downed host. Internal HostSelectionPolicy will return the host only once if HostID is set. Documentation for external policies was added. patch by Oleksandr Luzhniy; reviewed by João Reis, for CASSGO-50
1 parent b251492 commit 05e6036

File tree

5 files changed

+18
-12
lines changed

5 files changed

+18
-12
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6969

7070
- Don't panic in MapExecuteBatchCAS if no `[applied]` column is returned (CASSGO-42)
7171

72+
- Endless query execution fix (CASSGO-50)
73+
7274
## [1.7.0] - 2024-09-23
7375

7476
This release is the first after the donation of gocql to the Apache Software Foundation (ASF)

events_ccm_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestEventNodeDownControl(t *testing.T) {
104104
}
105105
session.pool.mu.RUnlock()
106106

107-
host := session.ring.getHost(node.Addr)
107+
host, _ := session.ring.getHost(node.Addr)
108108
if host == nil {
109109
t.Fatal("node not in metadata ring")
110110
} else if host.IsUp() {
@@ -146,7 +146,7 @@ func TestEventNodeDown(t *testing.T) {
146146
t.Fatal("node not removed after remove event")
147147
}
148148

149-
host := session.ring.getHost(node.Addr)
149+
host, _ := session.ring.getHost(node.Addr)
150150
if host == nil {
151151
t.Fatal("node not in metadata ring")
152152
} else if host.IsUp() {
@@ -203,7 +203,7 @@ func TestEventNodeUp(t *testing.T) {
203203
t.Fatal("node not added after node added event")
204204
}
205205

206-
host := session.ring.getHost(node.Addr)
206+
host, _ := session.ring.getHost(node.Addr)
207207
if host == nil {
208208
t.Fatal("node not in metadata ring")
209209
} else if !host.IsUp() {

policies.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ func (host *selectedHost) Info() *HostInfo {
323323
func (host *selectedHost) Mark(err error) {}
324324

325325
// NextHost is an iteration function over picked hosts
326+
// Should return nil eventually to prevent endless query execution.
326327
type NextHost func() SelectedHost
327328

328329
// RoundRobinHostPolicy is a round-robin load balancing policy, where each host

query_executor.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package gocql
2727
import (
2828
"context"
2929
"sync"
30+
"sync/atomic"
3031
"time"
3132
)
3233

@@ -89,14 +90,16 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
8990
// check if the host id is specified for the query,
9091
// if it is, the query should be executed at the corresponding host.
9192
if hostID := qry.GetHostID(); hostID != "" {
93+
host, ok := q.pool.session.ring.getHost(hostID)
94+
if !ok {
95+
return nil, ErrNoConnections
96+
}
97+
var returnedHostOnce int32 = 0
9298
hostIter = func() SelectedHost {
93-
pool, ok := q.pool.getPoolByHostID(hostID)
94-
// if the specified host is down
95-
// we return nil to avoid endless query execution in queryExecutor.do()
96-
if !ok || !pool.host.IsUp() {
97-
return nil
99+
if atomic.CompareAndSwapInt32(&returnedHostOnce, 0, 1) {
100+
return (*selectedHost)(host)
98101
}
99-
return (*selectedHost)(pool.host)
102+
return nil
100103
}
101104
}
102105

ring.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ func (r *ring) getHostByIP(ip string) (*HostInfo, bool) {
6767
return r.hosts[hi], ok
6868
}
6969

70-
func (r *ring) getHost(hostID string) *HostInfo {
70+
func (r *ring) getHost(hostID string) (host *HostInfo, ok bool) {
7171
r.mu.RLock()
72-
host := r.hosts[hostID]
72+
host, ok = r.hosts[hostID]
7373
r.mu.RUnlock()
74-
return host
74+
return
7575
}
7676

7777
func (r *ring) allHosts() []*HostInfo {

0 commit comments

Comments
 (0)