This article was originally published internally at my company in September 2020; it has now been tidied up, with some comments added, for public release.
When I first joined the company's Infrastructure team, the rather vague task I was given was to learn Lua and OpenResty. I received the PDFs of two books and was asked to get up to speed on Kong as quickly as possible, to the point of being able to develop on it.
At that time I had barely touched the open source community; my skills stopped at git clone. It took me about two weeks: after learning Lua's basic grammar I started reading the source code of the Kong project, found several entry points, and wrote up source code analysis documents. It was probably this work that earned the team leader's approval. After that, I was responsible for developing the company's API Gateway and the related rollout work.
This article describes Kong's startup process, plugin mechanism, caching mechanism, and request lifecycle in detail. What is still missing is the proxy forwarding side, such as load balancing, health checks, and service discovery. I was still an ignorant newcomer back then, so I'll forgive myself for that.
Following this article, I also wrote a source code analysis of the Kong Ingress Controller, followed by a source code analysis of APISIX.
1. General
This article targets Kong version 2.1 (stable).
The commits I read and annotated (in Chinese) can be seen here:
https://github.com/mayocream/kong/commits?author=mayocream
Execution phase of Kong (OpenResty):
Kong's plugin mechanism is also based on the OpenResty life cycle, with a thin layer of encapsulation on top.
Kong's database association relationship:
Although Kong bills itself as a Cloud Native project and appears on the CNCF landscape, it relies on a traditional database (PostgreSQL) and defines many functions in it, which is much weaker than the etcd storage behind APISIX. Whereas etcd can keep a long-lived HTTP connection open and Watch for data changes, Kong can only poll the database periodically to refresh its state, and making the database highly available is far more complex than running an etcd cluster.
2. Configuration file
During startup, Kong parses the Lua template files under the kong/templates directory, injects environment variables and the overrides from kong.conf, and generates the Nginx startup configuration file nginx.conf.
The structure is as follows:
pid pids/nginx.pid;
error_log logs/error.log notice;

# injected nginx_main_* directives
env SKYWALKING_URL;

events {
    # injected nginx_events_* directives
    multi_accept on;
    worker_connections 16384;
}

http {
    lua_package_path './?.lua;./?/init.lua;;;;';
    lua_package_cpath ';;;';

    lua_shared_dict kong        5m;
    lua_shared_dict kong_locks  8m;
    ...

    # injected nginx_http_* directives
    client_body_buffer_size 8k;

    init_by_lua_block {
        Kong = require 'kong'
        Kong.init()
    }

    init_worker_by_lua_block {
        Kong.init_worker()
    }

    upstream kong_upstream {
        server 0.0.0.1;

        # injected nginx_upstream_* directives
        balancer_by_lua_block {
            Kong.balancer()
        }
    }

    # Kong Proxy
    server {
        server_name kong;
        ...
    }

    # Kong Admin API
    server {
        server_name kong_admin;
        ...
    }
}
Kong defines injection prefixes such as nginx_main_*; variables of this form are loaded into the corresponding locations of nginx.conf during the configuration parsing phase, which avoids modifying the template file directly.
For example:
# Define the env variable in the main block
$ export NGINX_MAIN_ENV=SKYWALKING_URL

# Create a new lua_shared_dict
$ export NGINX_HTTP_LUA_SHARED_DICT="tracing_buffer 128m"
Kong's official configuration documentation explains the meaning of each parameter in great detail.
One addition: we usually need to define multiple shared dicts, and the configuration has to be written in this somewhat ugly form:
nginx_http_lua_shared_dict = cache_buffer_one 128m; lua_shared_dict cache_buffer_two 128m
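For reference, the line above should render into nginx.conf roughly as follows (a sketch of the generated output; exact placement depends on the template). Nginx parses it as two separate lua_shared_dict directives, which is exactly the trick being exploited:

# injected nginx_http_* directives (sketch of the generated nginx.conf)
lua_shared_dict cache_buffer_one 128m; lua_shared_dict cache_buffer_two 128m;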
3. Initialization
3.1. Database initialization
Database-related initialization happens in Kong.init():
-- Database connection related
local db = assert(DB.new(config))
assert(db:init_connector())

kong.db = db
DB.new() calls Schema.new(), Entity.new(), and DAO.new() in turn; each is described below.
3.1.1. Schema
Kong's schema definitions live under db/schema/entities; take routes.lua as an example:
local typedefs = require "kong.db.schema.typedefs" return { name = "routes", primary_key = { "id" }, endpoint_key = "name", workspaceable = true, subschema_key = "protocols", fields = { { id = typedefs.uuid, }, { created_at = typedefs.auto_timestamp_s }, { updated_at = typedefs.auto_timestamp_s }, { name = typedefs.name }, { protocols = { type = "set", len_min = 1, required = true, elements = typedefs.protocol, mutually_exclusive_subsets = { { "http", "https" }, { "tcp", "tls" }, { "grpc", "grpcs" }, }, default = { "http", "https" }, -- TODO: different default depending on service's scheme }, }, { methods = typedefs.methods }, { hosts = typedefs.hosts }, { paths = typedefs.paths }, { headers = typedefs.headers }, { https_redirect_status_code = { type = "integer", one_of = { 426, 301, 302, 307, 308 }, default = 426, required = true, }, }, { regex_priority = { type = "integer", default = 0 }, }, { strip_path = { type = "boolean", default = true }, }, { path_handling = { type = "string", default = "v0", one_of = { "v0", "v1" }, }, }, { preserve_host = { type = "boolean", default = false }, }, { snis = { type = "set", elements = typedefs.sni }, }, { sources = typedefs.sources }, { destinations = typedefs.destinations }, { tags = typedefs.tags }, { service = { type = "foreign", reference = "services" }, }, }, entity_checks = { { conditional = { if_field = "protocols", if_match = { elements = { type = "string", not_one_of = { "grpcs", "https", "tls" }}}, then_field = "snis", then_match = { len_eq = 0 }, then_err = "'snis' can only be set when 'protocols' is 'grpcs', 'https' or 'tls'", }}, } }
primary_key is the primary key in the database, and it is also the default cache_key when cache_key is not defined.
For fields with type = "foreign", the schema of the referenced entity is also loaded in when this entity is loaded.
Unlike other entities, plugins define their own cache_key:
name = "plugins",
primary_key = { "id" },
cache_key = { "name", "route", "service", "consumer" },
Cache-related operations obtain the key by calling the DAO's cache_key() method:
local cache_key = dao:cache_key(entity)
local ok, err = cache:safe_set(cache_key, entity)
The method that actually generates the cache_key returns a string to be used as the cache key.
function DAO:cache_key(key, arg2, arg3, arg4, arg5, ws_id) if self.schema.workspaceable then ws_id = ws_id or workspaces.get_workspace_id() end -- Fast path: passing the cache_key/primary_key entries in -- order as arguments, this produces the same result as -- the generic code below, but building the cache key -- becomes a single string.format operation if type(key) == "string" then return fmt("%s:%s:%s:%s:%s:%s:%s", self.schema.name, key == nil and "" or key, arg2 == nil and "" or arg2, arg3 == nil and "" or arg3, arg4 == nil and "" or arg4, arg5 == nil and "" or arg5, ws_id == nil and "" or ws_id) end -- Generic path: build the cache key from the fields -- listed in cache_key or primary_key if type(key) ~= "table" then error("key must be a string or an entity table", 2) end if key.ws_id then ws_id = key.ws_id end local values = new_tab(7, 0) values[1] = self.schema.name local source = self.schema.cache_key or self.schema.primary_key local i = 2 for _, name in ipairs(source) do local field = self.schema.fields[name] local value = key[name] if value == null or value == nil then value = "" elseif field.type == "foreign" then -- FIXME extract foreign key, do not assume `id` value = value.id end values[i] = tostring(value) i = i + 1 end for n = i, 6 do values[n] = "" end values[7] = ws_id or "" return concat(values, ":") end
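As a concrete illustration (hypothetical IDs), the fast path above produces a colon-separated string of the form schema_name:key:arg2:arg3:arg4:arg5:ws_id:

-- a sketch with made-up IDs, for the workspaceable "plugins" entity
local cache_key = kong.db.plugins:cache_key("rate-limiting",
  "route-uuid", "service-uuid", nil, nil, "workspace-uuid")
-- cache_key == "plugins:rate-limiting:route-uuid:service-uuid:::workspace-uuid"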
The basic schema operations are defined in schema/init.lua:
-- each_field() iterates over the schema's fields
-- and is the most frequently used schema operation
function Schema:each_field(values)
  local i = 1

  local subschema
  if values then
    subschema = get_subschema(self, values)
  end

  return function()
    local item = self.fields[i]
    if not item then
      return nil
    end
    local key = next(item)
    local field = resolve_field(self, key, item[key], subschema)
    i = i + 1
    return key, field
  end
end
Schema.new() sets the metatable's __index so that the new structure inherits the series of operation methods defined on Schema.
function Schema.new(definition, is_subschema) if not definition then return nil, validation_errors.SCHEMA_NO_DEFINITION end if not definition.fields then return nil, validation_errors.SCHEMA_NO_FIELDS end local self = copy(definition) -- inherit Schema A series of operation methods defined under setmetatable(self, Schema) -- entity Cached cache_key, -- If this field is not available, it is used by default schema Defined -- primary_key As cache_key -- cache_key It's an array, -- It's just stored separately if self.cache_key then self.cache_key_set = {} for _, name in ipairs(self.cache_key) do self.cache_key_set[name] = true end end -- By tuple __index Method call Schema:each_field() method -- ergodic schema of fields table for key, field in self:each_field() do -- Also give access to fields by name self.fields[key] = field if field.type == "record" and field.fields then allow_record_fields_by_name(field) end -- If there are foreign keys -- The associated with the foreign key is loaded schema come in if field.type == "foreign" then local err field.schema, err = get_foreign_schema_for_field(field) if not field.schema then return nil, err end if not is_subschema then -- Store the inverse relation for implementing constraints local constraints = assert(_cache[field.reference]).constraints table.insert(constraints, { schema = self, field_name = key, on_delete = field.on_delete, }) end end end if self.workspaceable and self.name then if not _workspaceable[self.name] then _workspaceable[self.name] = true table.insert(_workspaceable, { schema = self }) end end if self.name then -- do not reset the constraints list if a schema in reloaded if not _cache[self.name] then _cache[self.name] = { constraints = {}, } end -- but always update the schema object in cache _cache[self.name].schema = self end return self end
The metatable used for this inheritance, at a glance:
local Schema = {}
Schema.__index = Schema
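For readers less familiar with Lua, here is a minimal sketch of this metatable-based inheritance pattern (a standalone illustration, not Kong code):

local Schema = {}
Schema.__index = Schema

function Schema:get_name()
  return self.name
end

-- Schema.new copies the definition and attaches the metatable,
-- so the instance "inherits" methods via the __index lookup
local instance = setmetatable({ name = "routes" }, Schema)
print(instance:get_name())  --> routes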
3.1.2. Entity
Entity simply encapsulates the Schema.
-- `definition` is the schema structure
function Entity.new(definition)
  -- initialize the Schema object
  local self, err = Schema.new(definition)
  if not self then
    return nil, err
  end

  -- traverse the schema fields
  for name, field in self:each_field() do
    if field.nilable then
      return nil, entity_errors.NO_NILABLE:format(name)
    end

    if field.abstract then
      goto continue
    end

    if field.type == "map" then
      if field.keys.type ~= "string" then
        return nil, entity_errors.MAP_KEY_STRINGS_ONLY:format(name)
      end

    elseif field.type == "record" then
      make_records_required(field)

    elseif field.type == "function" then
      return nil, entity_errors.NO_FUNCTIONS:format(name)
    end
    ::continue::
  end

  self.new_subschema = Entity.new_subschema

  return self
end
The Entity objects are then loaded in the DB.new() function:
local schemas = {} do -- load schemas -- core entities are for now the only source of schemas. -- TODO: support schemas from plugins entities as well. -- Load core entity,Why the core entity -- Because there are plugin Custom entity -- these entity yes Kong Own for _, entity_name in ipairs(constants.CORE_ENTITIES) do -- load schema(Data structure) local entity_schema = require("kong.db.schema.entities." .. entity_name) -- validate core entities schema via metaschema local ok, err_t = MetaSchema:validate(entity_schema) if not ok then return nil, fmt("schema of entity '%s' is invalid: %s", entity_name, tostring(errors:schema_violation(err_t))) end -- load entity object local entity, err = Entity.new(entity_schema) if not entity then return nil, fmt("schema of entity '%s' is invalid: %s", entity_name, err) end schemas[entity_name] = entity -- load core entities subschemas local subschemas ok, subschemas = utils.load_module_if_exists("kong.db.schema.entities." .. entity_name .. "_subschemas") if ok then for name, subschema in pairs(subschemas) do local ok, err = entity:new_subschema(name, subschema) if not ok then return nil, ("error initializing schema for %s: %s"):format(entity_name, err) end end end end end
3.1.3. DAO
db/dao/init.lua defines a series of methods for database operations, such as:
function DAO:select(primary_key, options)
function DAO:page(size, offset, options)
function DAO:each(size, options)
function DAO:insert(entity, options)
function DAO:update(primary_key, entity, options)
function DAO:delete(primary_key, options)
...
DAO.new() will create a table containing db connection information and entity.
-- the `schema` parameter is an Entity object
-- the DB structure is:
-- local self   = {
--   daos        = daos,  -- each of those has the connector singleton
--   strategies  = strategies,
--   connector   = connector,
--   strategy    = strategy,
--   errors      = errors,
--   infos       = connector:infos(),
--   kong_config = kong_config,
-- }
function _M.new(db, schema, strategy, errors)
  local fk_methods = generate_foreign_key_methods(schema)
  -- inherit the basic DAO methods
  local super = setmetatable(fk_methods, DAO)

  local self = {
    db = db,
    schema = schema,
    strategy = strategy,
    errors = errors,
    pagination = utils.shallow_copy(defaults.pagination),
    super = super,
  }

  if schema.dao then
    -- plugin-defined custom DAO
    local custom_dao = require(schema.dao)
    for name, method in pairs(custom_dao) do
      self[name] = method
    end
  end

  return setmetatable(self, { __index = super })
end
All DAO objects are loaded in db/init.lua:
do
  -- load DAOs
  for _, schema in pairs(schemas) do
    local strategy = strategies[schema.name]
    if not strategy then
      return nil, fmt("no strategy found for schema '%s'", schema.name)
    end

    -- store the daos
    daos[schema.name] = DAO.new(self, schema, strategy, errors)
  end
end
As with the structures above, DB.new() sets the metatable's __index method on the table at the end:
-- set the metatable's __index method:
-- accessing a field that does not exist on the table (DB.xxx)
-- falls back to DB.daos.xxx
return setmetatable(self, DB)

local DB = {}
DB.__index = function(self, k)
  -- rawget fetches the raw value directly,
  -- without going through the metatable's __index
  return DB[k] or rawget(self, "daos")[k]
end
Elsewhere in Kong a database method is invoked as, for example, kong.db.services:each_field(); through the __index above this resolves to daos.services, and the entity's each_field() is in fact Schema:each_field().
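A minimal sketch of that lookup chain (a standalone illustration of the __index fallback, not Kong code):

local daos = { services = { name = "services dao" } }

local DB = {}
DB.__index = function(self, k)
  return DB[k] or rawget(self, "daos")[k]
end

local db = setmetatable({ daos = daos }, DB)
-- db.services is not a real field on db; __index falls back to db.daos.services
print(db.services.name)  --> services dao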
Below the DAO there is a further layer wrapping the actual database operations, such as the PostgreSQL strategy that generates SQL statements; it will not be covered here.
Reading through the DAO's layer-upon-layer encapsulation gave me a real headache; at the time the IDE had no intelligent hints for Lua, and Ctrl-clicking to jump to definitions didn't work.
3.2. Cache build
In the init_by_lua stage, the Master process parses the configuration file, connects to the database, clears shared memory, and builds the routing cache.
Shared memory is cleaned up in the reset_kong_shm code block:
local shms = {
  "kong",
  "kong_locks",
  "kong_healthchecks",
  "kong_process_events",
  "kong_cluster_events",
  "kong_rate_limiting_counters",
  "kong_core_db_cache" .. suffix,
  "kong_core_db_cache_miss" .. suffix,
  "kong_db_cache" .. suffix,
  "kong_db_cache_miss" .. suffix,
  "kong_clustering",
}

for _, shm in ipairs(shms) do
  local dict = ngx.shared[shm]
  -- clear the shared memory zone
  if dict then
    dict:flush_all()
    dict:flush_expired(0)
  end
end
3.2.1. Route cache
else
  -- DB mode
  local default_ws = db.workspaces:select_by_name("default")
  kong.default_workspace = default_ws and default_ws.id

  local ok, err = runloop.build_plugins_iterator("init")
  if not ok then
    error("error building initial plugins: " .. tostring(err))
  end

  -- initialize the router / build the route cache
  assert(runloop.build_router("init"))
end

db:close()
In DB mode, the last step calls runloop.build_router("init") to build the route cache.
While building the route cache, Kong first checks whether the cache component has been initialized. In the init stage the cache is not yet ready, so a plain Lua table is created to hold the route information. The build_services_init_cache() method loads all Services into that table page by page. For each Route, Kong checks whether the current Nginx subsystem (http/stream) matches the protocols specified by the Route; if so, the Service object is looked up and associated with the Route. Finally everything is passed to Router.new(), which builds a tree structure and routing index through its matching algorithm.
Kong maps Route protocols to the Nginx subsystem that supports them:
- http/https -> http
- grpc/grpcs -> http
- tcp/tls -> stream
build_router = function(version) local db = kong.db -- table Store all route-service data local routes, i = {}, 0 local err -- The router is initially created on init phase, where kong.core_cache is -- still not ready. For those cases, use a plain Lua table as a cache -- instead -- init stage core_cache Initialization is not complete yet -- Use here table Store local services_init_cache = {} if not kong.core_cache and db.strategy ~= "off" then -- Get all services,Use default paging parameters services_init_cache, err = build_services_init_cache(db) if err then services_init_cache = {} log(WARN, "could not build services init cache: ", err) end end local counter = 0 local page_size = db.routes.pagination.page_size for route, err in db.routes:each(nil, GLOBAL_QUERY_OPTS) do if err then return nil, "could not load routes: " .. err end -- inspect router Has the data changed -- clear through router hash Consistent judgment -- If it has changed, exit the function if db.strategy ~= "off" then if kong.core_cache and counter > 0 and counter % page_size == 0 then local new_version, err = get_router_version() if err then return nil, "failed to retrieve router version: " .. err end if new_version ~= version then return nil, "router was changed while rebuilding it" end end end -- subsystem Whether the protocol of the current route is supported if should_process_route(route) then -- obtain route of service local service, err = get_service_for_route(db, route, services_init_cache) if err then return nil, err end local r = { route = route, service = service, } i = i + 1 -- Store all route-service routes[i] = r end counter = counter + 1 end local new_router, err = Router.new(routes) if not new_router then return nil, "could not create router: " .. err end -- router example router = new_router if version then router_version = version end -- LEGACY - singletons module is deprecated singletons.router = router -- /LEGACY return true end
As mentioned, while building the route cache Kong checks whether the cache component is initialized; since it is not ready in the init stage, a plain Lua table is used to cache Services.
-- stored in the table with the structure
-- [service.id] = service
local function build_services_init_cache(db)
  local services_init_cache = {}

  for service, err in db.services:each(nil, GLOBAL_QUERY_OPTS) do
    if err then
      return nil, err
    end

    services_init_cache[service.id] = service
  end

  return services_init_cache
end
The build_services_init_cache(db) method calls the DAO:each() function with the default paging parameter page_size = 1000, fetching page by page and returning an iterator that yields one record at a time. This is needed because the init_by_lua stage has not yet initialized the cache (kong.core_cache), so a Lua table is used to hold the cached data.
function DAO:each(size, options) if size ~= nil then validate_size_type(size) end -- Get paging conditions with default values options = get_pagination_options(self, options) if size ~= nil then local ok, err = validate_size_value(size, options.pagination.max_page_size) if not ok then local err_t = self.errors:invalid_size(err) return nil, tostring(err_t), err_t end else size = options.pagination.page_size end local ok, errors = validate_options_value(self, options) if not ok then local err_t = self.errors:invalid_options(errors) return nil, tostring(err_t), err_t end local pager = function(size, offset, options) return self.strategy:page(size, offset, options) end return iteration.by_row(self, pager, size, options) end
The default paging parameters are defined in db/strategies/connector.lua:
local Connector = {
  defaults = {
    -- default pagination settings
    pagination = {
      page_size     = 1000,
      max_page_size = 50000,
    },
  },
}
Next, all Routes are traversed, calling should_process_route() and get_service_for_route() one by one. The former checks whether the Nginx subsystem matches the route's protocols; the latter first looks the Service up in the cache and, if it is not there, fetches it from the database.
local function get_service_for_route(db, route, services_init_cache) -- route Associated service Foreign key local service_pk = route.service if not service_pk then return nil end -- Find cache table Inside service local id = service_pk.id local service = services_init_cache[id] if service then return service end local err -- kong.core_cache is available, not in init phase if kong.core_cache then -- adopt mlcache query service local cache_key = db.services:cache_key(service_pk.id, nil, nil, nil, nil, route.ws_id) -- query cache Get, if not, call load_service_from_db obtain service, err = kong.core_cache:get(cache_key, TTL_ZERO, load_service_from_db, service_pk) else -- init phase, kong.core_cache not available -- A new service/route has been inserted while the initial route -- was being created, on init (perhaps by a different Kong node). -- Load the service individually and update services_init_cache with it -- Directly query the database for service service, err = load_service_from_db(service_pk) services_init_cache[id] = service end if err then return nil, "error raised while finding service for route (" .. route.id .. "): " .. err elseif not service then return nil, "could not find service for route (" .. route.id .. ")" end -- TODO: this should not be needed as the schema should check it already if SUBSYSTEMS[service.protocol] ~= subsystem then log(WARN, "service with protocol '", service.protocol, "' cannot be used with '", subsystem, "' subsystem") return nil end return service end
The load_service_from_db() method simply calls DAO:select() to fetch the Service and caches it in the services_init_cache table; it does not update the kong.core_cache component.
For database entity operations, only create, update, and delete are broadcast to the other workers for synchronization through events; this is described in detail in the events section below.
Next, the array of { route, service } pairs is passed to the Router.new() function in router.lua:
local new_router, err = Router.new(routes)
if not new_router then
  return nil, "could not create router: " .. err
end

-- bind the router instance
router = new_router
The actual construction of the routing index happens in the _M.new(routes) function in router.lua, using the lua-resty-lrucache library for caching: the route and Service combinations are sorted and indexed by the matching algorithm, structures of the form { cache_key: { route, service } } are stored in the cache, and the Router instance is returned.
The routing-index cache key is constructed as follows:
local cache_key = req_method .. "|" .. req_uri .. "|" .. req_host
                  .. "|" .. ctx.src_ip .. "|" .. ctx.src_port
                  .. "|" .. ctx.dst_ip .. "|" .. ctx.dst_port
                  .. "|" .. ctx.sni
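For an ordinary HTTP request the resulting key looks roughly like this (hypothetical values; the src/dst/sni parts are empty in the http subsystem):

-- e.g. for "GET http://example.com/api/v1/users":
-- cache_key == "GET|/api/v1/users|example.com|||||"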
The Router instance is built by the Master process and inherited by each Worker process via fork.
After a Worker finishes building its caches, it registers timers that periodically rebuild them.
-- periodically rebuild the routing cache
if kong.db.strategy ~= "off" then
  timer_every(worker_state_update_frequency, function(premature)
    if premature then
      return
    end

    -- Don't wait for the semaphore (timeout = 0) when updating via the
    -- timer.
    -- If the semaphore is locked, that means that the rebuild is
    -- already ongoing.
    local ok, err = rebuild_router(ROUTER_ASYNC_OPTS)
    if not ok then
      log(ERR, "could not rebuild router via timer: ", err)
    end
  end)

  timer_every(worker_state_update_frequency, function(premature)
    if premature then
      return
    end

    local ok, err = rebuild_plugins_iterator(PLUGINS_ITERATOR_ASYNC_OPTS)
    if not ok then
      log(ERR, "could not rebuild plugins iterator via timer: ", err)
    end
  end)
end
3.2.2. Entity cache
Here we first introduce lua-resty-mlcache. This caching library is built on lua_shared_dict and lua-resty-lrucache, giving two layers of caching: each Worker has its own process-level LRU cache, which is queried first; shared memory is the second layer; and finally a callback can be provided to fetch from the database, with a lock created by lua-resty-lock ensuring that only a single worker runs the callback at a time.
mlcache architecture diagram:
        ┌───────────┐   ┌───────────┐   ┌───────────┐
  L1    │  worker   │   │  worker   │   │  worker   │
        │ Lua cache │   │ Lua cache │   │ Lua cache │
        └─────┬─────┘   └─────┬─────┘   └─────┬─────┘
              │               │               │
              ▼               ▼               ▼
        ┌─────────────────────────────────────────────┐
  L2    │              lua_shared_dict                │
        └──────────────────────┬──────────────────────┘
                               │ mutex
                               ▼
                      ┌─────────────────┐
                      │    callback     │
                      └────────┬────────┘
                               │ I/O fetch
  L3                           ▼
              Database, API, DNS, Disk, any I/O...
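A minimal sketch of how mlcache is used (based on lua-resty-mlcache's documented API; the shm name and loader callback here are hypothetical):

local mlcache = require "resty.mlcache"

-- "cache_shm" must be declared via lua_shared_dict in nginx.conf
local cache, err = mlcache.new("my_cache", "cache_shm", {
  lru_size = 1000,   -- size of the per-worker L1 (lua-resty-lrucache)
  ttl      = 3600,   -- TTL for hits
  neg_ttl  = 30,     -- TTL for misses (nil values)
})

local function load_from_db(id)
  -- L3: only one worker runs this at a time for a given key
  return { id = id }
end

-- checks L1, then L2 (shm), then invokes the callback
local value, err = cache:get("services:123", nil, load_from_db, "123")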
The cache is initialized in Kong.init_worker():
-- initialize the shared-memory-based cache
local cache, err = kong_global.init_cache(kong.configuration,
                                          cluster_events, worker_events)
if not cache then
  stash_init_worker_error("failed to instantiate 'kong.cache' module: " .. err)
  return
end
kong.cache = cache

local core_cache, err = kong_global.init_core_cache(kong.configuration,
                                                    cluster_events, worker_events)
if not core_cache then
  stash_init_worker_error("failed to instantiate 'kong.core_cache' module: " .. err)
  return
end
kong.core_cache = core_cache

ok, err = runloop.set_init_versions_in_cache()
if not ok then
  stash_init_worker_error(err) -- 'err' fully formatted
  return
end
The kong_global.init_cache() function looks like this:
function _GLOBAL.init_cache(kong_config, cluster_events, worker_events)
  local db_cache_ttl = kong_config.db_cache_ttl
  local db_cache_neg_ttl = kong_config.db_cache_neg_ttl
  local cache_pages = 1

  if kong_config.database == "off" then
    db_cache_ttl = 0
    db_cache_neg_ttl = 0
    cache_pages = 2
  end

  return kong_cache.new {
    shm_name = "kong_db_cache",
    cluster_events = cluster_events,
    worker_events = worker_events,
    ttl = db_cache_ttl,
    neg_ttl = db_cache_neg_ttl or db_cache_ttl,
    resurrect_ttl = kong_config.resurrect_ttl,
    cache_pages = cache_pages,
    resty_lock_opts = {
      exptime = 10,
      timeout = 5,
    },
  }
end
Eventually, _M.new() in cache.lua is called; it validates the required parameters, checks that the shared memory zone is accessible, hooks up cluster events and worker events, defines serialization and deserialization methods, and wraps mlcache in one additional layer.
function _M.new(opts) -- opts validation opts = opts or {} local mlcaches = {} local shm_names = {} for i = 1, opts.cache_pages or 1 do local channel_name = (i == 1) and "mlcache" or "mlcache_2" local shm_name = (i == 1) and opts.shm_name or opts.shm_name .. "_2" local shm_miss_name = (i == 1) and opts.shm_name .. "_miss" or opts.shm_name .. "_miss_2" if ngx.shared[shm_name] then local mlcache, err = resty_mlcache.new(shm_name, shm_name, { shm_miss = shm_miss_name, shm_locks = "kong_locks", shm_set_retries = 3, lru_size = LRU_SIZE, ttl = max(opts.ttl or 3600, 0), neg_ttl = max(opts.neg_ttl or 300, 0), resurrect_ttl = opts.resurrect_ttl or 30, resty_lock_opts = opts.resty_lock_opts, ipc = { -- Function binding for interprocess communication register_listeners = function(events) for _, event_t in pairs(events) do opts.worker_events.register(function(data) event_t.handler(data) end, channel_name, event_t.channel) end end, broadcast = function(channel, data) local ok, err = opts.worker_events.post(channel_name, channel, data) if not ok then log(ERR, "failed to post event '", channel_name, "', '", channel, "': ", err) end end } }) if not mlcache then return nil, "failed to instantiate mlcache: " .. err end mlcaches[i] = mlcache shm_names[i] = shm_name end end local curr_mlcache = 1 if opts.cache_pages == 2 then curr_mlcache = ngx.shared.kong:get("kong:cache:" .. opts.shm_name .. ":curr_mlcache") or 1 end local self = { cluster_events = opts.cluster_events, mlcache = mlcaches[curr_mlcache], mlcaches = mlcaches, shm_names = shm_names, curr_mlcache = curr_mlcache, } local ok, err = self.cluster_events:subscribe("invalidations", function(key) log(DEBUG, "received invalidate event from cluster for key: '", key, "'") self:invalidate_local(key) end) if not ok then return nil, "failed to subscribe to invalidations cluster events " .. "channel: " .. err end _init[opts.shm_name] = true return setmetatable(self, mt) end
After the cache module is initialized, the Worker loads the database resources specified by the db_cache_warmup_entities setting in the configuration file into memory for caching. The default configuration warms up services and plugins.
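In kong.conf this corresponds to the following setting (values follow the description above; defaults may differ between Kong versions, so verify against your own kong.conf.default):

# kong.conf
db_cache_warmup_entities = services, plugins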
LRU_SIZE is 500,000 and is measured in items: it caps the number of entries the per-worker LRU cache can hold, which roughly corresponds to a single Worker's LRU cache occupying at most on the order of 500 MB of memory.
The Worker will load the database entity into the shared memory cache according to the configuration item.
local function execute_cache_warmup(kong_config)
  if kong_config.database == "off" then
    return true
  end

  -- only perform the warm-up on one worker:
  -- load database entities into the shared memory cache
  if ngx.worker.id() == 0 then
    local ok, err = cache_warmup.execute(kong_config.db_cache_warmup_entities)
    if not ok then
      return nil, err
    end
  end

  return true
end
Here, the database data is only loaded on one Worker process, and then synchronized to other workers.
cache_warmup.execute() performs some basic checks and then calls the cache_warmup_single_entity(dao) method.
-- Load database entities into cache for faster access -- stay Worker Initialization phase run -- Default load service, plugins -- Size is not configured mem_cache_size influence -- Loads entities from the database into the cache, for rapid subsequent -- access. This function is intented to be used during worker initialization. function cache_warmup.execute(entities) if not kong.cache or not kong.core_cache then return true end for _, entity_name in ipairs(entities) do if entity_name == "routes" then -- do not spend shm memory by caching individual Routes entries -- because the routes are kept in-memory by building the router object kong.log.notice("the 'routes' entry is ignored in the list of ", "'db_cache_warmup_entities' because Kong ", "caches routes in memory separately") goto continue end local dao = kong.db[entity_name] if not (type(dao) == "table" and dao.schema) then kong.log.warn(entity_name, " is not a valid entity name, please check ", "the value of 'db_cache_warmup_entities'") goto continue end local ok, err = cache_warmup_single_entity(dao) if not ok then if err == "no memory" then kong.log.warn("cache warmup has been stopped because cache ", "memory is exhausted, please consider increasing ", "the value of 'mem_cache_size' (currently at ", kong.configuration.mem_cache_size, ")") return true end return nil, err end ::continue:: end return true end
Routes are not warmed up here, because they were already built into the routing index tree in the previous section and inherited by every Worker's memory via fork.
local function cache_warmup_single_entity(dao) local entity_name = dao.schema.name -- Selected storage location cache/core_cache local cache_store = constants.ENTITY_CACHE_STORE[entity_name] -- cache Global object local cache = kong[cache_store] ngx.log(ngx.NOTICE, "Preloading '", entity_name, "' into the ", cache_store, "...") local start = ngx.now() local hosts_array, hosts_set, host_count if entity_name == "services" then hosts_array = {} hosts_set = {} host_count = 0 end for entity, err in dao:each(nil, GLOBAL_QUERY_OPTS) do if err then return nil, err end if entity_name == "services" then if utils.hostname_type(entity.host) == "name" and hosts_set[entity.host] == nil then host_count = host_count + 1 hosts_array[host_count] = entity.host hosts_set[entity.host] = true end end -- obtain cache_key local cache_key = dao:cache_key(entity) -- call mlcache of safe_set method, -- An error will be reported if the memory is insufficient local ok, err = cache:safe_set(cache_key, entity) if not ok then return nil, err end end if entity_name == "services" and host_count > 0 then ngx.timer.at(0, warmup_dns, hosts_array, host_count) end local elapsed = math.floor((ngx.now() - start) * 1000) ngx.log(ngx.NOTICE, "finished preloading '", entity_name, "' into the ", cache_store, " (in ", tostring(elapsed), "ms)") return true end
cache_warmup_single_entity() loads all of the DAO's data into memory; the set operation also distributes events so the data is synchronized to the other workers, and in the end every Worker holds a copy in its cache.
3.3. Event subscriptions
Worker events and cluster events are initialized in Kong.init_worker().
local worker_events, err = kong_global.init_worker_events()
if not worker_events then
  stash_init_worker_error("failed to instantiate 'kong.worker_events' " ..
                          "module: " .. err)
  return
end
kong.worker_events = worker_events

local cluster_events, err = kong_global.init_cluster_events(kong.configuration, kong.db)
if not cluster_events then
  stash_init_worker_error("failed to instantiate 'kong.cluster_events' " ..
                          "module: " .. err)
  return
end
kong.cluster_events = cluster_events
Worker events use the lua-resty-worker-events library internally. The inter-process event handling it implements works by storing events in shared memory; every second each worker pulls the events from shared memory and processes them.
function _GLOBAL.init_worker_events()
  -- Note: worker_events will not work correctly if required at the top of the file.
  -- It must be required right here, inside the init function
  local worker_events = require "resty.worker.events"

  local ok, err = worker_events.configure {
    shm = "kong_process_events", -- defined by "lua_shared_dict"
    timeout = 5,                 -- life time of event data in shm
    interval = 1,                -- poll interval (seconds)

    wait_interval = 0.010,       -- wait before retry fetching event data
    wait_max = 0.5,              -- max wait time before discarding event
  }
  if not ok then
    return nil, err
  end

  return worker_events
end
Cluster events (communication between multiple Kong nodes) are handled by storing events in the database and polling the database regularly for new events.
function _GLOBAL.init_cluster_events(kong_config, db)
  return kong_cluster_events.new({
    db = db,
    poll_interval = kong_config.db_update_frequency,
    poll_offset = kong_config.db_update_propagation,
    poll_delay = kong_config.db_update_propagation,
  })
end
From the code below it is clear that cluster events are implemented on top of a database table:
function _M:broadcast(channel, data, delay) if type(channel) ~= "string" then return nil, "channel must be a string" end if type(data) ~= "string" then return nil, "data must be a string" end if delay and type(delay) ~= "number" then return nil, "delay must be a number" elseif self.poll_delay > 0 then delay = self.poll_delay end -- insert event row --log(DEBUG, "broadcasting on channel: '", channel, "' data: ", data, -- " with delay: ", delay and delay or "none") local ok, err = self.strategy:insert(self.node_id, channel, nil, data, delay) if not ok then return nil, err end return true end function _M:subscribe(channel, cb, start_polling) if type(channel) ~= "string" then return error("channel must be a string") end if type(cb) ~= "function" then return error("callback must be a function") end if not self.callbacks[channel] then self.callbacks[channel] = { cb } insert(self.channels, channel) else insert(self.callbacks[channel], cb) end if start_polling == nil then start_polling = true end if not self.polling and start_polling and self.use_polling then -- start recurring polling timer local ok, err = timer_at(self.poll_interval, poll_handler, self) if not ok then return nil, "failed to start polling timer: " .. err end self.polling = true end return true end
In cache.lua, the cluster events subscribe to cache invalidation events; internally this calls mlcache's delete method, and the invalidation is propagated to all workers.
local ok, err = self.cluster_events:subscribe("invalidations", function(key) log(DEBUG, "received invalidate event from cluster for key: '", key, "'") self:invalidate_local(key) end) function _M:invalidate_local(key, shadow) if type(key) ~= "string" then error("key must be a string", 2) end log(DEBUG, "invalidating (local): '", key, "'") local current_page = self.curr_mlcache or 1 local delete_page if shadow and #self.mlcaches == 2 then delete_page = current_page == 1 and 2 or 1 else delete_page = current_page end local ok, err = self.mlcaches[delete_page]:delete(key) if not ok then log(ERR, "failed to delete entity from node cache: ", err) end end
This part has covered the event-related operations during Kong initialization: setting up event subscriptions, the IPC (inter-process communication) hooks attached to mlcache, subscribing to cache invalidation events, and wiring event publishing into the DAOs.
function DB:set_events_handler(events)
  for _, dao in pairs(self.daos) do
    dao.events = events
  end
end
4. Event distribution
Many parts of Kong use the non-blocking ngx.timer.at() and ngx.timer.every() functions to run scheduled tasks. This code is fairly scattered; this section describes Kong's non-blocking one-shot tasks and its typical recurring tasks.
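For reference, a minimal sketch of the two timer APIs used here (OpenResty's documented ngx.timer interface; the handler is hypothetical):

local function handler(premature, msg)
  -- premature is true when the worker is shutting down
  if premature then
    return
  end
  ngx.log(ngx.NOTICE, "timer fired: ", msg)
end

-- one-shot task, runs as soon as possible (delay = 0)
local ok, err = ngx.timer.at(0, handler, "one-shot")

-- recurring task, runs every 5 seconds
local ok2, err2 = ngx.timer.every(5, handler, "recurring")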
4.1. One-shot tasks
4.1.1. DNS resolution
In cache_warmup.lua, when caching the services objects, Kong resolves the IPs of the service hosts in a non-blocking way.
if entity_name == "services" and host_count > 0 then
  ngx.timer.at(0, warmup_dns, hosts_array, host_count)
end
Kong's internal dns module uses lua-resty-dns-client, a library also open-sourced by Kong. Its distinctive feature is the toip function, which sets up weighted round-robin according to the weights of the IPs returned by DNS (such as SRV record weights) and keeps the DNS query results in memory.
warmup_dns() calls the kong.dns.toip() method:
local function warmup_dns(premature, hosts, count)
  if premature then
    return
  end

  ngx.log(ngx.NOTICE, "warming up DNS entries ...")

  local start = ngx.now()

  for i = 1, count do
    kong.dns.toip(hosts[i])
  end

  local elapsed = math.floor((ngx.now() - start) * 1000)

  ngx.log(ngx.NOTICE, "finished warming up DNS entries",
                      "' into the cache (in ", tostring(elapsed), "ms)")
end
4.2. Recurring tasks
4.2.1. Cluster task
In cluster_events/init.lua, the subscription function starts a timer that polls the cluster events table in the database.
function _M:subscribe(channel, cb, start_polling) if type(channel) ~= "string" then return error("channel must be a string") end if type(cb) ~= "function" then return error("callback must be a function") end if not self.callbacks[channel] then self.callbacks[channel] = { cb } insert(self.channels, channel) else insert(self.callbacks[channel], cb) end if start_polling == nil then start_polling = true end if not self.polling and start_polling and self.use_polling then -- start recurring polling timer local ok, err = timer_at(self.poll_interval, poll_handler, self) if not ok then return nil, "failed to start polling timer: " .. err end self.polling = true end return true end
ngx.timer.every() is not used here because the lock must be checked on every iteration; instead, ngx.timer.at() is called recursively in a loop.
poll_handler = function(premature, self) if premature or not self.polling then -- set self.polling to false to stop a polling loop return end if not get_lock(self) then local ok, err = timer_at(self.poll_interval, poll_handler, self) if not ok then log(CRIT, "failed to start recurring polling timer: ", err) end return end -- single worker local pok, perr, err = pcall(poll, self) if not pok then log(ERR, "poll() threw an error: ", perr) elseif not perr then log(ERR, "failed to poll: ", err) end -- unlock self.shm:delete(POLL_RUNNING_LOCK_KEY) local ok, err = timer_at(self.poll_interval, poll_handler, self) if not ok then log(CRIT, "failed to start recurring polling timer: ", err) end end
The lock is implemented on shared memory and ensures that only one Worker runs the poll at a time.
local function get_lock(self) -- check if a poll is not currently running, to ensure we don't start -- another poll while a worker is still stuck in its own polling (in -- case it is being slow) -- we still add an exptime to this lock in case something goes horribly -- wrong, to ensure other workers can poll new events -- a poll cannot take more than max(poll_interval * 5, 10) -- 10s min local ok, err = self.shm:safe_add(POLL_RUNNING_LOCK_KEY, true, max(self.poll_interval * 5, 10)) if not ok then if err ~= "exists" then log(ERR, "failed to acquire poll_running lock: ", err) end -- else -- log(DEBUG, "failed to acquire poll_running lock: ", -- "a worker still holds the lock") return false end if self.poll_interval > 0.001 then -- check if interval of `poll_interval` has elapsed already, to ensure -- we do not run the poll when a previous poll was quickly executed, but -- another worker got the timer trigger a bit too late. ok, err = self.shm:safe_add(POLL_INTERVAL_LOCK_KEY, true, self.poll_interval - 0.001) if not ok then if err ~= "exists" then log(ERR, "failed to acquire poll_interval lock: ", err) end -- else -- log(DEBUG, "failed to acquire poll_interval lock: ", -- "not enough time elapsed since last poll") self.shm:delete(POLL_RUNNING_LOCK_KEY) return false end end return true end
4.2.2. Database TTL
To implement TTL on top of PostgreSQL, Kong calls the database layer's init_worker() function in db/strategies/postgres/connector.lua during the init_worker stage.
-- Some contents are omitted below and only key parts are shown function _mt:init_worker(strategies) if ngx.worker.id() == 0 then cleanup_statements[i] = concat { " DELETE FROM ", self:escape_identifier(table_name), " WHERE ", column_name, " < CURRENT_TIMESTAMP AT TIME ZONE 'UTC';" } local cleanup_statement = concat(cleanup_statements, "\n") return timer_every(60, function(premature) local ok, err, _, num_queries = self:query(cleanup_statement) if not ok then if num_queries then for i = num_queries + 1, cleanup_statements_count do local statement = cleanup_statements[i] local ok, err = self:query(statement) if not ok then if err then log(WARN, "unable to clean expired rows from table '", sorted_strategies[i], "' on PostgreSQL database (", err, ")") else log(WARN, "unable to clean expired rows from table '", sorted_strategies[i], "' on PostgreSQL database") end end end else log(ERR, "unable to clean expired rows from PostgreSQL database (", err, ")") end end end) end return true end
During database initialization a timer is added whose callback deletes the rows whose TTL has expired.
4.2.3. Update routing index
kong.init_worker() registers scheduled tasks that update the cache regularly.
-- periodically rebuild the routing cache
if kong.db.strategy ~= "off" then
  timer_every(worker_state_update_frequency, function(premature)
    if premature then
      return
    end

    -- Don't wait for the semaphore (timeout = 0) when updating via the
    -- timer.
    -- If the semaphore is locked, that means that the rebuild is
    -- already ongoing.
    local ok, err = rebuild_router(ROUTER_ASYNC_OPTS)
    if not ok then
      log(ERR, "could not rebuild router via timer: ", err)
    end
  end)

  timer_every(worker_state_update_frequency, function(premature)
    if premature then
      return
    end

    local ok, err = rebuild_plugins_iterator(PLUGINS_ITERATOR_ASYNC_OPTS)
    if not ok then
      log(ERR, "could not rebuild plugins iterator via timer: ", err)
    end
  end)
end
The actual call sequence opens a coroutine (guarded by a mutex) to check whether the routes have changed; if so, the route cache is rebuilt.
rebuild_router = function(opts) return rebuild("router", update_router, router_version, opts) end local function rebuild(name, callback, version, opts) local current_version, err = kong.core_cache:get(name .. ":version", TTL_ZERO, utils.uuid) if err then return nil, "failed to retrieve " .. name .. " version: " .. err end if current_version == version then return true end -- Open one cosocket Coroutine call callback return concurrency.with_coroutine_mutex(opts, callback) end update_router = function() -- we might not need to rebuild the router (if we were not -- the first request in this process to enter this code path) -- check again and rebuild only if necessary local version, err = get_router_version() if err then return nil, "failed to retrieve router version: " .. err end if version == router_version then return true end local ok, err = build_router(version) if not ok then return nil, --[[ 'err' fully formatted ]] err end return true end
Finally the build_router() method is called, which was described in section 3.2.1.
5. Event handling
The lua-resty-worker-events library is used for event processing between workers.
Event subscription: events.register(callback, source, event1, event2, ...), where the callback has the signature callback = function(data, event, source, pid).
Event publishing: success, err = events.post(source, event, data, unique)
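A minimal sketch of that publish/subscribe pattern with lua-resty-worker-events (the source and event names here are hypothetical; worker_events.configure() is assumed to have been called already, as Kong does in init_worker):

local worker_events = require "resty.worker.events"

-- subscribe: the callback runs in every worker that polls the shared memory
worker_events.register(function(data, event, source, pid)
  ngx.log(ngx.NOTICE, "got event ", source, ":", event,
                      " from pid ", tostring(pid),
                      " with data ", tostring(data))
end, "my_source", "my_event")

-- publish: the event is stored in shm and picked up by all workers
local ok, err = worker_events.post("my_source", "my_event", "hello")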
5.1. Database events
db/dao/init.lua defines the DAO operation methods, which I briefly described in section 3.1.3.
CRUD operations (actually there is no R) on database entities ultimately call the DAO:post_crud_event() method to broadcast events.
function DAO:post_crud_event(operation, entity, old_entity, options) if options and options.no_broadcast_crud_event then return end if self.events then local entity_without_nulls if entity then entity_without_nulls = remove_nulls(utils.deep_copy(entity, false)) end local old_entity_without_nulls if old_entity then old_entity_without_nulls = remove_nulls(utils.deep_copy(old_entity, false)) end local ok, err = self.events.post_local("dao:crud", operation, { operation = operation, schema = self.schema, entity = entity_without_nulls, old_entity = old_entity_without_nulls, }) if not ok then log(ERR, "[db] failed to propagate CRUD operation: ", err) end end end
An event is published in the dao:crud channel. The operation types are create, update and delete.
register_events() in runloop/handler.lua is called from Kong.init_worker(); it subscribes to the database-related events and adds handler functions.
worker_events.register(function(data) if not data.schema then log(ERR, "[events] missing schema in crud subscriber") return end if not data.entity then log(ERR, "[events] missing entity in crud subscriber") return end -- invalidate this entity anywhere it is cached if it has a -- caching key -- If entity have cache_key Then let it fail -- Basically only entity schema There will be no if the definition is wrong cache_key local cache_key = db[data.schema.name]:cache_key(data.entity) local cache_obj = kong[constants.ENTITY_CACHE_STORE[data.schema.name]] if cache_key then cache_obj:invalidate(cache_key) end -- if we had an update, but the cache key was part of what was updated, -- we need to invalidate the previous entity as well if data.old_entity then local old_cache_key = db[data.schema.name]:cache_key(data.old_entity) if old_cache_key and cache_key ~= old_cache_key then cache_obj:invalidate(old_cache_key) end end if not data.operation then log(ERR, "[events] missing operation in crud subscriber") return end -- public worker events propagation -- obtain schema name local entity_channel = data.schema.table or data.schema.name local entity_operation_channel = fmt("%s:%s", entity_channel, data.operation) -- crud:routes local ok, err = worker_events.post_local("crud", entity_channel, data) if not ok then log(ERR, "[events] could not broadcast crud event: ", err) return end -- crud:routes:create ok, err = worker_events.post_local("crud", entity_operation_channel, data) if not ok then log(ERR, "[events] could not broadcast crud event: ", err) return end end, "dao:crud")
CRUD (no R) event processing flow: the cache:invalidate() method is called; internally it publishes a worker-level event so every worker process deletes the data, and also publishes a cluster event so the deletion is synchronized across the other Kong nodes.
-- After Routes are modified, the router:version cache entry is invalidated,
-- which causes the routing table to be rebuilt (see section 4.2.3)
worker_events.register(function()
  log(DEBUG, "[events] Route updated, invalidating router")
  core_cache:invalidate("router:version")
end, "crud", "routes")

-- ... the same applies to the other entities
6. Plugin loading
6.1. Plugin reading
In the init stage, Kong reads the plugin list from the configuration file (plugins = bundled plus our SkyWalking integration plugin) and uses Lua's require to load the corresponding packages (all plugin packages must live under kong.plugins).
function Plugins:load_plugin_schemas(plugin_set) self.handlers = nil local go_plugins_cnt = 0 local handlers = {} local errs -- load installed plugins for plugin in pairs(plugin_set) do local handler, err = load_plugin(self, plugin) if handler then if type(handler.is) == "function" and handler:is(BasePlugin) then -- Backwards-compatibility for 0.x and 1.x plugins inheriting from the -- BasePlugin class. -- TODO: deprecate & remove handler = handler() end if handler._go then go_plugins_cnt = go_plugins_cnt + 1 end handlers[plugin] = handler else errs = errs or {} table.insert(errs, "on plugin '" .. plugin .. "': " .. tostring(err)) end end if errs then return nil, "error loading plugin schemas: " .. table.concat(errs, "; ") end reports.add_immutable_value("go_plugins_cnt", go_plugins_cnt) self.handlers = handlers return true end
The handlers of all plugins are stored in kong.db.plugins.handlers, in the format { plugin_name: handler }.
All plugins are kept in the Worker process's memory and updated periodically.
I dumped the structure of the loaded plugins table as YAML to make it easier to understand:
map:
  plugin_name: true
combos:
  plugin_name:
    # both: {}
    both:
      route_id: service_id
    # routes: {}
    routes:
      route_id: true
    # services: {}
    services:
      service_id: true
    0: true   # global plugin
    1: true   # route plugin
    2: true   # service plugin
    3: true   # route + service plugin
    4: true   # consumer plugin
    5: true   # route + consumer plugin
    6: true   # route + service + consumer plugin
loaded:
  plugin_name:
    handler:
      phase_name: func()
6.2. Plugin invocation
A plugin is not bound directly to a route. A plugin has its own life cycle, which essentially mirrors Kong's: at each phase of Kong's life cycle, the plugin's corresponding method is called.
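As a rough sketch of what such a handler looks like (a minimal custom-plugin skeleton with made-up names and priority; Kong 2.x accepts plain-table handlers like this, without the older BasePlugin parent):

-- kong/plugins/my-plugin/handler.lua (hypothetical plugin)
local MyPluginHandler = {
  PRIORITY = 1000,   -- ordering relative to other plugins
  VERSION  = "0.1.0",
}

-- called in the rewrite_by_lua phase
function MyPluginHandler:rewrite(conf)
end

-- called in the access_by_lua phase, where most plugin logic lives
function MyPluginHandler:access(conf)
  kong.log.notice("request host: ", kong.request.get_host())
end

-- called in the log_by_lua phase
function MyPluginHandler:log(conf)
end

return MyPluginHandler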
Only at invocation time does the plugin iterator check whether a plugin is associated with the current Route, Service, or Consumer. If it is, the plugin's configuration (the plugin Entity) is read from the database and cached via kong.core_cache.
local function load_configuration_through_combos(ctx, combos, plugin) local plugin_configuration local name = plugin.name local route = ctx.route local service = ctx.service local consumer = ctx.authenticated_consumer if route and plugin.no_route then route = nil end if service and plugin.no_service then service = nil end if consumer and plugin.no_consumer then consumer = nil end local route_id = route and route.id or nil local service_id = service and service.id or nil local consumer_id = consumer and consumer.id or nil if kong.db.strategy == "off" then ... else if route_id and service_id and consumer_id and combos[COMBO_RSC] and combos.both[route_id] == service_id then plugin_configuration = load_configuration(ctx, name, route_id, service_id, consumer_id) if plugin_configuration then return plugin_configuration end end if consumer_id and combos[COMBO_C] then plugin_configuration = load_configuration(ctx, name, nil, nil, consumer_id) if plugin_configuration then return plugin_configuration end end if route_id and combos[COMBO_R] and combos.routes[route_id] then plugin_configuration = load_configuration(ctx, name, route_id) if plugin_configuration then return plugin_configuration end end ... if combos[COMBO_GLOBAL] then return load_configuration(ctx, name) end end end
Here Kong checks whether the current Service, Route, and Consumer combination matches a plugin; if it does, the corresponding configuration is loaded:
--- Load the configuration for a plugin entry. -- Given a Route, Service, Consumer and a plugin name, retrieve the plugin's -- configuration if it exists. Results are cached in ngx.dict -- @param[type=string] name Name of the plugin being tested for configuration. -- @param[type=string] route_id Id of the route being proxied. -- @param[type=string] service_id Id of the service being proxied. -- @param[type=string] consumer_id Id of the donsumer making the request (if any). -- @treturn table Plugin configuration, if retrieved. local function load_configuration(ctx, name, route_id, service_id, consumer_id) local ws_id = workspaces.get_workspace_id() or kong.default_workspace local key = kong.db.plugins:cache_key(name, route_id, service_id, consumer_id, nil, ws_id) local plugin, err = kong.core_cache:get(key, nil, load_plugin_from_db, key) if err then ctx.delay_response = false ngx.log(ngx.ERR, tostring(err)) return ngx.exit(ngx.ERROR) end if not plugin or not plugin.enabled then return end local cfg = plugin.config or {} if not cfg.__key__ then cfg.__key__ = key cfg.__seq__ = next_seq next_seq = next_seq + 1 end cfg.route_id = plugin.route and plugin.route.id cfg.service_id = plugin.service and plugin.service.id cfg.consumer_id = plugin.consumer and plugin.consumer.id return cfg end
There are two ways to call plug-ins:
- Synchronous call
- Asynchronous call
In every phase except access_by_lua, plugins are called synchronously:
local function execute_plugins_iterator(plugins_iterator, phase, ctx)
  local old_ws = ctx and ctx.workspace
  for plugin, configuration in plugins_iterator:iterate(phase, ctx) do
    if ctx then
      if plugin.handler._go then
        ctx.ran_go_plugin = true
      end

      kong_global.set_named_ctx(kong, "plugin", plugin.handler)
    end

    kong_global.set_namespaced_log(kong, plugin.name)
    -- here is the synchronous call
    plugin.handler[phase](plugin.handler, configuration)
    kong_global.reset_log(kong)

    if ctx then
      ctx.workspace = old_ws
    end
  end
end
In the access_by_lua phase, a coroutine is used to call the plugin asynchronously:
for plugin, plugin_conf in plugins_iterator:iterate("access", ctx) do
  if plugin.handler._go then
    ctx.ran_go_plugin = true
  end

  if not ctx.delayed_response then
    kong_global.set_named_ctx(kong, "plugin", plugin.handler)
    kong_global.set_namespaced_log(kong, plugin.name)

    -- open a Lua coroutine to call the plugin function asynchronously
    local err = coroutine.wrap(plugin.handler.access)(plugin.handler, plugin_conf)
    if err then
      kong.log.err(err)
      ctx.delayed_response = {
        status_code = 500,
        content = { message = "An unexpected error occurred" },
      }
    end

    kong_global.reset_log(kong)
  end

  ctx.workspace = old_ws
end
7. Caching mechanism
Based on the source code analysis above, this section briefly reviews Kong's caching mechanism.
Kong has these operations for caching:
- Initialize the cache zones
- Preload database content into the cache
- Lazily add data to the cache when it is first accessed
- Timers periodically rebuild/update the cache
- Database create/update/delete operations invalidate the cache
- Synchronize the cache across the cluster and between workers
Cache load content:
Under the default configuration, Kong loads all routing tables and Routes into the memory of each Worker, and all Services and Plugins into the memory and shared memory of each Worker. Upstream and Targets are timely obtained from the database according to the analysis of the load balancer and loaded into memory and shared memory.
These entities are loaded into core_cache, an L1+L2 two-level cache created with the mlcache library.
Consumers are loaded into a separate mlcache instance with a different name.
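For reference, this is roughly how an mlcache two-level cache (L1 per-worker LRU + L2 shared memory) is created and queried with a loader callback; the shared dict name and load_service_from_db() below are placeholders, not Kong's actual names:

local mlcache = require "resty.mlcache"

-- requires `lua_shared_dict demo_cache 10m;` in the nginx config
local cache, err = mlcache.new("demo_cache", "demo_cache", {
  lru_size = 1000,  -- L1: per-worker Lua-land LRU cache
  ttl      = 3600,  -- positive hits live for 1 hour
  neg_ttl  = 30,    -- misses are cached for 30 seconds
})
if not cache then
  error("failed to create mlcache: " .. err)
end

-- hypothetical loader, only called on an L1+L2 miss
local function load_service_from_db(service_id)
  return query_database("services", service_id)
end

local service, err = cache:get("services:" .. service_id, nil,
                               load_service_from_db, service_id)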
8. Request lifecycle
This section walks through how a request is processed by Kong.
8.1. ssl_certificate_by_lua stage
local function execute() local sn, err = server_name() if err then log(ERR, "could not retrieve SNI: ", err) return ngx.exit(ngx.ERROR) end local cert_and_key, err = find_certificate(sn) if err then log(ERR, err) return ngx.exit(ngx.ERROR) end if cert_and_key == default_cert_and_key then -- use (already set) fallback certificate return end -- set the certificate for this connection local ok, err = clear_certs() if not ok then log(ERR, "could not clear existing (default) certificates: ", err) return ngx.exit(ngx.ERROR) end ok, err = set_cert(cert_and_key.cert) if not ok then log(ERR, "could not set configured certificate: ", err) return ngx.exit(ngx.ERROR) end ok, err = set_priv_key(cert_and_key.key) if not ok then log(ERR, "could not set configured private key: ", err) return ngx.exit(ngx.ERROR) end end
Based on the Server Name (SNI), the corresponding SSL certificate and private key are found and set on the Nginx connection.
8.2. rewrite_by_lua stage
local ctx = ngx.ctx
if not ctx.KONG_PROCESSING_START then
  ctx.KONG_PROCESSING_START = ngx.req.start_time() * 1000
end

if not ctx.KONG_REWRITE_START then
  ctx.KONG_REWRITE_START = get_now_ms()
end

kong_global.set_phase(kong, PHASES.rewrite)
kong_resty_ctx.stash_ref()

local is_https = var.https == "on"
if not is_https then
  log_init_worker_errors(ctx)
end

runloop.rewrite.before(ctx)

...

rewrite = {
  before = function(ctx)
    ctx.host_port = HOST_PORTS[var.server_port] or var.server_port

    -- special handling for proxy-authorization and te headers in case
    -- the plugin(s) want to specify them (store the original)
    ctx.http_proxy_authorization = var.http_proxy_authorization
    ctx.http_te                  = var.http_te
  end,
},
Initialize the per-request Kong context (ngx.ctx) and record request information in it.
8.3. access_by_lua stage
8.3.1. Route matching
runloop.access.before calls the router instance to perform route matching. First, get_updated_router() checks whether the routes have been updated; if not, the current router instance is returned.
-- route the request
local router = get_updated_router()

-- call Router.exec() to find a matching route
local match_t = router.exec()
if not match_t then
  return kong.response.exit(404, { message = "no Route matched with those values" })
end
Router.exec() eventually calls the router's find_route() method, which takes the request's header fields, generates a route cache key, and looks up the matching entry.
local function find_route(req_method, req_uri, req_host, req_scheme, src_ip, src_port, dst_ip, dst_port, sni, req_headers) req_method = req_method or "" req_uri = req_uri or "" req_host = req_host or "" req_headers = req_headers or EMPTY_T ctx.req_method = req_method ctx.req_uri = req_uri ctx.req_host = req_host ctx.req_headers = req_headers ctx.src_ip = src_ip or "" ctx.src_port = src_port or "" ctx.dst_ip = dst_ip or "" ctx.dst_port = dst_port or "" ctx.sni = sni or "" local cache_key = req_method .. "|" .. req_uri .. "|" .. req_host .. "|" .. ctx.src_ip .. "|" .. ctx.src_port .. "|" .. ctx.dst_ip .. "|" .. ctx.dst_port .. "|" .. ctx.sni do local match_t = cache:get(cache_key) if match_t and hits.header_name == nil then return match_t end end
If a matching route is already in the LRU cache, it is returned directly.
Otherwise, route matching continues; the match result is built and stored in the cache:
... local match_t = { route = matched_route.route, service = matched_route.service, headers = matched_route.headers, upstream_url_t = upstream_url_t, upstream_scheme = upstream_url_t.scheme, upstream_uri = upstream_uri, upstream_host = upstream_host, prefix = request_prefix, matches = { uri_captures = matches.uri_captures, uri = matches.uri, host = matches.host, headers = matches.headers, method = matches.method, src_ip = matches.src_ip, src_port = matches.src_port, dst_ip = matches.dst_ip, dst_port = matches.dst_port, sni = matches.sni, } } if band(matched_route.match_rules, MATCH_RULES.HEADER) == 0 then cache:set(cache_key, match_t) end ...
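The per-worker match cache can be pictured with lua-resty-lrucache; the cache size, key format, and match_route() function below are illustrative, not Kong's exact implementation:

local lrucache = require "resty.lrucache"

-- per-worker LRU cache holding up to 500 match results (size is illustrative)
local match_cache = assert(lrucache.new(500))

local function get_match(method, uri, host)
  local key = method .. "|" .. uri .. "|" .. host
  local match_t = match_cache:get(key)
  if match_t then
    return match_t  -- cache hit: skip the matching logic entirely
  end

  match_t = match_route(method, uri, host)  -- hypothetical full matching pass
  if match_t then
    match_cache:set(key, match_t)
  end

  return match_t
end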
After a successful match, the associated Route and Service are written to ngx.ctx and shared with the subsequent phases.
8.3.2. Request scheduling
runloop.access.after resolves the backend IP, port, scheme, and other parameters based on the Route, the Service, and related conditions.
-- looks up a balancer for the target. -- @param target the table with the target details -- @param no_create (optional) if true, do not attempt to create -- (for thorough testing purposes) -- @return balancer if found, `false` if not found, or nil+error on error local function get_balancer(target, no_create) -- NOTE: only called upon first lookup, so `cache_only` limitations -- do not apply here local hostname = target.host -- first go and find the upstream object, from cache or the db local upstream, err = get_upstream_by_name(hostname) if upstream == false then return false -- no upstream by this name end if err then return nil, err -- there was an error end local balancer = balancers[upstream.id] if not balancer then if no_create then return nil, "balancer not found" else log(ERR, "balancer not found for ", upstream.name, ", will create it") return create_balancer(upstream), upstream end end return balancer, upstream end
Based on the Service's Host, get_balancer() returns the load balancer used to pick the Target for the final request.
local ip, port, hostname, handle if balancer then -- have to invoke the ring-balancer local hstate = run_hook("balancer:get_peer:pre", target.host) ip, port, hostname, handle = balancer:getPeer(dns_cache_only, target.balancer_handle, hash_value) run_hook("balancer:get_peer:post", hstate) if not ip and (port == "No peers are available" or port == "Balancer is unhealthy") then return nil, "failure to get a peer from the ring-balancer", 503 end hostname = hostname or ip target.hash_value = hash_value target.balancer_handle = handle else -- have to do a regular DNS lookup local try_list local hstate = run_hook("balancer:to_ip:pre", target.host) ip, port, try_list = toip(target.host, target.port, dns_cache_only) run_hook("balancer:to_ip:post", hstate) hostname = target.host if not ip then log(ERR, "DNS resolution failed: ", port, ". Tried: ", tostring(try_list)) if port == "dns server error: 3 name error" or port == "dns client error: 101 empty record received" then return nil, "name resolution failed", 503 end end end
The load balancer's policy is invoked to obtain the Target's IP address, or a plain DNS query is used to resolve it. DNS pre-caching has already been done in 2.1.1, so this step can read the result from the cache.
If the Service Host is already an IP address, the load-balancing policy is skipped:
-- if the host is already an IP address, return it directly
if target.type ~= "name" then
  -- it's an ip address (v4 or v6), so nothing we can do...
  target.ip = target.host
  target.port = target.port or 80 -- TODO: remove this fallback value
  target.hostname = target.host
  return true
end
8.4. balancer_by_lua stage
ngx.balancer.set_more_tries() sets the number of retries on error, and ngx.balancer.get_last_failure() retrieves the details of the previous failed attempt; during error handling, a passive health check is reported for the upstream node.
if balancer_data.try_count > 1 then -- only call balancer on retry, first one is done in `runloop.access.after` -- which runs in the ACCESS context and hence has less limitations than -- this BALANCER context where the retries are executed -- record failure data local previous_try = tries[balancer_data.try_count - 1] previous_try.state, previous_try.code = get_last_failure() -- Report HTTP status for health checks local balancer = balancer_data.balancer if balancer then if previous_try.state == "failed" then if previous_try.code == 504 then balancer.report_timeout(balancer_data.balancer_handle) else balancer.report_tcp_failure(balancer_data.balancer_handle) end else balancer.report_http_status(balancer_data.balancer_handle, previous_try.code) end end local ok, err, errcode = balancer_execute(balancer_data) if not ok then ngx_log(ngx_ERR, "failed to retry the dns/balancer resolver for ", tostring(balancer_data.host), "' with: ", tostring(err)) ctx.KONG_BALANCER_ENDED_AT = get_now_ms() ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_BALANCER_START ctx.KONG_PROXY_LATENCY = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_PROCESSING_START return ngx.exit(errcode) end else -- first try, so set the max number of retries local retries = balancer_data.retries if retries > 0 then set_more_tries(retries) end end
The request is sent to the finally resolved backend service; ngx.balancer.set_current_peer() sets the peer address to connect to.
-- set the targets as resolved
ngx_log(ngx_DEBUG, "setting address (try ", balancer_data.try_count, "): ",
        balancer_data.ip, ":", balancer_data.port)

-- the address of the final upstream peer
local ok, err = set_current_peer(balancer_data.ip, balancer_data.port, pool_opts)
if not ok then
  ngx_log(ngx_ERR, "failed to set the current peer (address: ",
          tostring(balancer_data.ip), " port: ", tostring(balancer_data.port),
          "): ", tostring(err))

  ctx.KONG_BALANCER_ENDED_AT = get_now_ms()
  ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_BALANCER_START
  ctx.KONG_PROXY_LATENCY = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_PROCESSING_START

  return ngx.exit(500)
end
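Outside of Kong, the same ngx.balancer primitives can be exercised with a very small balancer_by_lua_block; the upstream address and retry count below are placeholders:

-- balancer_by_lua_block sketch (illustrative, not Kong's code)
local balancer = require "ngx.balancer"

-- inspect the previous attempt, if any (nil on the first try)
local state, code = balancer.get_last_failure()
if state then
  ngx.log(ngx.WARN, "previous attempt failed: ", state, ", status: ", code)
end

-- allow up to 2 retries; only has an effect on the first attempt
local ok, err = balancer.set_more_tries(2)
if not ok then
  ngx.log(ngx.ERR, "set_more_tries failed: ", err)
end

-- point the connection at the chosen peer
ok, err = balancer.set_current_peer("127.0.0.1", 8080)
if not ok then
  ngx.log(ngx.ERR, "failed to set the current peer: ", err)
  return ngx.exit(500)
end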
8.5. header_filter_by_lua stage
This phase runs after Kong receives the response headers from the upstream service.
local upstream_status_header = constants.HEADERS.UPSTREAM_STATUS
if singletons.configuration.enabled_headers[upstream_status_header] then
  header[upstream_status_header] = tonumber(sub(var.upstream_status or "", -3))
  if not header[upstream_status_header] then
    log(ERR, "failed to set ", upstream_status_header, " header")
  end
end

local hash_cookie = ctx.balancer_data.hash_cookie
if not hash_cookie then
  return
end

local cookie = ck:new()
local ok, err = cookie:set(hash_cookie)
runloop.header_filter.before adds the upstream node status to the response headers and decides whether to set the cookie used by the load balancer's consistency (hash) policy.
8.6. body_filter_by_lua stage
This phase runs while receiving the response body from the upstream service. The body is split into chunks according to its size, so the phase may be executed multiple times.
In the OpenResty life cycle, body_filter_by_lua reads each chunk via ngx.arg[1] and uses ngx.arg[2] as the EOF flag.
-- the whole body has been buffered
if kong.ctx.core.response_body then
  arg[1] = kong.ctx.core.response_body
  arg[2] = true
end

if not arg[2] then
  return
end

-- the full body has been received; record timings
ctx.KONG_BODY_FILTER_ENDED_AT = get_now_ms()
ctx.KONG_BODY_FILTER_TIME = ctx.KONG_BODY_FILTER_ENDED_AT - ctx.KONG_BODY_FILTER_START

if ctx.KONG_PROXIED then
  -- time spent receiving the response (header_filter + body_filter)
  -- we could use $upstream_response_time but we need to distinguish the waiting time
  -- from the receiving time in our logging plugins (especially ALF serializer).
  ctx.KONG_RECEIVE_TIME = ctx.KONG_BODY_FILTER_ENDED_AT - (ctx.KONG_HEADER_FILTER_START or
                          ctx.KONG_BALANCER_ENDED_AT or
                          ctx.KONG_BALANCER_START or
                          ctx.KONG_ACCESS_ENDED_AT)
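The generic chunk-buffering pattern in body_filter_by_lua looks roughly like this (a simplified sketch, not Kong's implementation):

-- body_filter_by_lua_block sketch: buffer chunks until EOF
local chunk, eof = ngx.arg[1], ngx.arg[2]

local ctx = ngx.ctx
ctx.buffered = (ctx.buffered or "") .. (chunk or "")

if eof then
  -- the whole body is available now; emit it (possibly after rewriting it)
  ngx.arg[1] = ctx.buffered
else
  -- suppress intermediate output until the full body has been buffered
  ngx.arg[1] = nil
end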
8.7. log_by_lua stage
The Lua garbage collector interface is used to measure Kong's Lua VM memory usage:
local update_lua_mem
do
  local pid = ngx.worker.pid
  local kong_shm = ngx.shared.kong

  local LUA_MEM_SAMPLE_RATE = 10 -- seconds
  local last = ngx.time()

  local collectgarbage = collectgarbage

  update_lua_mem = function(force)
    local time = ngx.time()
    if force or time - last >= LUA_MEM_SAMPLE_RATE then
      local count = collectgarbage("count")

      local ok, err = kong_shm:safe_set("kong:mem:" .. pid(), count)
      if not ok then
        log(ERR, "could not record Lua VM allocated memory: ", err)
      end

      last = ngx.time()
    end
  end
end
The response status is reported back to the load balancer so it can update the health state of the upstream node:
-- If response was produced by an upstream (ie, not by a Kong plugin)
-- Report HTTP status for health checks
local balancer_data = ctx.balancer_data
if balancer_data and balancer_data.balancer_handle then
  local status = ngx.status
  if status == 504 then
    balancer_data.balancer.report_timeout(balancer_data.balancer_handle)
  else
    balancer_data.balancer.report_http_status(
      balancer_data.balancer_handle, status)
  end
  -- release the handle, so the balancer can update its statistics
  balancer_data.balancer_handle:release()
end
9. Admin API
The Kong Admin API entry point:
function Kong.admin_content(options)
  local ctx = ngx.ctx
  if not ctx.workspace then
    ctx.workspace = kong.default_workspace
  end

  return serve_content("kong.api", options)
end
local function serve_content(module, options)
  -- CORS-related headers
  header["Access-Control-Allow-Origin"] = options.allow_origin or "*"

  -- start Lapis
  lapis.serve(module)
end
About Lapis:
Lapis is a framework for building web applications using MoonScript or Lua that runs inside of a customized version of Nginx called OpenResty.
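For context, a minimal Lapis application looks roughly like this (an illustrative standalone example, unrelated to Kong's own route modules):

-- app.lua: a minimal Lapis application
local lapis = require("lapis")
local app = lapis.Application()

app:get("/", function(self)
  return "hello from lapis"
end)

return app

In Nginx it would be served with content_by_lua_block { require("lapis").serve("app") }, which mirrors the lapis.serve(module) call in serve_content above.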
-- api/init.lua
-- load the fixed (core) routes
for _, v in ipairs({"kong", "health", "cache", "config", "clustering"}) do
  local routes = require("kong.api.routes." .. v)
  api_helpers.attach_routes(app, routes)
end

local routes = {}

-- DAO routes
for _, dao in pairs(singletons.db.daos) do
  if dao.schema.generate_admin_api ~= false and not dao.schema.legacy then
    routes = Endpoints.new(dao.schema, routes)
  end
end
Route construction at initialization:
# api/endpoints.lua -- Create base route -- Generates admin api endpoint functions -- -- Examples: -- -- /routes -- /routes/:routes -- /routes/:routes/service -- /services/:services/routes -- -- and -- -- /services -- /services/:services -- /services/:services/routes/:routes local function generate_endpoints(schema, endpoints) -- list route -- e.g. /routes generate_collection_endpoints(endpoints, schema) -- Monomer routing -- e.g. /routes/:routes generate_entity_endpoints(endpoints, schema) -- Determine whether there is an associated object -- for example route relation services for foreign_field_name, foreign_field in schema:each_field() do -- Foreign key if foreign_field.type == "foreign" and not foreign_field.schema.legacy then -- e.g. /routes/:routes/service generate_entity_endpoints(endpoints, schema, foreign_field.schema, foreign_field_name, true) -- e.g. /services/:services/routes generate_collection_endpoints(endpoints, schema, foreign_field.schema, foreign_field_name) -- e.g. /services/:services/routes/:routes generate_entity_endpoints(endpoints, foreign_field.schema, schema, foreign_field_name) end end return endpoints end -- Generates admin api collection endpoint functions -- -- Examples: -- -- /routes -- /services/:services/routes -- -- and -- -- /services local function generate_collection_endpoints(endpoints, schema, foreign_schema, foreign_field_name) local collection_path -- Foreign key Association if foreign_schema then collection_path = fmt("/%s/:%s/%s", foreign_schema.admin_api_name or foreign_schema.name, foreign_schema.name, schema.admin_api_nested_name or schema.admin_api_name or schema.name) else -- No foreign key Association collection_path = fmt("/%s", schema.admin_api_name or schema.name) end endpoints[collection_path] = { schema = schema, methods = { --OPTIONS = method_not_allowed, --HEAD = method_not_allowed, GET = get_collection_endpoint(schema, foreign_schema, foreign_field_name), POST = post_collection_endpoint(schema, foreign_schema, foreign_field_name), --PUT = method_not_allowed, --PATCH = method_not_allowed, --DELETE = method_not_allowed, }, } end
Focusing only on the POST request handling:
local function post_collection_endpoint(schema, foreign_schema, foreign_field_name, method)
  return function(self, db, helpers, post_process)
    if foreign_schema then
      local foreign_entity, _, err_t = select_entity(self, db, foreign_schema)
      if err_t then
        return handle_error(err_t)
      end

      if not foreign_entity then
        return not_found()
      end

      self.args.post[foreign_field_name] = foreign_schema:extract_pk_values(foreign_entity)
    end

    -- handle the request: validate parameters and insert the entity
    local entity, _, err_t = insert_entity(self, db, schema, method)
    if err_t then
      return handle_error(err_t)
    end

    -- post-process callback
    if post_process then
      entity, _, err_t = post_process(entity)
      if err_t then
        return handle_error(err_t)
      end
    end

    return created(entity)
  end
end
The Admin API is only a thin API layer; it is not responsible for the event handling and data synchronization behind it, which are covered in the event-handling section of this article.
10. Plug in development
This section briefly introduces some small tricks that are useful in plugin development.
10.1. Multi-level Schema nesting
It looks ugly, but this is what multi-level Schema nesting looks like:
local schema = { name = plugin_name, fields = { { consumer = typedefs.no_consumer }, { protocols = typedefs.protocols_http }, { config = { type = "record", fields = { { rules = { type = "array", elements = { type = "record", fields = { { match = { type = "array", elements = { type = "record", fields = { { vars = { type = "array", elements = { type = "array", elements = { type = "string" } } } } } } } } } } } } }, } } } }
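For clarity, a configuration that the nested schema above would accept looks roughly like this (the concrete values are made up):

-- plugin `config` matching the nested schema: rules -> match -> vars
{
  rules = {
    {
      match = {
        {
          vars = {
            { "uri",         "==", "/internal" },
            { "http_x_role", "~~", "admin.*"   },
          },
        },
      },
    },
  },
}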
10.2. Custom Schema validator
local expr = require("resty.expr.v1")

local schema_validator = function(conf)
  if conf.rules then
    for _, rule in ipairs(conf.rules) do
      if rule.match and type(rule.match) == "table" then
        for _, m in pairs(rule.match) do
          local ok, err = expr.new(m.vars)
          if not ok then
            return false, "failed to validate the 'vars' expression: " .. err
          end
        end
      end
    end
  end

  return true
end
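One way to wire this into the plugin schema is to attach it to the config field; the snippet below assumes Kong's custom_validator field attribute and is only a sketch:

local schema = {
  name = plugin_name,
  fields = {
    { consumer  = typedefs.no_consumer },
    { protocols = typedefs.protocols_http },
    { config = {
        type             = "record",
        custom_validator = schema_validator,  -- runs after the built-in field checks
        fields = {
          -- ... the nested `rules` definition from the previous section ...
        },
    } },
  },
}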
10.3. Printing a table in the log
kong.log.inspect.on()
kong.log.debug("Lua table: ", t)
10.4. Custom log output
Available in versions 2.3.0 and above.
local entry = {
  entries = ctx.log_entries,
  id      = self.transaction_id,
  action  = action_name,
}

kong.log.set_serialize_value("waf", entry)