Skip to content

Commit e2e6829

Browse files
authored
feat: replace events library with shdict (#12353)
1 parent fe8a4be commit e2e6829

File tree

5 files changed

+275
-141
lines changed

5 files changed

+275
-141
lines changed

apisix/cli/ngx_tpl.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ lua {
7373
{% if status then %}
7474
lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *};
7575
{% end %}
76+
lua_shared_dict nacos 10m;
7677
}
7778
7879
{% if enabled_stream_plugins["prometheus"] and not enable_http then %}

apisix/discovery/nacos/init.lua

Lines changed: 43 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ local local_conf = require('apisix.core.config_local').local_conf()
2020
local http = require('resty.http')
2121
local core = require('apisix.core')
2222
local ipairs = ipairs
23+
local pairs = pairs
2324
local type = type
2425
local math = math
2526
local math_random = math.random
@@ -34,28 +35,25 @@ local str_find = core.string.find
3435
local log = core.log
3536

3637
local default_weight
37-
local applications
38+
local nacos_dict = ngx.shared.nacos --key: namespace_id.group_name.service_name
39+
if not nacos_dict then
40+
error("lua_shared_dict \"nacos\" not configured")
41+
end
42+
3843
local auth_path = 'auth/login'
3944
local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
4045
local default_namespace_id = "public"
4146
local default_group_name = "DEFAULT_GROUP"
4247
local access_key
4348
local secret_key
4449

45-
local events
46-
local events_list
47-
4850

4951
local _M = {}
5052

51-
local function discovery_nacos_callback(data, event, source, pid)
52-
applications = data
53-
log.notice("update local variable application, event is: ", event,
54-
"source: ", source, "server pid:", pid,
55-
", application: ", core.json.encode(applications, true))
53+
local function get_key(namespace_id, group_name, service_name)
54+
return namespace_id .. '.' .. group_name .. '.' .. service_name
5655
end
5756

58-
5957
local function request(request_uri, path, body, method, basic_auth)
6058
local url = request_uri .. path
6159
log.info('request url:', url)
@@ -278,29 +276,24 @@ local function is_grpc(scheme)
278276
return false
279277
end
280278

281-
279+
local curr_service_in_use = {}
282280
local function fetch_full_registry(premature)
283281
if premature then
284282
return
285283
end
286284

287-
local up_apps = {}
288285
local base_uri, username, password = get_base_uri()
289286
local token_param, err = get_token_param(base_uri, username, password)
290287
if err then
291288
log.error('get_token_param error:', err)
292-
if not applications then
293-
applications = up_apps
294-
end
295289
return
296290
end
297291

298292
local infos = get_nacos_services()
299293
if #infos == 0 then
300-
applications = up_apps
301294
return
302295
end
303-
296+
local service_names = {}
304297
for _, service_info in ipairs(infos) do
305298
local data, err
306299
local namespace_id = service_info.namespace_id
@@ -318,51 +311,35 @@ local function fetch_full_registry(premature)
318311
goto CONTINUE
319312
end
320313

321-
if not up_apps[namespace_id] then
322-
up_apps[namespace_id] = {}
323-
end
324-
325-
if not up_apps[namespace_id][group_name] then
326-
up_apps[namespace_id][group_name] = {}
327-
end
328-
314+
local nodes = {}
315+
local key = get_key(namespace_id, group_name, service_info.service_name)
316+
service_names[key] = true
329317
for _, host in ipairs(data.hosts) do
330-
local nodes = up_apps[namespace_id]
331-
[group_name][service_info.service_name]
332-
if not nodes then
333-
nodes = {}
334-
up_apps[namespace_id]
335-
[group_name][service_info.service_name] = nodes
336-
end
337-
338318
local node = {
339319
host = host.ip,
340320
port = host.port,
341321
weight = host.weight or default_weight,
342322
}
343-
344323
-- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
345324
if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
346325
node.port = host.metadata.gRPC_port
347326
end
348327

349328
core.table.insert(nodes, node)
350329
end
351-
330+
if #nodes > 0 then
331+
local content = core.json.encode(nodes)
332+
nacos_dict:set(key, content)
333+
end
352334
::CONTINUE::
353335
end
354-
local new_apps_md5sum = ngx.md5(core.json.encode(up_apps))
355-
local old_apps_md5sum = ngx.md5(core.json.encode(applications))
356-
if new_apps_md5sum == old_apps_md5sum then
357-
return
358-
end
359-
applications = up_apps
360-
local ok, err = events:post(events_list._source, events_list.updating,
361-
applications)
362-
if not ok then
363-
log.error("post_event failure with ", events_list._source,
364-
", update application error: ", err)
336+
-- remove services that are not in use anymore
337+
for key, _ in pairs(curr_service_in_use) do
338+
if not service_names[key] then
339+
nacos_dict:delete(key)
340+
end
365341
end
342+
curr_service_in_use = service_names
366343
end
367344

368345

@@ -371,40 +348,18 @@ function _M.nodes(service_name, discovery_args)
371348
discovery_args.namespace_id or default_namespace_id
372349
local group_name = discovery_args
373350
and discovery_args.group_name or default_group_name
374-
375-
local logged = false
376-
-- maximum waiting time: 5 seconds
377-
local waiting_time = 5
378-
local step = 0.1
379-
while not applications and waiting_time > 0 do
380-
if not logged then
381-
log.warn('wait init')
382-
logged = true
383-
end
384-
ngx.sleep(step)
385-
waiting_time = waiting_time - step
386-
end
387-
388-
if not applications or not applications[namespace_id]
389-
or not applications[namespace_id][group_name]
390-
then
351+
local key = get_key(namespace_id, group_name, service_name)
352+
local value = nacos_dict:get(key)
353+
if not value then
354+
core.log.error("nacos service not found: ", service_name)
391355
return nil
392356
end
393-
return applications[namespace_id][group_name][service_name]
357+
local nodes = core.json.decode(value)
358+
return nodes
394359
end
395360

396361

397362
function _M.init_worker()
398-
events = require("apisix.events")
399-
events_list = events:event_list("discovery_nacos_update_application",
400-
"updating")
401-
402-
if 0 ~= ngx.worker.id() then
403-
events:register(discovery_nacos_callback, events_list._source,
404-
events_list.updating)
405-
return
406-
end
407-
408363
default_weight = local_conf.discovery.nacos.weight
409364
log.info('default_weight:', default_weight)
410365
local fetch_interval = local_conf.discovery.nacos.fetch_interval
@@ -417,7 +372,20 @@ end
417372

418373

419374
function _M.dump_data()
420-
return {config = local_conf.discovery.nacos, services = applications or {}}
375+
local keys = nacos_dict:get_keys(0)
376+
local applications = {}
377+
for _, key in ipairs(keys) do
378+
local value = nacos_dict:get(key)
379+
if value then
380+
local nodes = core.json.decode(value)
381+
if nodes then
382+
applications[key] = {
383+
nodes = nodes,
384+
}
385+
end
386+
end
387+
end
388+
return {services = applications or {}}
421389
end
422390

423391

t/APISIX.pm

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ lua {
292292
lua_shared_dict prometheus-metrics 15m;
293293
lua_shared_dict standalone-config 10m;
294294
lua_shared_dict status-report 1m;
295+
lua_shared_dict nacos 10m;
295296
}
296297
_EOC_
297298
}

0 commit comments

Comments
 (0)