Skip to content
24 changes: 15 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type internalClient struct {
initAppConfigFunc func() (*config.AppConfig, error)
appConfig *config.AppConfig
cache *storage.Cache
configComponent *notify.ConfigComponent
components []component.Stoppable
}

func (c *internalClient) getAppConfig() config.AppConfig {
Expand Down Expand Up @@ -121,8 +121,10 @@ func StartWithConfig(loadAppConfig func() (*config.AppConfig, error)) (Client, e

c.cache = storage.CreateNamespaceConfig(appConfig.NamespaceName)
appConfig.Init()

serverlist.InitSyncServerIPList(c.getAppConfig)
// start ipList component
serverIPListComponent := serverlist.NewSyncServerIPListComponent(c.getAppConfig)
go component.StartRefreshConfig(serverIPListComponent)
c.appendComponent(serverIPListComponent)

//first sync
configs := syncApolloConfig.Sync(c.getAppConfig)
Expand All @@ -137,11 +139,9 @@ func StartWithConfig(loadAppConfig func() (*config.AppConfig, error)) (Client, e
log.Debug("init notifySyncConfigServices finished")

//start long poll sync config
configComponent := &notify.ConfigComponent{}
configComponent.SetAppConfig(c.getAppConfig)
configComponent.SetCache(c.cache)
configComponent := notify.NewConfigComponent(c.getAppConfig, c.cache)
go component.StartRefreshConfig(configComponent)
c.configComponent = configComponent
c.appendComponent(configComponent)

log.Info("agollo start finished ! ")

Expand Down Expand Up @@ -269,6 +269,10 @@ func (c *internalClient) getConfigValue(key string) interface{} {
return value
}

Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The appendComponent method has a potential race condition. It modifies the c.components slice without synchronization, which could lead to data races if called concurrently with Close or with other appendComponent calls. While in the current implementation all appendComponent calls happen during initialization before components start, this creates a fragile design. Consider adding mutex protection or documenting that this method must only be called during initialization.

Suggested change
// appendComponent adds a component to the internal list.
// It is not safe for concurrent use and must only be called during client
// initialization, before any components are started or Close may be invoked.

Copilot uses AI. Check for mistakes.
func (c *internalClient) appendComponent(comp component.Stoppable) {
c.components = append(c.components, comp)
}

// AddChangeListener 增加变更监控
func (c *internalClient) AddChangeListener(listener storage.ChangeListener) {
c.cache.AddChangeListener(listener)
Expand All @@ -289,7 +293,9 @@ func (c *internalClient) UseEventDispatch() {
c.AddChangeListener(storage.UseEventDispatch())
}

// Close 停止轮询
// Close stop components
func (c *internalClient) Close() {
c.configComponent.Stop()
for _, comp := range c.components {
comp.Stop()
}
}
Comment on lines 297 to 301
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Close method has a potential race condition. Multiple goroutines could call Close concurrently, and even though each component's Stop method is protected by sync.Once, the iteration over c.components is not thread-safe. If one goroutine is iterating through the components slice while another goroutine calls appendComponent, this could lead to undefined behavior. Consider adding a mutex to protect access to c.components or ensuring Close is only called once using a sync.Once pattern at the client level.

Copilot uses AI. Check for mistakes.
32 changes: 31 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import (
"time"

"github.com/agiledragon/gomonkey/v2"
"github.com/apolloconfig/agollo/v4/component"
_ "github.com/apolloconfig/agollo/v4/env/file/json"

"github.com/apolloconfig/agollo/v4/agcache/memory"
"github.com/apolloconfig/agollo/v4/component/remote"
"github.com/apolloconfig/agollo/v4/env/config"
"github.com/apolloconfig/agollo/v4/env/server"

_ "github.com/apolloconfig/agollo/v4/env/file/json"
"github.com/apolloconfig/agollo/v4/extension"
"github.com/apolloconfig/agollo/v4/storage"
. "github.com/tevid/gohamcrest"
Expand Down Expand Up @@ -413,3 +414,32 @@ func TestGetConfigAndInitValNil(t *testing.T) {
Assert(t, cf, NilVal())
Assert(t, client.cache.GetConfig("testNotFound"), NilVal())
}

type testComponent struct {
status int // 0 start 1 stop
}

func (t *testComponent) Start() {
t.status = 0
}
func (t *testComponent) Stop() {
t.status = 1
}

func Test_internalClient_Close(t *testing.T) {
c := &internalClient{}
tc := &testComponent{}
go component.StartRefreshConfig(tc)
c.appendComponent(tc)

tc2 := &testComponent{}
go component.StartRefreshConfig(tc2)
c.appendComponent(tc2)
time.Sleep(300 * time.Millisecond) // wait goroutine
Assert(t, tc.status, Equal(0))
Assert(t, tc2.status, Equal(0))
c.Close()
c.Close() // duplicate Close
Assert(t, tc.status, Equal(1))
Assert(t, tc2.status, Equal(1))
}
15 changes: 13 additions & 2 deletions component/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,23 @@

package component

//AbsComponent 定时组件
import "github.com/apolloconfig/agollo/v4/component/log"

type Stoppable interface {
Stop()
}

// AbsComponent 定时组件
type AbsComponent interface {
Start()
}

//StartRefreshConfig 开始定时服务
// StartRefreshConfig 开始定时服务
func StartRefreshConfig(component AbsComponent) {
defer func() {
if err := recover(); err != nil {
log.Errorf("StartRefreshConfig component start failed, recover panic: %v", err)
}
}()
component.Start()
}
7 changes: 5 additions & 2 deletions component/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,13 @@ func TestSelectOnlyOneHost(t *testing.T) {
type testComponent struct {
}

//Start 启动同步服务器列表
// Start 启动同步服务器列表
func (s *testComponent) Start() {
}

func (s *testComponent) Stop() {
}

func TestStartRefreshConfig(t *testing.T) {
StartRefreshConfig(&testComponent{})
}
Expand All @@ -137,7 +140,7 @@ func trySyncServerIPList(appConfigFunc func() config.AppConfig) {
SyncServerIPListSuccessCallBack([]byte(servicesConfigResponseStr), http.CallBack{AppConfigFunc: appConfigFunc})
}

//SyncServerIPListSuccessCallBack 同步服务器列表成功后的回调
// SyncServerIPListSuccessCallBack 同步服务器列表成功后的回调
func SyncServerIPListSuccessCallBack(responseBody []byte, callback http.CallBack) (o interface{}, err error) {
log.Debug("get all server info:", string(responseBody))

Expand Down
43 changes: 24 additions & 19 deletions component/notify/componet_notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package notify

import (
"sync"
"time"

"github.com/apolloconfig/agollo/v4/component/log"
"github.com/apolloconfig/agollo/v4/component/remote"
"github.com/apolloconfig/agollo/v4/storage"

Expand All @@ -30,33 +32,33 @@ const (
longPollInterval = 2 * time.Second //2s
)

//ConfigComponent 配置组件
// ConfigComponent 配置组件
type ConfigComponent struct {
appConfigFunc func() config.AppConfig
cache *storage.Cache
stopCh chan interface{}
stopCh chan struct{}
stopOnce sync.Once
}

// SetAppConfig nolint
func (c *ConfigComponent) SetAppConfig(appConfigFunc func() config.AppConfig) {
c.appConfigFunc = appConfigFunc
}

// SetCache nolint
func (c *ConfigComponent) SetCache(cache *storage.Cache) {
c.cache = cache
func NewConfigComponent(appConfigFunc func() config.AppConfig, cache *storage.Cache) *ConfigComponent {
return &ConfigComponent{
appConfigFunc: appConfigFunc,
cache: cache,
stopCh: make(chan struct{}),
}
}

//Start 启动配置组件定时器
// Start 启动配置组件定时器
func (c *ConfigComponent) Start() {
t2 := time.NewTimer(longPollInterval)
defer t2.Stop()
// 兼容直接创建ConfigComponent对象时,stopCh未初始化导致Stop()无法关闭
if c.stopCh == nil {
c.stopCh = make(chan interface{})
c.stopCh = make(chan struct{})
}

t2 := time.NewTimer(longPollInterval)
instance := remote.CreateAsyncApolloConfig()
log.Debug("ConfigComponent started")
//long poll for sync
loop:
for {
select {
case <-t2.C:
Expand All @@ -66,14 +68,17 @@ loop:
}
t2.Reset(longPollInterval)
case <-c.stopCh:
break loop
log.Debug("ConfigComponent stopped")
return
}
}
}

// Stop 停止配置组件定时器
func (c *ConfigComponent) Stop() {
if c.stopCh != nil {
close(c.stopCh)
}
c.stopOnce.Do(func() {
if c.stopCh != nil {
close(c.stopCh)
}
})
}
50 changes: 34 additions & 16 deletions component/notify/componet_notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
_ "github.com/apolloconfig/agollo/v4/env/file/json"
jsonFile "github.com/apolloconfig/agollo/v4/env/file/json"
"github.com/apolloconfig/agollo/v4/extension"
"github.com/apolloconfig/agollo/v4/storage"
. "github.com/tevid/gohamcrest"
)

Expand Down Expand Up @@ -103,20 +104,37 @@ func getTestAppConfig() *config.AppConfig {
return appConfig
}

func TestConfigComponent_SetAppConfig_UpdatesAppConfigCorrectly(t *testing.T) {
expectedAppConfig := getTestAppConfig()
c := &ConfigComponent{}
// set appConfigFunc
c.SetAppConfig(func() config.AppConfig {
return *expectedAppConfig
})

// appConfig should be equal
Assert(t, c.appConfigFunc(), Equal(*expectedAppConfig))

// appConfig value is be replaced
expectedAppConfig.AppID = "test1"
expectedAppConfig.NamespaceName = expectedAppConfig.NamespaceName + config.Comma + "abc"
Assert(t, c.appConfigFunc().AppID, Equal("test1"))
Assert(t, c.appConfigFunc().NamespaceName, Equal("application,abc"))
// TestConfigComponent_Stop 测试重复调用stop()和stopCh为空的场景
func TestConfigComponent_Stop(t *testing.T) {
type fields struct {
appConfigFunc func() config.AppConfig
cache *storage.Cache
stopCh chan struct{}
}
tests := []struct {
name string
fields fields
}{
{
name: "test_component_stop",
fields: fields{
stopCh: make(chan struct{}),
},
},
{
name: "test_component_stop_chan_nil",
fields: fields{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &ConfigComponent{
appConfigFunc: tt.fields.appConfigFunc,
cache: tt.fields.cache,
stopCh: tt.fields.stopCh,
}
c.Stop()
c.Stop()
})
}
}
37 changes: 30 additions & 7 deletions component/serverlist/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package serverlist

import (
"encoding/json"
"sync"
"time"

"github.com/apolloconfig/agollo/v4/env/server"
Expand All @@ -39,32 +40,54 @@ func init() {

}

//InitSyncServerIPList 初始化同步服务器信息列表
// InitSyncServerIPList 初始化同步服务器信息列表
// Deprecated 该方式启动的SyncServerIPListComponent无法关闭,强烈建议使用NewSyncServerIPListComponent
func InitSyncServerIPList(appConfig func() config.AppConfig) {
go component.StartRefreshConfig(&SyncServerIPListComponent{appConfig})
go component.StartRefreshConfig(&SyncServerIPListComponent{appConfig: appConfig})
}

//SyncServerIPListComponent set timer for update ip list
//interval : 20m
func NewSyncServerIPListComponent(appConfig func() config.AppConfig) *SyncServerIPListComponent {
return &SyncServerIPListComponent{
appConfig: appConfig,
stopCh: make(chan struct{}),
}
}

// SyncServerIPListComponent set timer for update ip list
// interval : 20m
type SyncServerIPListComponent struct {
appConfig func() config.AppConfig
stopCh chan struct{}
stopOnce sync.Once
}

//Start 启动同步服务器列表
// Start 启动同步服务器列表
func (s *SyncServerIPListComponent) Start() {
SyncServerIPList(s.appConfig)
log.Debug("syncServerIpList started")
log.Debug("syncServerIpListComponent started")

t2 := time.NewTimer(refreshIPListInterval)
defer t2.Stop()
for {
select {
case <-s.stopCh:
log.Debug("syncServerIpListComponent stopped")
return
case <-t2.C:
SyncServerIPList(s.appConfig)
t2.Reset(refreshIPListInterval)
}
}
}

func (s *SyncServerIPListComponent) Stop() {
s.stopOnce.Do(func() {
if s.stopCh != nil {
close(s.stopCh)
}
})
}

// SyncServerIPList sync ip list from server
// then
// 1.update agcache
Expand Down Expand Up @@ -95,7 +118,7 @@ func SyncServerIPList(appConfigFunc func() config.AppConfig) (map[string]*config
return m, err
}

//SyncServerIPListSuccessCallBack 同步服务器列表成功后的回调
// SyncServerIPListSuccessCallBack 同步服务器列表成功后的回调
func SyncServerIPListSuccessCallBack(responseBody []byte, callback http.CallBack) (o interface{}, err error) {
log.Debug("get all server info:", string(responseBody))

Expand Down
Loading
Loading