diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua index 0f6e73087462..23f3c0e97f2f 100644 --- a/apisix/cli/ngx_tpl.lua +++ b/apisix/cli/ngx_tpl.lua @@ -73,6 +73,7 @@ lua { {% if status then %} lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *}; {% end %} + lua_shared_dict nacos 10m; } {% if enabled_stream_plugins["prometheus"] and not enable_http then %} diff --git a/apisix/discovery/nacos/init.lua b/apisix/discovery/nacos/init.lua index 86e9d4125a83..d4fec7977018 100644 --- a/apisix/discovery/nacos/init.lua +++ b/apisix/discovery/nacos/init.lua @@ -20,6 +20,7 @@ local local_conf = require('apisix.core.config_local').local_conf() local http = require('resty.http') local core = require('apisix.core') local ipairs = ipairs +local pairs = pairs local type = type local math = math local math_random = math.random @@ -34,7 +35,11 @@ local str_find = core.string.find local log = core.log local default_weight -local applications +local nacos_dict = ngx.shared.nacos --key: namespace_id.group_name.service_name +if not nacos_dict then + error("lua_shared_dict \"nacos\" not configured") +end + local auth_path = 'auth/login' local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName=' local default_namespace_id = "public" @@ -42,20 +47,13 @@ local default_group_name = "DEFAULT_GROUP" local access_key local secret_key -local events -local events_list - local _M = {} -local function discovery_nacos_callback(data, event, source, pid) - applications = data - log.notice("update local variable application, event is: ", event, - "source: ", source, "server pid:", pid, - ", application: ", core.json.encode(applications, true)) +local function get_key(namespace_id, group_name, service_name) + return namespace_id .. '.' .. group_name .. '.' .. service_name end - local function request(request_uri, path, body, method, basic_auth) local url = request_uri .. path log.info('request url:', url) @@ -278,29 +276,24 @@ local function is_grpc(scheme) return false end - +local curr_service_in_use = {} local function fetch_full_registry(premature) if premature then return end - local up_apps = {} local base_uri, username, password = get_base_uri() local token_param, err = get_token_param(base_uri, username, password) if err then log.error('get_token_param error:', err) - if not applications then - applications = up_apps - end return end local infos = get_nacos_services() if #infos == 0 then - applications = up_apps return end - + local service_names = {} for _, service_info in ipairs(infos) do local data, err local namespace_id = service_info.namespace_id @@ -318,29 +311,15 @@ local function fetch_full_registry(premature) goto CONTINUE end - if not up_apps[namespace_id] then - up_apps[namespace_id] = {} - end - - if not up_apps[namespace_id][group_name] then - up_apps[namespace_id][group_name] = {} - end - + local nodes = {} + local key = get_key(namespace_id, group_name, service_info.service_name) + service_names[key] = true for _, host in ipairs(data.hosts) do - local nodes = up_apps[namespace_id] - [group_name][service_info.service_name] - if not nodes then - nodes = {} - up_apps[namespace_id] - [group_name][service_info.service_name] = nodes - end - local node = { host = host.ip, port = host.port, weight = host.weight or default_weight, } - -- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496 if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then node.port = host.metadata.gRPC_port @@ -348,21 +327,19 @@ local function fetch_full_registry(premature) core.table.insert(nodes, node) end - + if #nodes > 0 then + local content = core.json.encode(nodes) + nacos_dict:set(key, content) + end ::CONTINUE:: end - local new_apps_md5sum = ngx.md5(core.json.encode(up_apps)) - local old_apps_md5sum = ngx.md5(core.json.encode(applications)) - if new_apps_md5sum == old_apps_md5sum then - return - end - applications = up_apps - local ok, err = events:post(events_list._source, events_list.updating, - applications) - if not ok then - log.error("post_event failure with ", events_list._source, - ", update application error: ", err) + -- remove services that are not in use anymore + for key, _ in pairs(curr_service_in_use) do + if not service_names[key] then + nacos_dict:delete(key) + end end + curr_service_in_use = service_names end @@ -371,40 +348,18 @@ function _M.nodes(service_name, discovery_args) discovery_args.namespace_id or default_namespace_id local group_name = discovery_args and discovery_args.group_name or default_group_name - - local logged = false - -- maximum waiting time: 5 seconds - local waiting_time = 5 - local step = 0.1 - while not applications and waiting_time > 0 do - if not logged then - log.warn('wait init') - logged = true - end - ngx.sleep(step) - waiting_time = waiting_time - step - end - - if not applications or not applications[namespace_id] - or not applications[namespace_id][group_name] - then + local key = get_key(namespace_id, group_name, service_name) + local value = nacos_dict:get(key) + if not value then + core.log.error("nacos service not found: ", service_name) return nil end - return applications[namespace_id][group_name][service_name] + local nodes = core.json.decode(value) + return nodes end function _M.init_worker() - events = require("apisix.events") - events_list = events:event_list("discovery_nacos_update_application", - "updating") - - if 0 ~= ngx.worker.id() then - events:register(discovery_nacos_callback, events_list._source, - events_list.updating) - return - end - default_weight = local_conf.discovery.nacos.weight log.info('default_weight:', default_weight) local fetch_interval = local_conf.discovery.nacos.fetch_interval @@ -417,7 +372,20 @@ end function _M.dump_data() - return {config = local_conf.discovery.nacos, services = applications or {}} + local keys = nacos_dict:get_keys(0) + local applications = {} + for _, key in ipairs(keys) do + local value = nacos_dict:get(key) + if value then + local nodes = core.json.decode(value) + if nodes then + applications[key] = { + nodes = nodes, + } + end + end + end + return {services = applications or {}} end diff --git a/t/APISIX.pm b/t/APISIX.pm index 69f4b6fb18ac..035a661f921c 100644 --- a/t/APISIX.pm +++ b/t/APISIX.pm @@ -277,6 +277,7 @@ lua { lua_shared_dict prometheus-metrics 15m; lua_shared_dict standalone-config 10m; lua_shared_dict status-report 1m; + lua_shared_dict nacos 10m; } _EOC_ } diff --git a/t/discovery/nacos.t b/t/discovery/nacos.t index 9af1ee14a814..f2ebee57ea7b 100644 --- a/t/discovery/nacos.t +++ b/t/discovery/nacos.t @@ -65,6 +65,12 @@ discovery: _EOC_ +add_block_preprocessor(sub { + my ($block) = @_; + $block->set_value("timeout", "10"); + +}); + run_tests(); __DATA__ @@ -127,18 +133,43 @@ routes: type: roundrobin #END ---- pipelined_requests eval -[ - "GET /hello", - "GET /hello", -] ---- response_body_like eval -[ - qr/server [1-2]/, - qr/server [1-2]/, -] ---- no_error_log -[error, error] +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local uri = "http://127.0.0.1:" .. ngx.var.server_port + + -- Wait for 2 seconds for APISIX initialization + ngx.sleep(2) + local httpc = http.new() + local valid_responses = 0 + + for i = 1, 2 do + local res, err = httpc:request_uri(uri .. "/hello") + if not res then + ngx.log(ngx.ERR, "Request failed: ", err) + else + -- Clean and validate response + local clean_body = res.body:gsub("%s+$", "") + if clean_body == "server 1" or clean_body == "server 2" then + valid_responses = valid_responses + 1 + else + ngx.log(ngx.ERR, "Invalid response: ", clean_body) + end + end + end + -- Final check + if valid_responses == 2 then + ngx.say("PASS") + else + ngx.say("FAIL: only ", valid_responses, " valid responses") + end + } + } +--- request +GET /t +--- response_body +PASS @@ -245,16 +276,43 @@ discovery: host: - "http://127.0.0.1:8858" fetch_interval: 1 ---- pipelined_requests eval -[ - "GET /hello", - "GET /hello", -] ---- response_body_like eval -[ - qr/server [1-2]/, - qr/server [1-2]/, -] +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local uri = "http://127.0.0.1:" .. ngx.var.server_port + + -- Wait for 2 seconds for APISIX initialization + ngx.sleep(2) + local httpc = http.new() + local valid_responses = 0 + + for i = 1, 2 do + local res, err = httpc:request_uri(uri .. "/hello") + if not res then + ngx.log(ngx.ERR, "Request failed: ", err) + else + -- Clean and validate response + local clean_body = res.body:gsub("%s+$", "") + if clean_body == "server 1" or clean_body == "server 2" then + valid_responses = valid_responses + 1 + else + ngx.log(ngx.ERR, "Invalid response: ", clean_body) + end + end + end + -- Final check + if valid_responses == 2 then + ngx.say("PASS") + else + ngx.say("FAIL: only ", valid_responses, " valid responses") + end + } + } +--- request +GET /t +--- response_body +PASS @@ -393,16 +451,43 @@ discovery: host: - "http://127.0.0.1:8858" fetch_interval: 1 ---- pipelined_requests eval -[ - "GET /hello", - "GET /hello", -] ---- response_body_like eval -[ - qr/server [1-2]/, - qr/server [1-2]/, -] +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local uri = "http://127.0.0.1:" .. ngx.var.server_port + + -- Wait for 2 seconds for APISIX initialization + ngx.sleep(2) + local httpc = http.new() + local valid_responses = 0 + + for i = 1, 2 do + local res, err = httpc:request_uri(uri .. "/hello") + if not res then + ngx.log(ngx.ERR, "Request failed: ", err) + else + -- Clean and validate response + local clean_body = res.body:gsub("%s+$", "") + if clean_body == "server 1" or clean_body == "server 2" then + valid_responses = valid_responses + 1 + else + ngx.log(ngx.ERR, "Invalid response: ", clean_body) + end + end + end + -- Final check + if valid_responses == 2 then + ngx.say("PASS") + else + ngx.say("FAIL: only ", valid_responses, " valid responses") + end + } + } +--- request +GET /t +--- response_body +PASS @@ -541,16 +626,43 @@ discovery: host: - "http://127.0.0.1:8858" fetch_interval: 1 ---- pipelined_requests eval -[ - "GET /hello", - "GET /hello", -] ---- response_body_like eval -[ - qr/server [1-2]/, - qr/server [1-2]/, -] +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local uri = "http://127.0.0.1:" .. ngx.var.server_port + + -- Wait for 2 seconds for APISIX initialization + ngx.sleep(2) + local httpc = http.new() + local valid_responses = 0 + + for i = 1, 2 do + local res, err = httpc:request_uri(uri .. "/hello") + if not res then + ngx.log(ngx.ERR, "Request failed: ", err) + else + -- Clean and validate response + local clean_body = res.body:gsub("%s+$", "") + if clean_body == "server 1" or clean_body == "server 2" then + valid_responses = valid_responses + 1 + else + ngx.log(ngx.ERR, "Invalid response: ", clean_body) + end + end + end + -- Final check + if valid_responses == 2 then + ngx.say("PASS") + else + ngx.say("FAIL: only ", valid_responses, " valid responses") + end + } + } +--- request +GET /t +--- response_body +PASS @@ -737,16 +849,43 @@ discovery: host: - "http://127.0.0.1:8858" fetch_interval: 1 ---- pipelined_requests eval -[ - "GET /hello", - "GET /hello", -] ---- response_body_like eval -[ - qr/server [1-2]/, - qr/server [1-2]/, -] +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local uri = "http://127.0.0.1:" .. ngx.var.server_port + + -- Wait for 2 seconds for APISIX initialization + ngx.sleep(2) + local httpc = http.new() + local valid_responses = 0 + + for i = 1, 2 do + local res, err = httpc:request_uri(uri .. "/hello") + if not res then + ngx.log(ngx.ERR, "Request failed: ", err) + else + -- Clean and validate response + local clean_body = res.body:gsub("%s+$", "") + if clean_body == "server 1" or clean_body == "server 2" then + valid_responses = valid_responses + 1 + else + ngx.log(ngx.ERR, "Invalid response: ", clean_body) + end + end + end + -- Final check + if valid_responses == 2 then + ngx.say("PASS") + else + ngx.say("FAIL: only ", valid_responses, " valid responses") + end + } + } +--- request +GET /t +--- response_body +PASS diff --git a/t/discovery/nacos2.t b/t/discovery/nacos2.t index e7dc8f97318a..e45d22c43a75 100644 --- a/t/discovery/nacos2.t +++ b/t/discovery/nacos2.t @@ -148,18 +148,43 @@ routes: type: roundrobin #END ---- pipelined_requests eval -[ - "GET /hello", - "GET /hello", -] ---- response_body_like eval -[ - qr/server [1-2]/, - qr/server [1-2]/, -] ---- no_error_log -[error, error] +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local uri = "http://127.0.0.1:" .. ngx.var.server_port + + -- Wait for 2 seconds for APISIX initialization + ngx.sleep(2) + local httpc = http.new() + local valid_responses = 0 + + for i = 1, 2 do + local res, err = httpc:request_uri(uri .. "/hello") + if not res then + ngx.log(ngx.ERR, "Request failed: ", err) + else + -- Clean and validate response + local clean_body = res.body:gsub("%s+$", "") + if clean_body == "server 1" or clean_body == "server 2" then + valid_responses = valid_responses + 1 + else + ngx.log(ngx.ERR, "Invalid response: ", clean_body) + end + end + end + -- Final check + if valid_responses == 2 then + ngx.say("PASS") + else + ngx.say("FAIL: only ", valid_responses, " valid responses") + end + } + } +--- request +GET /t +--- response_body +PASS @@ -308,8 +333,8 @@ discovery: local body = json_decode(res.body) local services = body.services - local service = services["public"]["DEFAULT_GROUP"]["APISIX-NACOS"] - local number = table.getn(service) + local service = services["public.DEFAULT_GROUP.APISIX-NACOS"] + local number = table.getn(service.nodes) ngx.say(number) } }