Skip to content

feat: replace events library with shdict #12353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 26, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apisix/cli/ngx_tpl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ lua {
{% if status then %}
lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *};
{% end %}
lua_shared_dict nacos 10m;
}

{% if enabled_stream_plugins["prometheus"] and not enable_http then %}
Expand Down
122 changes: 47 additions & 75 deletions apisix/discovery/nacos/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,25 @@ local str_find = core.string.find
local log = core.log

local default_weight
local applications
local nacos_dict = ngx.shared.nacos --key: namespace_id.group_name.service_name
if not nacos_dict then
error("lua_shared_dict \"nacos\" not configured")
end

local auth_path = 'auth/login'
local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
local default_namespace_id = "public"
local default_group_name = "DEFAULT_GROUP"
local access_key
local secret_key

local events
local events_list


local _M = {}

local function discovery_nacos_callback(data, event, source, pid)
applications = data
log.notice("update local variable application, event is: ", event,
"source: ", source, "server pid:", pid,
", application: ", core.json.encode(applications, true))
local function get_key(namespace_id, group_name, service_name)
return namespace_id .. '.' .. group_name .. '.' .. service_name
end


local function request(request_uri, path, body, method, basic_auth)
local url = request_uri .. path
log.info('request url:', url)
Expand Down Expand Up @@ -278,29 +275,24 @@ local function is_grpc(scheme)
return false
end


local curr_service_in_use = {}
local function fetch_full_registry(premature)
if premature then
return
end

local up_apps = {}
local base_uri, username, password = get_base_uri()
local token_param, err = get_token_param(base_uri, username, password)
if err then
log.error('get_token_param error:', err)
if not applications then
applications = up_apps
end
return
end

local infos = get_nacos_services()
if #infos == 0 then
applications = up_apps
return
end

local service_names = {}
for _, service_info in ipairs(infos) do
local data, err
local namespace_id = service_info.namespace_id
Expand All @@ -318,50 +310,35 @@ local function fetch_full_registry(premature)
goto CONTINUE
end

if not up_apps[namespace_id] then
up_apps[namespace_id] = {}
end

if not up_apps[namespace_id][group_name] then
up_apps[namespace_id][group_name] = {}
end

local nodes = {}
local key = get_key(namespace_id, group_name, service_info.service_name)
service_names[key] = true
for _, host in ipairs(data.hosts) do
local nodes = up_apps[namespace_id]
[group_name][service_info.service_name]
if not nodes then
nodes = {}
up_apps[namespace_id]
[group_name][service_info.service_name] = nodes
end

local node = {
host = host.ip,
port = host.port,
weight = host.weight or default_weight,
}

-- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
node.port = host.metadata.gRPC_port
end

core.table.insert(nodes, node)
end

if #nodes > 0 then
local content = core.json.encode(nodes)
nacos_dict:set(key, content)
end
::CONTINUE::
end
local new_apps_md5sum = ngx.md5(core.json.encode(up_apps))
local old_apps_md5sum = ngx.md5(core.json.encode(applications))
if new_apps_md5sum == old_apps_md5sum then
return
end
applications = up_apps
local ok, err = events:post(events_list._source, events_list.updating,
applications)
if not ok then
log.error("post_event failure with ", events_list._source,
", update application error: ", err)
local old_curr_service_in_use = curr_service_in_use
curr_service_in_use = service_names
-- remove services that are not in use anymore
for key, _ in pairs(old_curr_service_in_use) do
if not service_names[key] then
nacos_dict:delete(key)
end
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
local old_curr_service_in_use = curr_service_in_use
curr_service_in_use = service_names
-- remove services that are not in use anymore
for key, _ in pairs(old_curr_service_in_use) do
if not service_names[key] then
nacos_dict:delete(key)
end
end
-- remove services that are not in use anymore
for key, _ in pairs(curr_service_in_use) do
if not service_names[key] then
nacos_dict:delete(key)
end
end
curr_service_in_use = service_names

end

Expand All @@ -371,40 +348,18 @@ function _M.nodes(service_name, discovery_args)
discovery_args.namespace_id or default_namespace_id
local group_name = discovery_args
and discovery_args.group_name or default_group_name

local logged = false
-- maximum waiting time: 5 seconds
local waiting_time = 5
local step = 0.1
while not applications and waiting_time > 0 do
if not logged then
log.warn('wait init')
logged = true
end
ngx.sleep(step)
waiting_time = waiting_time - step
end

if not applications or not applications[namespace_id]
or not applications[namespace_id][group_name]
then
local key = get_key(namespace_id, group_name, service_name)
local value = nacos_dict:get(key)
if not value then
core.log.error("nacos service not found: ", service_name)
return nil
end
return applications[namespace_id][group_name][service_name]
local nodes = core.json.decode(value)
return nodes
end


function _M.init_worker()
events = require("apisix.events")
events_list = events:event_list("discovery_nacos_update_application",
"updating")

if 0 ~= ngx.worker.id() then
events:register(discovery_nacos_callback, events_list._source,
events_list.updating)
return
end

default_weight = local_conf.discovery.nacos.weight
log.info('default_weight:', default_weight)
local fetch_interval = local_conf.discovery.nacos.fetch_interval
Expand All @@ -417,7 +372,24 @@ end


function _M.dump_data()
return {config = local_conf.discovery.nacos, services = applications or {}}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config is removed from here as it's a security risk.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense.

local keys = nacos_dict:get_keys(0)
local applications = {}
for _, key in ipairs(keys) do
local value = nacos_dict:get(key)
if value then
local nodes = core.json.decode(value)
if nodes and #nodes > 0 then
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dump_data is for debug purpose, if value in shared dict for this key is empty , we should return the real empty value for debugging instead hide it.

applications[key] = {
nodes = nodes,
}
end
else
applications[key] = {
nodes = {},
}
end
end
return {services = applications or {}}
end


Expand Down
1 change: 1 addition & 0 deletions t/APISIX.pm
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ lua {
lua_shared_dict prometheus-metrics 15m;
lua_shared_dict standalone-config 10m;
lua_shared_dict status-report 1m;
lua_shared_dict nacos 10m;
}
_EOC_
}
Expand Down
Loading
Loading