Kong source code analysis

This article was originally written for internal use at my company in September 2020; I have since cleaned it up and added some notes for public release.

When I first joined the company's Infrastructure team, the first (somewhat bewildering) task I received was to learn Lua and OpenResty. I was handed the PDFs of two books and asked to understand Kong as quickly as possible and become able to develop on top of it.

At that time I had had little contact with the open source community; my skills stopped at git clone. It took me about two weeks: after learning Lua's basic grammar I began reading the Kong source code, found several entry points, and wrote up these source code analysis notes. It was probably this work that earned my team leader's approval; afterwards I was responsible for developing the company's API gateway and rolling it out.

This article describes in detail Kong's startup process, plugin mechanism, caching mechanism, and request life cycle. What is still missing is the proxy-forwarding side: load balancing, health checks, and service discovery. I was still an ignorant newcomer back then, so I'll forgive myself for that.

Following this article, I also wrote a source code analysis of the Kong Ingress Controller, and later one of APISIX.

1. General

This article targets Kong version 2.1 (stable).

The commits I read and annotated (in Chinese) can be found here:
https://github.com/mayocream/kong/commits?author=mayocream

Execution phase of Kong (OpenResty):

Kong's plugin mechanism is likewise built on OpenResty's request life cycle, with a thin layer of encapsulation on top.

Kong's database association relationship:

Although Kong bills itself as a Cloud Native project and appears on the CNCF landscape, it relies on a traditional relational database, PostgreSQL (including a number of stored functions), which is a much weaker foundation than the etcd store behind APISIX. etcd can keep a long-lived HTTP connection and Watch for data changes, whereas Kong can only poll the database periodically to refresh its state; making the database highly available is also far more complex than running an etcd cluster.

2. Configuration file

During startup, Kong renders the Lua template files under the kong/templates directory, injects environment variables and the overrides from kong.conf, and generates the Nginx startup configuration file nginx.conf.

The structure is as follows:

pid pids/nginx.pid;
error_log logs/error.log notice;

# injected nginx_main_* directives

env SKYWALKING_URL;

events {
    # injected nginx_events_* directives
    multi_accept on;
    worker_connections 16384;
}

http {
    lua_package_path       './?.lua;./?/init.lua;;;;';
    lua_package_cpath      ';;;';

    lua_shared_dict kong                        5m;
    lua_shared_dict kong_locks                  8m;
	...

    # injected nginx_http_* directives
    client_body_buffer_size 8k;

    init_by_lua_block {
        Kong = require 'kong'
        Kong.init()
    }

    init_worker_by_lua_block {
        Kong.init_worker()
    }

    upstream kong_upstream {
        server 0.0.0.1;

        # injected nginx_upstream_* directives
        
        balancer_by_lua_block {
            Kong.balancer()
        }
    }

    # Kong Proxy
    server {
        server_name kong;
        ...
    }

    # Kong Admin API
    server {
        server_name kong_admin;
        ...
    }
}

Kong defines configuration properties of the form nginx_main_*, nginx_events_*, nginx_http_*, etc. (they can also be set as environment variables with the KONG_ prefix). During configuration parsing their values are injected into the corresponding blocks of nginx.conf, which avoids editing the template file directly.

For example:

# inject an `env` directive into the main block
$ export KONG_NGINX_MAIN_ENV="SKYWALKING_URL"
# create an extra lua_shared_dict in the http block
$ export KONG_NGINX_HTTP_LUA_SHARED_DICT="tracing_buffer 128m"

Kong's official configuration documentation already explains the meaning of each parameter in great detail.

One thing worth adding: we usually need to define several shared dicts, and the configuration then has to be written in this rather ugly form:

nginx_http_lua_shared_dict = cache_buffer_one 128m; lua_shared_dict cache_buffer_two 128m

3. Initialization

3.1. Database initialization

Database-related initialization happens in the Kong.init() method:

  -- Database connection related
  local db = assert(DB.new(config))
  assert(db:init_connector())
  kong.db = db

DB.new() in turn calls Schema.new(), Entity.new(), and DAO.new(); each is described below.

3.1.1. Schema

Kong's schema definitions live under db/schema/entities; take routes.lua as an example:

local typedefs = require "kong.db.schema.typedefs"


return {
  name         = "routes",
  primary_key  = { "id" },
  endpoint_key = "name",
  workspaceable = true,
  subschema_key = "protocols",

  fields = {
    { id             = typedefs.uuid, },
    { created_at     = typedefs.auto_timestamp_s },
    { updated_at     = typedefs.auto_timestamp_s },
    { name           = typedefs.name },
    { protocols      = { type     = "set",
                         len_min  = 1,
                         required = true,
                         elements = typedefs.protocol,
                         mutually_exclusive_subsets = {
                           { "http", "https" },
                           { "tcp", "tls" },
                           { "grpc", "grpcs" },
                         },
                         default = { "http", "https" }, -- TODO: different default depending on service's scheme
                       }, },
    { methods        = typedefs.methods },
    { hosts          = typedefs.hosts },
    { paths          = typedefs.paths },
    { headers        = typedefs.headers },
    { https_redirect_status_code = { type = "integer",
                                     one_of = { 426, 301, 302, 307, 308 },
                                     default = 426, required = true,
                                   }, },
    { regex_priority = { type = "integer", default = 0 }, },
    { strip_path     = { type = "boolean", default = true }, },
    { path_handling  = { type = "string", default = "v0", one_of = { "v0", "v1" }, }, },
    { preserve_host  = { type = "boolean", default = false }, },
    { snis = { type = "set",
               elements = typedefs.sni }, },
    { sources = typedefs.sources },
    { destinations = typedefs.destinations },
    { tags             = typedefs.tags },
    { service = { type = "foreign", reference = "services" }, },
  },

  entity_checks = {
    { conditional = { if_field = "protocols",
                      if_match = { elements = { type = "string", not_one_of = { "grpcs", "https", "tls" }}},
                      then_field = "snis",
                      then_match = { len_eq = 0 },
                      then_err = "'snis' can only be set when 'protocols' is 'grpcs', 'https' or 'tls'",
                    }},
                  }
}

primary_key is the primary key in the database, and it also serves as the default cache_key when no cache_key is defined.

For fields with type = "foreign", the referenced entity's schema is loaded along with this schema.

Unlike other entities, plugins define their own cache_key:

  name = "plugins",
  primary_key = { "id" },
  cache_key = { "name", "route", "service", "consumer" },

Cache-related code obtains it by calling dao:cache_key():

    local cache_key = dao:cache_key(entity)
    local ok, err = cache:safe_set(cache_key, entity)

The method that generates the cache_key returns a string used as the cache key:

function DAO:cache_key(key, arg2, arg3, arg4, arg5, ws_id)

  if self.schema.workspaceable then
    ws_id = ws_id or workspaces.get_workspace_id()
  end

  -- Fast path: passing the cache_key/primary_key entries in
  -- order as arguments, this produces the same result as
  -- the generic code below, but building the cache key
  -- becomes a single string.format operation
  if type(key) == "string" then
    return fmt("%s:%s:%s:%s:%s:%s:%s", self.schema.name,
               key == nil and "" or key,
               arg2 == nil and "" or arg2,
               arg3 == nil and "" or arg3,
               arg4 == nil and "" or arg4,
               arg5 == nil and "" or arg5,
               ws_id == nil and "" or ws_id)
  end

  -- Generic path: build the cache key from the fields
  -- listed in cache_key or primary_key

  if type(key) ~= "table" then
    error("key must be a string or an entity table", 2)
  end

  if key.ws_id then
    ws_id = key.ws_id
  end

  local values = new_tab(7, 0)
  values[1] = self.schema.name
  local source = self.schema.cache_key or self.schema.primary_key

  local i = 2
  for _, name in ipairs(source) do
    local field = self.schema.fields[name]
    local value = key[name]
    if value == null or value == nil then
      value = ""
    elseif field.type == "foreign" then
      -- FIXME extract foreign key, do not assume `id`
      value = value.id
    end
    values[i] = tostring(value)
    i = i + 1
  end
  for n = i, 6 do
    values[n] = ""
  end

  values[7] = ws_id or ""

  return concat(values, ":")
end
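
Based on the format above, the generated keys look roughly like this (an illustrative sketch; the UUIDs and workspace id here are made up):

-- fast path: the primary key is passed as a string
local k1 = kong.db.services:cache_key("2c47...")
-- k1 == "services:2c47...:::::<workspace-uuid>"

-- generic path: plugins declare cache_key = { "name", "route", "service", "consumer" }
local k2 = kong.db.plugins:cache_key(plugin_row)  -- plugin_row: a previously fetched plugin entity
-- e.g. "plugins:rate-limiting:<route-uuid>:<service-uuid>:<consumer-uuid>::<workspace-uuid>"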

The basic schema operations are defined in schema/init.lua:

-- each_field() iterates over the schema's fields;
-- it is the most frequently used schema-related operation
function Schema:each_field(values)
  local i = 1

  local subschema
  if values then
    subschema = get_subschema(self, values)
  end

  return function()
    local item = self.fields[i]
    if not item then
      return nil
    end
    local key = next(item)
    local field = resolve_field(self, key, item[key], subschema)
    i = i + 1
    return key, field
  end
end

Schema.new() sets the metatable's __index so that the new table inherits the set of methods defined on Schema.

function Schema.new(definition, is_subschema)
  if not definition then
    return nil, validation_errors.SCHEMA_NO_DEFINITION
  end

  if not definition.fields then
    return nil, validation_errors.SCHEMA_NO_FIELDS
  end

  local self = copy(definition)

  -- inherit the set of methods defined on Schema
  setmetatable(self, Schema)

  -- cache_key used when caching the entity;
  -- if it is not defined, the primary_key defined by the
  -- schema is used as the cache_key instead.
  -- cache_key is an array; here its entries are also
  -- stored as a set for quick lookup
  if self.cache_key then
    self.cache_key_set = {}
    for _, name in ipairs(self.cache_key) do
      self.cache_key_set[name] = true
    end
  end


  -- call Schema:each_field() through the metatable's __index
  -- to iterate over the schema's fields table
  for key, field in self:each_field() do
    -- Also give access to fields by name
    self.fields[key] = field
    if field.type == "record" and field.fields then
      allow_record_fields_by_name(field)
    end

    -- if the field is a foreign key,
    -- load the schema it references
    if field.type == "foreign" then
      local err
      field.schema, err = get_foreign_schema_for_field(field)
      if not field.schema then
        return nil, err
      end

      if not is_subschema then
        -- Store the inverse relation for implementing constraints
        local constraints = assert(_cache[field.reference]).constraints
        table.insert(constraints, {
          schema     = self,
          field_name = key,
          on_delete  = field.on_delete,
        })
      end
    end
  end

  if self.workspaceable and self.name then
    if not _workspaceable[self.name] then
      _workspaceable[self.name] = true
      table.insert(_workspaceable, { schema = self })
    end
  end

  if self.name then
    -- do not reset the constraints list if a schema in reloaded
    if not _cache[self.name] then
      _cache[self.name] = {
        constraints = {},
      }
    end
    -- but always update the schema object in cache
    _cache[self.name].schema = self
  end

  return self
end

The metatable used for this inheritance looks like this:

local Schema       = {}
Schema.__index     = Schema
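
For readers less familiar with Lua metatables, here is a minimal standalone illustration of the pattern (generic Lua, not Kong code):

local Schema   = {}
Schema.__index = Schema

-- a method defined once on the "class"
function Schema:field_count()
  return #self.fields
end

-- a key missing on the instance is looked up on Schema via __index
local route_schema = setmetatable({ fields = { "id", "name" } }, Schema)
print(route_schema:field_count())  --> 2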

3.1.2. Entity

Entity simply encapsulates the Schema.

-- definition is the schema definition table
function Entity.new(definition)

  -- initialize the Schema object
  local self, err = Schema.new(definition)
  if not self then
    return nil, err
  end

  -- iterate over the schema's fields
  for name, field in self:each_field() do
    if field.nilable then
      return nil, entity_errors.NO_NILABLE:format(name)
    end

    if field.abstract then
      goto continue
    end

    if field.type == "map" then
      if field.keys.type ~= "string" then
        return nil, entity_errors.MAP_KEY_STRINGS_ONLY:format(name)
      end

    elseif field.type == "record" then
      make_records_required(field)

    elseif field.type == "function" then
      return nil, entity_errors.NO_FUNCTIONS:format(name)
    end

    ::continue::
  end

  self.new_subschema = Entity.new_subschema

  return self
end

The Entity objects are then created in the DB.new() function:

local schemas = {}

do
  -- load schemas
  -- core entities are for now the only source of schemas.
  -- TODO: support schemas from plugins entities as well.

  -- load the core entities; they are called "core" because
  -- plugins can define their own custom entities,
  -- while these belong to Kong itself
  for _, entity_name in ipairs(constants.CORE_ENTITIES) do

    -- load the schema (data structure)
    local entity_schema = require("kong.db.schema.entities." .. entity_name)

    -- validate core entities schema via metaschema
    local ok, err_t = MetaSchema:validate(entity_schema)
    if not ok then
      return nil, fmt("schema of entity '%s' is invalid: %s", entity_name,
                      tostring(errors:schema_violation(err_t)))
    end

    -- load entity object
    local entity, err = Entity.new(entity_schema)
    if not entity then
      return nil, fmt("schema of entity '%s' is invalid: %s", entity_name,
                      err)
    end
    schemas[entity_name] = entity

    -- load core entities subschemas
    local subschemas
    ok, subschemas = utils.load_module_if_exists("kong.db.schema.entities." .. entity_name .. "_subschemas")
    if ok then
      for name, subschema in pairs(subschemas) do
        local ok, err = entity:new_subschema(name, subschema)
        if not ok then
          return nil, ("error initializing schema for %s: %s"):format(entity_name, err)
        end
      end
    end
  end
end

3.1.3. DAO

db/dao/init.lua defines a series of methods for database operations, such as:

function DAO:select(primary_key, options)
function DAO:page(size, offset, options)
function DAO:each(size, options)
function DAO:insert(entity, options)
function DAO:update(primary_key, entity, options)
function DAO:delete(primary_key, options)
...

DAO.new() creates a table containing the DB connection information and the entity.

-- the schema parameter is an Entity object
--  DB structure:  local self   = {
  --    daos       = daos,       -- each of those has the connector singleton
  --    strategies = strategies,
  --    connector  = connector,
  --    strategy   = strategy,
  --    errors     = errors,
  --    infos      = connector:infos(),
  --    kong_config = kong_config,
  --  }
function _M.new(db, schema, strategy, errors)
  local fk_methods = generate_foreign_key_methods(schema)
  -- inherit DAO's base methods
  local super      = setmetatable(fk_methods, DAO)

  local self = {
    db         = db,
    schema     = schema,
    strategy   = strategy,
    errors     = errors,
    pagination = utils.shallow_copy(defaults.pagination),
    super      = super,
  }

  if schema.dao then
    -- plugin-defined custom DAO methods
    local custom_dao = require(schema.dao)
    for name, method in pairs(custom_dao) do
      self[name] = method
    end
  end

  return setmetatable(self, { __index = super })
end

All DAO objects are loaded in db/init.lua:

  do
    -- load DAOs

    for _, schema in pairs(schemas) do
      local strategy = strategies[schema.name]
      if not strategy then
        return nil, fmt("no strategy found for schema '%s'", schema.name)
      end

      -- Store daos
      daos[schema.name] = DAO.new(self, schema, strategy, errors)
    end
  end

As with the structures above, DB.new() sets the table's metatable __index at the end:

  -- set the metatable's __index:
  -- a key missing on DB, e.g. DB.xxx,
  -- falls back to DB.daos.xxx
  return setmetatable(self, DB)
  
  
local DB = {}
DB.__index = function(self, k)
  -- rawget reads the raw field directly, bypassing the metatable's __index
  return DB[k] or rawget(self, "daos")[k]
end

Elsewhere in Kong, database methods are invoked as, for example, kong.db.services:each_field(); through the __index metamethod this resolves to daos.services, and the method called is the entity's each_field() (in fact Schema:each_field()).
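
A tiny illustration of that lookup chain (entity names come from the code above; the UUID is made up):

local dao = kong.db.services        -- "services" is not a field of DB, so __index falls back to daos.services
local service, err = dao:select({ id = "2c47..." })   -- DAO:select(primary_key, options)

local ok = kong.db:close()          -- close() is a real DB method, found directly via the DB metatable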

Below the DAO there is a further layer encapsulating the actual database operations, for example the PostgreSQL strategy that generates SQL statements; it is not covered here.

This layer-upon-layer encapsulation of the DAO gave me a real headache: IDEs still have no decent code intelligence for Lua, and Ctrl-clicking will not jump you to a definition.

3.2. Cache construction

In the init_by_lua phase, the master process parses the configuration file, connects to the database, clears the shared memory, and builds the router cache.

The reset_kong_shm code block cleans up the shared memory:

    local shms = {
      "kong",
      "kong_locks",
      "kong_healthchecks",
      "kong_process_events",
      "kong_cluster_events",
      "kong_rate_limiting_counters",
      "kong_core_db_cache" .. suffix,
      "kong_core_db_cache_miss" .. suffix,
      "kong_db_cache" .. suffix,
      "kong_db_cache_miss" .. suffix,
      "kong_clustering",
    }

    for _, shm in ipairs(shms) do
      local dict = ngx.shared[shm]
      -- Clear shared memory
      if dict then
        dict:flush_all()
        dict:flush_expired(0)
      end
    end

3.2.1. Route cache

  else
    -- DB mode
    local default_ws = db.workspaces:select_by_name("default")
    kong.default_workspace = default_ws and default_ws.id

    local ok, err = runloop.build_plugins_iterator("init")
    if not ok then
      error("error building initial plugins: " .. tostring(err))
    end

    -- Initialize route
    -- Build routing cache
    assert(runloop.build_router("init"))
  end

  db:close()
end

In DB mode, the last step calls runloop.build_router("init") to build the router cache.

While building the router cache, Kong checks whether the cache component has been initialized; in the init phase it is not yet ready, so a plain Lua table is used to cache the data instead. The build_services_init_cache() method loads all Services into this table page by page. For each Route, Kong checks whether the current Nginx subsystem (http/stream) matches the Route's protocol; if so, the corresponding Service object is fetched and paired with the Route. Finally the pairs are passed to Router.new(), which builds the tree structure and routing index.

Kong maps protocols to Nginx subsystems as follows:

  • http/https -> http
  • grpc/grpcs -> http
  • tcp/tls -> stream

  build_router = function(version)
    local db = kong.db
    -- table storing all route/service pairs
    local routes, i = {}, 0

    local err
    -- The router is initially created on init phase, where kong.core_cache is
    -- still not ready. For those cases, use a plain Lua table as a cache
    -- instead
    -- in the init phase core_cache is not ready yet,
    -- so a plain Lua table is used instead
    local services_init_cache = {}
    if not kong.core_cache and db.strategy ~= "off" then
      -- fetch all services, using the default paging parameters
      services_init_cache, err = build_services_init_cache(db)
      if err then
        services_init_cache = {}
        log(WARN, "could not build services init cache: ", err)
      end
    end

    local counter = 0
    local page_size = db.routes.pagination.page_size
    for route, err in db.routes:each(nil, GLOBAL_QUERY_OPTS) do
      if err then
        return nil, "could not load routes: " .. err
      end

      -- check whether the route data changed while rebuilding,
      -- by comparing the router version;
      -- if it did change, abort the rebuild
      if db.strategy ~= "off" then
        if kong.core_cache and counter > 0 and counter % page_size == 0 then
          local new_version, err = get_router_version()
          if err then
            return nil, "failed to retrieve router version: " .. err
          end

          if new_version ~= version then
            return nil, "router was changed while rebuilding it"
          end
        end
      end

      -- does the current subsystem support this route's protocol?
      if should_process_route(route) then
        -- fetch the route's service
        local service, err = get_service_for_route(db, route, services_init_cache)
        if err then
          return nil, err
        end

        local r = {
          route   = route,
          service = service,
        }

        i = i + 1
        -- store the route/service pair
        routes[i] = r
      end

      counter = counter + 1
    end

    local new_router, err = Router.new(routes)
    if not new_router then
      return nil, "could not create router: " .. err
    end

    -- the router instance
    router = new_router

    if version then
      router_version = version
    end

    -- LEGACY - singletons module is deprecated
    singletons.router = router
    -- /LEGACY

    return true
  end

As mentioned above, if the cache component has not been initialized during the init phase, a plain Lua table is used to cache the Services:

  -- stored in the table as
  -- [service.id] = service
  local function build_services_init_cache(db)
    local services_init_cache = {}

    for service, err in db.services:each(nil, GLOBAL_QUERY_OPTS) do
      if err then
        return nil, err
      end

      services_init_cache[service.id] = service
    end

    return services_init_cache
  end

The build_services_init_cache(db) method calls DAO:each(), which uses the default paging parameter page_size = 1000 to fetch records page by page and returns an iterator over individual rows. Because the init_by_lua phase has not yet initialized the cache (kong.core_cache), a Lua table is used to hold the cached data.

function DAO:each(size, options)
  if size ~= nil then
    validate_size_type(size)
  end

  -- Get paging conditions with default values
  options = get_pagination_options(self, options)

  if size ~= nil then
    local ok, err = validate_size_value(size, options.pagination.max_page_size)
    if not ok then
      local err_t = self.errors:invalid_size(err)
      return nil, tostring(err_t), err_t
    end

  else
    size = options.pagination.page_size
  end

  local ok, errors = validate_options_value(self, options)
  if not ok then
    local err_t = self.errors:invalid_options(errors)
    return nil, tostring(err_t), err_t
  end

  local pager = function(size, offset, options)
    return self.strategy:page(size, offset, options)
  end

  return iteration.by_row(self, pager, size, options)
end

The default paging parameters are defined in db/strategies/connector.lua:

local Connector = {
  defaults = {
    -- default paging parameters
    pagination = {
      page_size     = 1000,
      max_page_size = 50000,
    },
  },
}

Next, all Routes are traversed, calling should_process_route() and get_service_for_route() one by one. The former checks whether the Nginx subsystem matches the route's protocol; the latter first looks for the Service in the cache and, if it is not there, fetches it from the database.

  local function get_service_for_route(db, route, services_init_cache)
    -- foreign key pointing to the route's service
    local service_pk = route.service
    if not service_pk then
      return nil
    end

    -- look up the service in the cache table
    local id = service_pk.id
    local service = services_init_cache[id]
    if service then
      return service
    end

    local err

    -- kong.core_cache is available, not in init phase
    if kong.core_cache then
      -- look up the service through mlcache
      local cache_key = db.services:cache_key(service_pk.id, nil, nil, nil, nil,
                                              route.ws_id)
      -- read from the cache; on a miss, load_service_from_db is called
      service, err = kong.core_cache:get(cache_key, TTL_ZERO,
                                    load_service_from_db, service_pk)

    else -- init phase, kong.core_cache not available

      -- A new service/route has been inserted while the initial route
      -- was being created, on init (perhaps by a different Kong node).
      -- Load the service individually and update services_init_cache with it
      -- query the database directly for the service
      service, err = load_service_from_db(service_pk)
      services_init_cache[id] = service
    end

    if err then
      return nil, "error raised while finding service for route (" .. route.id .. "): " ..
                  err

    elseif not service then
      return nil, "could not find service for route (" .. route.id .. ")"
    end


    -- TODO: this should not be needed as the schema should check it already
    if SUBSYSTEMS[service.protocol] ~= subsystem then
      log(WARN, "service with protocol '", service.protocol,
                "' cannot be used with '", subsystem, "' subsystem")

      return nil
    end

    return service
  end
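
should_process_route() is not shown in the excerpt above; a minimal sketch of the check it performs, assuming the protocol-to-subsystem mapping listed earlier (an approximation, not Kong's literal code):

local SUBSYSTEMS = {
  http = "http", https = "http",
  grpc = "http", grpcs = "http",
  tcp  = "stream", tls  = "stream",
}

local subsystem = ngx.config.subsystem  -- "http" or "stream"

local function should_process_route(route)
  for _, protocol in ipairs(route.protocols) do
    if SUBSYSTEMS[protocol] == subsystem then
      return true
    end
  end
  return false
end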

The load_service_from_db() method simply calls DAO:select() to fetch the Service and stores it in the services_init_cache table; it does not update the kong.core_cache component.

For database entity operations, only create, update, and delete are broadcast to other workers via events for synchronization; this is described in detail in the events section below.

Next, the array of {route, service} pairs is passed to the Router.new() function in router.lua:

local new_router, err = Router.new(routes)
if not new_router then
    return nil, "could not create router: " .. err
end

-- bind the router instance
router = new_router

The actual routing index is built in the _M.new(routes) function in router.lua, using the lua-resty-lrucache library as a cache: the route/service combinations are sorted and indexed by the matching algorithm, structures of the form {cache_key: {route, service}} are stored in the cache, and the Router instance is returned.

The routing-index cache key is constructed like this:

    local cache_key = req_method .. "|" .. req_uri .. "|" .. req_host ..
                      "|" .. ctx.src_ip .. "|" .. ctx.src_port ..
                      "|" .. ctx.dst_ip .. "|" .. ctx.dst_port ..
                      "|" .. ctx.sni

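For a plain HTTP request handled by the http subsystem, the src/dst/sni parts are empty, so a key looks roughly like this (host and path are made up):

-- cache_key == "GET|/api/users|example.com|||||"
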
The Router instance is built by the master process and inherited by each worker process through fork.

After the workers finish setting up shared memory, they register timers that periodically rebuild the cache.

      -- Periodically rebuild routing cache
      if kong.db.strategy ~= "off" then
        timer_every(worker_state_update_frequency, function(premature)
          if premature then
            return
          end

          -- Don't wait for the semaphore (timeout = 0) when updating via the
          -- timer.
          -- If the semaphore is locked, that means that the rebuild is
          -- already ongoing.
          local ok, err = rebuild_router(ROUTER_ASYNC_OPTS)
          if not ok then
            log(ERR, "could not rebuild router via timer: ", err)
          end
        end)

        timer_every(worker_state_update_frequency, function(premature)
          if premature then
            return
          end

          local ok, err = rebuild_plugins_iterator(PLUGINS_ITERATOR_ASYNC_OPTS)
          if not ok then
            log(ERR, "could not rebuild plugins iterator via timer: ", err)
          end
        end)
      end

3.2.2. Entity cache

Here we first introduce lua-resty-mlcache, a caching library built on lua_shared_dict and lua-resty-lrucache. With these two cache layers, each worker has its own process-level LRU cache, which is queried first; the shared-memory cache is queried next; and finally a callback fetches the data from the database, with lua-resty-lock ensuring that only a single worker executes the callback.

mlcache architecture diagram:

┌─────────────────────────────────────────────────┐
│ Nginx                                           │
│       ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│       │worker     │ │worker     │ │worker     │ │
│ L1    │           │ │           │ │           │ │
│       │ Lua cache │ │ Lua cache │ │ Lua cache │ │
│       └───────────┘ └───────────┘ └───────────┘ │
│             │             │             │       │
│             ▼             ▼             ▼       │
│       ┌───────────────────────────────────────┐ │
│       │                                       │ │
│ L2    │           lua_shared_dict             │ │
│       │                                       │ │
│       └───────────────────────────────────────┘ │
│                           │ mutex               │
│                           ▼                     │
│                  ┌──────────────────┐           │
│                  │     callback     │           │
│                  └────────┬─────────┘           │
└───────────────────────────┼─────────────────────┘
                            │
  L3                        │   I/O fetch
                            ▼

                   Database, API, DNS, Disk, any I/O...
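
A minimal standalone usage sketch of lua-resty-mlcache (the shared dict name and the callback below are illustrative, not taken from Kong):

local mlcache = require "resty.mlcache"

-- "my_cache" must be declared with lua_shared_dict in nginx.conf
local cache = assert(mlcache.new("my_cache", "my_cache", { lru_size = 1000 }))

local function load_entity(id)
  -- L3 callback: guarded by lua-resty-lock, so only one worker
  -- actually performs the expensive fetch for a given key
  return { id = id, loaded_at = ngx.now() }
end

-- L1 (per-worker LRU) -> L2 (lua_shared_dict) -> L3 (callback)
local entity, err = cache:get("entities:42", nil, load_entity, "42")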

The cache is initialized in Kong.init_worker():

  -- Initialize shared memory based cache
  local cache, err = kong_global.init_cache(kong.configuration, cluster_events, worker_events)
  if not cache then
    stash_init_worker_error("failed to instantiate 'kong.cache' module: " ..
                            err)
    return
  end
  kong.cache = cache

  local core_cache, err = kong_global.init_core_cache(kong.configuration, cluster_events, worker_events)
  if not core_cache then
    stash_init_worker_error("failed to instantiate 'kong.core_cache' module: " ..
                            err)
    return
  end
  kong.core_cache = core_cache

  ok, err = runloop.set_init_versions_in_cache()
  if not ok then
    stash_init_worker_error(err) -- 'err' fully formatted
    return
  end

_GLOBAL.init_cache() looks like this:

function _GLOBAL.init_cache(kong_config, cluster_events, worker_events)
  local db_cache_ttl = kong_config.db_cache_ttl
  local db_cache_neg_ttl = kong_config.db_cache_neg_ttl
  local cache_pages = 1
  if kong_config.database == "off" then
    db_cache_ttl = 0
    db_cache_neg_ttl = 0
    cache_pages = 2
  end

  return kong_cache.new {
    shm_name          = "kong_db_cache",
    cluster_events    = cluster_events,
    worker_events     = worker_events,
    ttl               = db_cache_ttl,
    neg_ttl           = db_cache_neg_ttl or db_cache_ttl,
    resurrect_ttl     = kong_config.resurrect_ttl,
    cache_pages       = cache_pages,
    resty_lock_opts   = {
      exptime = 10,
      timeout = 5,
    },
  }
end

Eventually _M.new() in cache.lua is called: it validates the required parameters, checks that the shared memory zone is accessible, wires up cluster events and worker events, defines serialization and deserialization methods, and wraps mlcache in one more layer.

function _M.new(opts)
  -- opts validation

  opts = opts or {}

  local mlcaches = {}
  local shm_names = {}

  for i = 1, opts.cache_pages or 1 do
    local channel_name  = (i == 1) and "mlcache"                 or "mlcache_2"
    local shm_name      = (i == 1) and opts.shm_name             or opts.shm_name .. "_2"
    local shm_miss_name = (i == 1) and opts.shm_name .. "_miss"  or opts.shm_name .. "_miss_2"

    if ngx.shared[shm_name] then
      local mlcache, err = resty_mlcache.new(shm_name, shm_name, {
        shm_miss         = shm_miss_name,
        shm_locks        = "kong_locks",
        shm_set_retries  = 3,
        lru_size         = LRU_SIZE,
        ttl              = max(opts.ttl     or 3600, 0),
        neg_ttl          = max(opts.neg_ttl or 300,  0),
        resurrect_ttl    = opts.resurrect_ttl or 30,
        resty_lock_opts  = opts.resty_lock_opts,
        ipc = { -- Function binding for interprocess communication
          register_listeners = function(events)
            for _, event_t in pairs(events) do
              opts.worker_events.register(function(data)
                event_t.handler(data)
              end, channel_name, event_t.channel)
            end
          end,
          broadcast = function(channel, data)
            local ok, err = opts.worker_events.post(channel_name, channel, data)
            if not ok then
              log(ERR, "failed to post event '", channel_name, "', '",
                       channel, "': ", err)
            end
          end
        }
      })
      if not mlcache then
        return nil, "failed to instantiate mlcache: " .. err
      end
      mlcaches[i] = mlcache
      shm_names[i] = shm_name
    end
  end

  local curr_mlcache = 1

  if opts.cache_pages == 2 then
    curr_mlcache = ngx.shared.kong:get("kong:cache:" .. opts.shm_name .. ":curr_mlcache") or 1
  end

  local self          = {
    cluster_events    = opts.cluster_events,
    mlcache           = mlcaches[curr_mlcache],
    mlcaches          = mlcaches,
    shm_names         = shm_names,
    curr_mlcache      = curr_mlcache,
  }

  local ok, err = self.cluster_events:subscribe("invalidations", function(key)
    log(DEBUG, "received invalidate event from cluster for key: '", key, "'")
    self:invalidate_local(key)
  end)
  if not ok then
    return nil, "failed to subscribe to invalidations cluster events " ..
                "channel: " .. err
  end

  _init[opts.shm_name] = true

  return setmetatable(self, mt)
end

After the cache module is initialized, the worker loads the database entities listed in the db_cache_warmup_entities configuration item into the cache. The default configuration caches services and plugins.

LRU_SIZE is 500000; the unit is items, i.e. the maximum number of entries the LRU cache can hold. According to this value a single worker's LRU cache can occupy at most about 500 MB of memory.

The worker loads these database entities into the shared-memory cache according to the configuration item:

local function execute_cache_warmup(kong_config)
  if kong_config.database == "off" then
    return true
  end

  -- performed in only one worker:
  -- load database entities into the shared-memory cache
  if ngx.worker.id() == 0 then
    local ok, err = cache_warmup.execute(kong_config.db_cache_warmup_entities)
    if not ok then
      return nil, err
    end
  end

  return true
end

Here, the database data is only loaded on one Worker process, and then synchronized to other workers.

cache_warmup.execute() performs some basic checks and then calls the cache_warmup_single_entity(dao) method.

-- Load database entities into the cache for faster access;
-- runs during the worker initialization phase.
-- By default services and plugins are loaded;
-- the amount cached is bounded by the mem_cache_size setting
-- Loads entities from the database into the cache, for rapid subsequent
-- access. This function is intented to be used during worker initialization.
function cache_warmup.execute(entities)
  if not kong.cache or not kong.core_cache then
    return true
  end

  for _, entity_name in ipairs(entities) do
    if entity_name == "routes" then
      -- do not spend shm memory by caching individual Routes entries
      -- because the routes are kept in-memory by building the router object
      kong.log.notice("the 'routes' entry is ignored in the list of ",
                      "'db_cache_warmup_entities' because Kong ",
                      "caches routes in memory separately")
      goto continue
    end

    local dao = kong.db[entity_name]
    if not (type(dao) == "table" and dao.schema) then
      kong.log.warn(entity_name, " is not a valid entity name, please check ",
                    "the value of 'db_cache_warmup_entities'")
      goto continue
    end

    local ok, err = cache_warmup_single_entity(dao)
    if not ok then
      if err == "no memory" then
        kong.log.warn("cache warmup has been stopped because cache ",
                      "memory is exhausted, please consider increasing ",
                      "the value of 'mem_cache_size' (currently at ",
                      kong.configuration.mem_cache_size, ")")

        return true
      end
      return nil, err
    end

    ::continue::
  end

  return true
end

Routes are not cached here, because they were already built into the routing index in the previous section and copied into every worker's memory via fork.

local function cache_warmup_single_entity(dao)
  local entity_name = dao.schema.name

  -- choose the storage location: cache or core_cache
  local cache_store = constants.ENTITY_CACHE_STORE[entity_name]
  -- the global cache object
  local cache = kong[cache_store]

  ngx.log(ngx.NOTICE, "Preloading '", entity_name, "' into the ", cache_store, "...")

  local start = ngx.now()

  local hosts_array, hosts_set, host_count
  if entity_name == "services" then
    hosts_array = {}
    hosts_set = {}
    host_count = 0
  end

  for entity, err in dao:each(nil, GLOBAL_QUERY_OPTS) do
    if err then
      return nil, err
    end

    if entity_name == "services" then
      if utils.hostname_type(entity.host) == "name"
         and hosts_set[entity.host] == nil then
        host_count = host_count + 1
        hosts_array[host_count] = entity.host
        hosts_set[entity.host] = true
      end
    end

    -- obtain cache_key
    local cache_key = dao:cache_key(entity)

    -- call mlcache's safe_set method;
    -- it returns an error if the shared memory is exhausted
    local ok, err = cache:safe_set(cache_key, entity)
    if not ok then
      return nil, err
    end
  end

  if entity_name == "services" and host_count > 0 then
    ngx.timer.at(0, warmup_dns, hosts_array, host_count)
  end

  local elapsed = math.floor((ngx.now() - start) * 1000)

  ngx.log(ngx.NOTICE, "finished preloading '", entity_name,
                      "' into the ", cache_store, " (in ", tostring(elapsed), "ms)")
  return true
end

cache_warmup_single_entity() loads all of the DAO's data into memory; the set operation also publishes events so that the data is synchronized to the other workers, and in the end every worker holds a copy.

3.3. Event subscriptions

Worker events and cluster events are initialized in Kong.init_worker():

  local worker_events, err = kong_global.init_worker_events()
  if not worker_events then
    stash_init_worker_error("failed to instantiate 'kong.worker_events' " ..
                            "module: " .. err)
    return
  end
  kong.worker_events = worker_events

  local cluster_events, err = kong_global.init_cluster_events(kong.configuration, kong.db)
  if not cluster_events then
    stash_init_worker_error("failed to instantiate 'kong.cluster_events' " ..
                            "module: " .. err)
    return
  end
  kong.cluster_events = cluster_events

Worker events use the lua-resty-worker-events library internally. The inter-process event mechanism it implements stores events in shared memory; every second each worker polls the shared memory and processes any new events.

function _GLOBAL.init_worker_events()
  -- Note: worker_events will not work correctly if required at the top of the file.
  --       It must be required right here, inside the init function
  local worker_events = require "resty.worker.events"

  local ok, err = worker_events.configure {
    shm = "kong_process_events", -- defined by "lua_shared_dict"
    timeout = 5,            -- life time of event data in shm
    interval = 1,           -- poll interval (seconds)

    wait_interval = 0.010,  -- wait before retry fetching event data
    wait_max = 0.5,         -- max wait time before discarding event
  }
  if not ok then
    return nil, err
  end

  return worker_events
end

Cluster events (communication between multiple Kong nodes) work by storing events in the database and polling the database for new events at a regular interval.

function _GLOBAL.init_cluster_events(kong_config, db)
  return kong_cluster_events.new({
    db            = db,
    poll_interval = kong_config.db_update_frequency,
    poll_offset   = kong_config.db_update_propagation,
    poll_delay    = kong_config.db_update_propagation,
  })
end

It can be seen from here that cluster events are implemented through database tables:

function _M:broadcast(channel, data, delay)
  if type(channel) ~= "string" then
    return nil, "channel must be a string"
  end

  if type(data) ~= "string" then
    return nil, "data must be a string"
  end

  if delay and type(delay) ~= "number" then
    return nil, "delay must be a number"

  elseif self.poll_delay > 0 then
    delay = self.poll_delay
  end

  -- insert event row

  --log(DEBUG, "broadcasting on channel: '", channel, "' data: ", data,
  --           " with delay: ", delay and delay or "none")

  local ok, err = self.strategy:insert(self.node_id, channel, nil, data, delay)
  if not ok then
    return nil, err
  end

  return true
end


function _M:subscribe(channel, cb, start_polling)
  if type(channel) ~= "string" then
    return error("channel must be a string")
  end

  if type(cb) ~= "function" then
    return error("callback must be a function")
  end

  if not self.callbacks[channel] then
    self.callbacks[channel] = { cb }

    insert(self.channels, channel)

  else
    insert(self.callbacks[channel], cb)
  end

  if start_polling == nil then
    start_polling = true
  end

  if not self.polling and start_polling and self.use_polling then
    -- start recurring polling timer

    local ok, err = timer_at(self.poll_interval, poll_handler, self)
    if not ok then
      return nil, "failed to start polling timer: " .. err
    end

    self.polling = true
  end

  return true
end

In cache.lua, a cluster-events subscription listens for cache invalidation events and internally calls mlcache's delete method, so the invalidation reaches all workers.

  local ok, err = self.cluster_events:subscribe("invalidations", function(key)
    log(DEBUG, "received invalidate event from cluster for key: '", key, "'")
    self:invalidate_local(key)
  end)


function _M:invalidate_local(key, shadow)
  if type(key) ~= "string" then
    error("key must be a string", 2)
  end

  log(DEBUG, "invalidating (local): '", key, "'")

  local current_page = self.curr_mlcache or 1
  local delete_page
  if shadow and #self.mlcaches == 2 then
    delete_page = current_page == 1 and 2 or 1
  else
    delete_page = current_page
  end

  local ok, err = self.mlcaches[delete_page]:delete(key)
  if not ok then
    log(ERR, "failed to delete entity from node cache: ", err)
  end
end

This part has covered the event-related work during Kong initialization: setting up event subscriptions, the IPC (inter-process communication) wired into mlcache, subscribing to cache invalidation events, and attaching the event publisher to the DAOs.

function DB:set_events_handler(events)
  for _, dao in pairs(self.daos) do
    dao.events = events
  end
end

4. Event distribution

Many parts of Kong use the non-blocking ngx.timer.at() and ngx.timer.every() functions to run scheduled work. This material is fairly scattered; this section describes Kong's non-blocking one-off tasks and its typical recurring timer tasks.

4.1. One-off tasks

4.1.1. DNS resolution

When caching the services entities in cache_warmup.lua, Kong resolves the IP addresses of the services' hosts in a non-blocking way:

  if entity_name == "services" and host_count > 0 then
    ngx.timer.at(0, warmup_dns, hosts_array, host_count)
  end

local function warmup_dns(premature, hosts, count)
  if premature then
    return
  end

  ngx.log(ngx.NOTICE, "warming up DNS entries ...")

  local start = ngx.now()

  for i = 1, count do
    kong.dns.toip(hosts[i])
  end

  local elapsed = math.floor((ngx.now() - start) * 1000)

  ngx.log(ngx.NOTICE, "finished warming up DNS entries",
                      "' into the cache (in ", tostring(elapsed), "ms)")
end

Kong's internal DNS module uses lua-resty-dns-client, another library open-sourced by Kong. Its toip function applies weighted round-robin according to the weights of the IPs returned by DNS, and keeps the DNS query results cached in memory.
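
An illustrative use of the library (the module path and the way it is initialized here are assumptions about lua-resty-dns-client, not code taken from Kong):

local dns_client = require "resty.dns.client"
assert(dns_client.init())        -- set up resolvers from resolv.conf / hosts

-- toip() resolves the name, applies weighted round-robin over the answers,
-- and keeps the DNS records cached in memory
local ip, port_or_err = dns_client.toip("konghq.com", 443)
if not ip then
  ngx.log(ngx.ERR, "DNS resolution failed: ", port_or_err)
end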

4.2. Recurring tasks

4.2.1. Cluster event polling

The cluster-event subscription function in cluster_events/init.lua starts a timer that polls the cluster events table in the database.

function _M:subscribe(channel, cb, start_polling)
  if type(channel) ~= "string" then
    return error("channel must be a string")
  end

  if type(cb) ~= "function" then
    return error("callback must be a function")
  end

  if not self.callbacks[channel] then
    self.callbacks[channel] = { cb }

    insert(self.channels, channel)

  else
    insert(self.callbacks[channel], cb)
  end

  if start_polling == nil then
    start_polling = true
  end

  if not self.polling and start_polling and self.use_polling then
    -- start recurring polling timer

    local ok, err = timer_at(self.poll_interval, poll_handler, self)
    if not ok then
      return nil, "failed to start polling timer: " .. err
    end

    self.polling = true
  end

  return true
end

ngx.timer.every() is not used here because the lock has to be checked on every iteration; instead, ngx.timer.at() is called recursively to form the loop.

poll_handler = function(premature, self)
  if premature or not self.polling then
    -- set self.polling to false to stop a polling loop
    return
  end

  if not get_lock(self) then
    local ok, err = timer_at(self.poll_interval, poll_handler, self)
    if not ok then
      log(CRIT, "failed to start recurring polling timer: ", err)
    end

    return
  end

  -- single worker

  local pok, perr, err = pcall(poll, self)
  if not pok then
    log(ERR, "poll() threw an error: ", perr)

  elseif not perr then
    log(ERR, "failed to poll: ", err)
  end

  -- unlock

  self.shm:delete(POLL_RUNNING_LOCK_KEY)

  local ok, err = timer_at(self.poll_interval, poll_handler, self)
  if not ok then
    log(CRIT, "failed to start recurring polling timer: ", err)
  end
end

The lock is implemented with shared memory and ensures that only one worker runs the poll at any given time.

local function get_lock(self)
  -- check if a poll is not currently running, to ensure we don't start
  -- another poll while a worker is still stuck in its own polling (in
  -- case it is being slow)
  -- we still add an exptime to this lock in case something goes horribly
  -- wrong, to ensure other workers can poll new events
  -- a poll cannot take more than max(poll_interval * 5, 10) -- 10s min
  local ok, err = self.shm:safe_add(POLL_RUNNING_LOCK_KEY, true,
                                    max(self.poll_interval * 5, 10))
  if not ok then
    if err ~= "exists" then
      log(ERR, "failed to acquire poll_running lock: ", err)
    end
    -- else
    --   log(DEBUG, "failed to acquire poll_running lock: ",
    --              "a worker still holds the lock")

    return false
  end

  if self.poll_interval > 0.001 then
    -- check if interval of `poll_interval` has elapsed already, to ensure
    -- we do not run the poll when a previous poll was quickly executed, but
    -- another worker got the timer trigger a bit too late.
    ok, err = self.shm:safe_add(POLL_INTERVAL_LOCK_KEY, true,
                                self.poll_interval - 0.001)
    if not ok then
      if err ~= "exists" then
        log(ERR, "failed to acquire poll_interval lock: ", err)
      end
      -- else
      --   log(DEBUG, "failed to acquire poll_interval lock: ",
      --              "not enough time elapsed since last poll")

      self.shm:delete(POLL_RUNNING_LOCK_KEY)

      return false
    end
  end

  return true
end

4.2.2. Database TTL

To implement TTLs on top of PostgreSQL, Kong calls the database layer's init_worker() function in db/strategies/postgres/connector.lua during the init_worker phase.

-- Some contents are omitted below and only key parts are shown
function _mt:init_worker(strategies)
  if ngx.worker.id() == 0 then

      cleanup_statements[i] = concat {
        "  DELETE FROM ",
        self:escape_identifier(table_name),
        " WHERE ",
        column_name,
        " < CURRENT_TIMESTAMP AT TIME ZONE 'UTC';"
      }

    local cleanup_statement = concat(cleanup_statements, "\n")

    return timer_every(60, function(premature)

      local ok, err, _, num_queries = self:query(cleanup_statement)
      if not ok then
        if num_queries then
          for i = num_queries + 1, cleanup_statements_count do
            local statement = cleanup_statements[i]
            local ok, err = self:query(statement)
            if not ok then
              if err then
                log(WARN, "unable to clean expired rows from table '",
                          sorted_strategies[i], "' on PostgreSQL database (",
                          err, ")")
              else
                log(WARN, "unable to clean expired rows from table '",
                          sorted_strategies[i], "' on PostgreSQL database")
              end
            end
          end

        else
          log(ERR, "unable to clean expired rows from PostgreSQL database (", err, ")")
        end
      end
    end)
  end

  return true
end

During database initialization a timer is registered whose callback deletes the rows whose TTL has expired.

4.2.3. Update routing index

kong.init_worker() registers timers that update the caches periodically:

      -- Periodically rebuild routing cache
      if kong.db.strategy ~= "off" then
        timer_every(worker_state_update_frequency, function(premature)
          if premature then
            return
          end

          -- Don't wait for the semaphore (timeout = 0) when updating via the
          -- timer.
          -- If the semaphore is locked, that means that the rebuild is
          -- already ongoing.
          local ok, err = rebuild_router(ROUTER_ASYNC_OPTS)
          if not ok then
            log(ERR, "could not rebuild router via timer: ", err)
          end
        end)

        timer_every(worker_state_update_frequency, function(premature)
          if premature then
            return
          end

          local ok, err = rebuild_plugins_iterator(PLUGINS_ITERATOR_ASYNC_OPTS)
          if not ok then
            log(ERR, "could not rebuild plugins iterator via timer: ", err)
          end
        end)
      end

The actual call chain starts a coroutine to check whether the routes have changed; if they have, the router cache is rebuilt.

  rebuild_router = function(opts)
    return rebuild("router", update_router, router_version, opts)
  end

local function rebuild(name, callback, version, opts)
  local current_version, err = kong.core_cache:get(name .. ":version", TTL_ZERO,
                                                   utils.uuid)
  if err then
    return nil, "failed to retrieve " .. name .. " version: " .. err
  end

  if current_version == version then
    return true
  end
	
  -- run the callback in a coroutine guarded by a mutex
  return concurrency.with_coroutine_mutex(opts, callback)
end

  update_router = function()
    -- we might not need to rebuild the router (if we were not
    -- the first request in this process to enter this code path)
    -- check again and rebuild only if necessary
    local version, err = get_router_version()
    if err then
      return nil, "failed to retrieve router version: " .. err
    end

    if version == router_version then
      return true
    end

    local ok, err = build_router(version)
    if not ok then
      return nil, --[[ 'err' fully formatted ]] err
    end

    return true
  end

Finally the build_router() method is called, which was described in the route cache section (3.2.1) above.

5. Event handling

The lua-resty-worker-events library is used for event processing among workers.

Event subscription: events.register(callback, source, event1, event2, ...), where the callback has the signature callback = function(data, event, source, pid).

Event publishing: success, err = events.post(source, event, data, unique).
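
A minimal usage sketch of these two functions (the source and event names are illustrative; configure(), shown earlier, must already have been called):

local worker_events = require "resty.worker.events"

-- subscribe: callback = function(data, event, source, pid)
worker_events.register(function(data, event, source, pid)
  ngx.log(ngx.DEBUG, "received ", source, ":", event, " from pid ", tostring(pid))
end, "my_source", "my_event")

-- publish to every worker; post_local() would stay inside the current worker
local ok, err = worker_events.post("my_source", "my_event", { key = "value" })
if not ok then
  ngx.log(ngx.ERR, "failed to post event: ", err)
end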

5.1. Database events

db/dao/init.lua defines the DAO operation methods, which were briefly described in the DAO section (3.1.3) above.

CRUD operations (there is actually no R) on database entities eventually call the DAO:post_crud_event() method to broadcast an event.

function DAO:post_crud_event(operation, entity, old_entity, options)
  if options and options.no_broadcast_crud_event then
    return
  end

  if self.events then
    local entity_without_nulls
    if entity then
      entity_without_nulls = remove_nulls(utils.deep_copy(entity, false))
    end

    local old_entity_without_nulls
    if old_entity then
      old_entity_without_nulls = remove_nulls(utils.deep_copy(old_entity, false))
    end

    local ok, err = self.events.post_local("dao:crud", operation, {
      operation  = operation,
      schema     = self.schema,
      entity     = entity_without_nulls,
      old_entity = old_entity_without_nulls,
    })
    if not ok then
      log(ERR, "[db] failed to propagate CRUD operation: ", err)
    end
  end
end

An event is published in the dao:crud channel. The operation types are create, update and delete.

register_events() in runloop/handler.lua is called from Kong.init_worker(); it subscribes to the database-related events and attaches handler functions:

  worker_events.register(function(data)
    if not data.schema then
      log(ERR, "[events] missing schema in crud subscriber")
      return
    end

    if not data.entity then
      log(ERR, "[events] missing entity in crud subscriber")
      return
    end

    -- invalidate this entity anywhere it is cached if it has a
    -- caching key
    -- (note: basically only an entity whose schema is defined
    -- incorrectly will lack a cache_key)

    local cache_key = db[data.schema.name]:cache_key(data.entity)
    local cache_obj = kong[constants.ENTITY_CACHE_STORE[data.schema.name]]

    if cache_key then
      cache_obj:invalidate(cache_key)
    end

    -- if we had an update, but the cache key was part of what was updated,
    -- we need to invalidate the previous entity as well

    if data.old_entity then
      local old_cache_key = db[data.schema.name]:cache_key(data.old_entity)
      if old_cache_key and cache_key ~= old_cache_key then
        cache_obj:invalidate(old_cache_key)
      end
    end

    if not data.operation then
      log(ERR, "[events] missing operation in crud subscriber")
      return
    end

    -- public worker events propagation

    -- obtain schema name
    local entity_channel           = data.schema.table or data.schema.name
    local entity_operation_channel = fmt("%s:%s", entity_channel,
      data.operation)

    -- crud:routes
    local ok, err = worker_events.post_local("crud", entity_channel, data)
    if not ok then
      log(ERR, "[events] could not broadcast crud event: ", err)
      return
    end

    -- crud:routes:create
    ok, err = worker_events.post_local("crud", entity_operation_channel, data)
    if not ok then
      log(ERR, "[events] could not broadcast crud event: ", err)
      return
    end
  end, "dao:crud")

The CRUD (no R) event handling flow calls cache:invalidate(). Inside that method a worker-level event is published to tell the other worker processes to drop the data, and a cluster event is also published so that the deletion is propagated across the cluster.
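
A simplified sketch of that invalidation flow (the real implementation lives in kong/cache.lua and handles more cases):

function _M:invalidate(key)
  -- drop the key in this node: mlcache:delete() also broadcasts a
  -- worker-level IPC event so every worker evicts its own LRU copy
  self:invalidate_local(key)

  -- notify the other Kong nodes through the database-backed channel;
  -- their "invalidations" subscription (shown above) calls invalidate_local()
  local ok, err = self.cluster_events:broadcast("invalidations", key)
  if not ok then
    log(ERR, "failed to broadcast cached entity invalidation: ", err)
  end
end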

  -- when a Route is modified, the router:version cache entry is invalidated,
  -- which causes the routing table to be rebuilt (see 2.2.3)
  worker_events.register(function()
    log(DEBUG, "[events] Route updated, invalidating router")
    core_cache:invalidate("router:version")
  end, "crud", "routes")

...the same pattern applies to the other entities.
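As a rough, conceptual sketch of the invalidation path described above (this is not Kong's exact code; the "invalidations" channel name is an assumption):

function Cache:invalidate(key)
  -- 1. delete the key locally; mlcache broadcasts a worker event so the
  --    other worker processes drop their own copies of this key
  self:invalidate_local(key)

  -- 2. record a cluster event so the other Kong nodes, which poll the
  --    database for cluster events, invalidate the same key
  local ok, err = self.cluster_events:broadcast("invalidations", key)
  if not ok then
    ngx.log(ngx.ERR, "failed to broadcast invalidation: ", err)
  end
end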

6. Plug-in loading

6.1. Plug-in reading

The init stage reads the plug-in list from the configuration file (here plugins = bundled plus the skywalking integration) and calls Lua's require to load the corresponding packages. (All plug-in packages must live under kong.plugins.)

function Plugins:load_plugin_schemas(plugin_set)
  self.handlers = nil

  local go_plugins_cnt = 0
  local handlers = {}
  local errs

  -- load installed plugins
  for plugin in pairs(plugin_set) do
    local handler, err = load_plugin(self, plugin)

    if handler then
      if type(handler.is) == "function" and handler:is(BasePlugin) then
        -- Backwards-compatibility for 0.x and 1.x plugins inheriting from the
        -- BasePlugin class.
        -- TODO: deprecate & remove
        handler = handler()
      end

      if handler._go then
        go_plugins_cnt = go_plugins_cnt + 1
      end

      handlers[plugin] = handler

    else
      errs = errs or {}
      table.insert(errs, "on plugin '" .. plugin .. "': " .. tostring(err))
    end
  end

  if errs then
    return nil, "error loading plugin schemas: " .. table.concat(errs, "; ")
  end

  reports.add_immutable_value("go_plugins_cnt", go_plugins_cnt)

  self.handlers = handlers

  return true
end
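The load_plugin() call used above is not shown; a simplified sketch of what it does (the real function also validates the plug-in schema and handles Go plug-ins) looks roughly like this:

local function load_plugin(self, name)
  -- every plug-in package must live under kong.plugins.<name>
  local ok, handler = pcall(require, "kong.plugins." .. name .. ".handler")
  if not ok then
    return nil, "plugin '" .. name .. "' not found: " .. tostring(handler)
  end

  local ok_schema, schema = pcall(require, "kong.plugins." .. name .. ".schema")
  if not ok_schema then
    return nil, "no schema found for plugin '" .. name .. "'"
  end

  -- (schema validation and priority checks omitted)
  return handler
end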

The handler functions of all plug-ins are stored in kong.db.plugins.handlers, in the format {plugin_name: handler}.

All plug-ins are kept in each Worker process and refreshed periodically.

I dumped the structure of the plug-in table as loaded into a Lua table and rendered it as YAML for easier reading:

map:
  plugin_name: true

combos:
  plugin_name:
    # both: {}
    both: 
      route_id: service_id
    # routes: {}
    routes:
      route_id: true
    # services: {}
    services:
      service_id: true
    0: true # Global plug-in
    1: true # Routing plug-in
    2: true # Service plug-in
    3: true # Routing + Service
    4: true # Consumer plug-in
    5: true # Routing + Consumer plug-in
    6: true # Routing + Service+Consumer plug-in

loaded:
  plugin_name:
    handler:
      phase_name: func()

6.2. Plug-in invocation

A plug-in is not bound to a route directly. Each plug-in has its own life cycle, which largely mirrors Kong's, and the plug-in's corresponding method is called at each stage of the Kong life cycle.

Only at call time does the plug-in check whether it is associated with the current Route, Service and Consumer. If it is, the plug-in's configuration (the plugin Entity) is read from the database and cached in kong.core_cache.

local function load_configuration_through_combos(ctx, combos, plugin)
  local plugin_configuration
  local name = plugin.name

  local route    = ctx.route
  local service  = ctx.service
  local consumer = ctx.authenticated_consumer

  if route and plugin.no_route then
    route = nil
  end
  if service and plugin.no_service then
    service = nil
  end
  if consumer and plugin.no_consumer then
    consumer = nil
  end

  local    route_id = route    and    route.id or nil
  local  service_id = service  and  service.id or nil
  local consumer_id = consumer and consumer.id or nil

  if kong.db.strategy == "off" then
	...
  else
    if route_id and service_id and consumer_id and combos[COMBO_RSC]
      and combos.both[route_id] == service_id
    then
      plugin_configuration = load_configuration(ctx, name, route_id, service_id,
                                                consumer_id)
      if plugin_configuration then
        return plugin_configuration
      end
    end

    if consumer_id and combos[COMBO_C] then
      plugin_configuration = load_configuration(ctx, name, nil, nil, consumer_id)
      if plugin_configuration then
        return plugin_configuration
      end
    end

    if route_id and combos[COMBO_R] and combos.routes[route_id] then
      plugin_configuration = load_configuration(ctx, name, route_id)
      if plugin_configuration then
        return plugin_configuration
      end
    end
	
    ...

    if combos[COMBO_GLOBAL] then
      return load_configuration(ctx, name)
    end
  end
end

Whether the current Service, Route and Consumer are paired with a plug-in will be queried here. If successful, the corresponding configuration items will be loaded:

--- Load the configuration for a plugin entry.
-- Given a Route, Service, Consumer and a plugin name, retrieve the plugin's
-- configuration if it exists. Results are cached in ngx.dict
-- @param[type=string] name Name of the plugin being tested for configuration.
-- @param[type=string] route_id Id of the route being proxied.
-- @param[type=string] service_id Id of the service being proxied.
-- @param[type=string] consumer_id Id of the consumer making the request (if any).
-- @treturn table Plugin configuration, if retrieved.
local function load_configuration(ctx,
                                  name,
                                  route_id,
                                  service_id,
                                  consumer_id)
  local ws_id = workspaces.get_workspace_id() or kong.default_workspace
  local key = kong.db.plugins:cache_key(name,
                                        route_id,
                                        service_id,
                                        consumer_id,
                                        nil,
                                        ws_id)
  local plugin, err = kong.core_cache:get(key,
                                          nil,
                                          load_plugin_from_db,
                                          key)
  if err then
    ctx.delay_response = false
    ngx.log(ngx.ERR, tostring(err))
    return ngx.exit(ngx.ERROR)
  end

  if not plugin or not plugin.enabled then
    return
  end

  local cfg = plugin.config or {}

  if not cfg.__key__ then
    cfg.__key__ = key
    cfg.__seq__ = next_seq
    next_seq = next_seq + 1
  end

  cfg.route_id    = plugin.route and plugin.route.id
  cfg.service_id  = plugin.service and plugin.service.id
  cfg.consumer_id = plugin.consumer and plugin.consumer.id

  return cfg
end

There are two ways to call plug-ins:

  1. Synchronous call
  2. Asynchronous call

In all phases except access_by_lua, plug-ins are called synchronously:

local function execute_plugins_iterator(plugins_iterator, phase, ctx)
  local old_ws = ctx and ctx.workspace
  for plugin, configuration in plugins_iterator:iterate(phase, ctx) do
    if ctx then
      if plugin.handler._go then
        ctx.ran_go_plugin = true
      end

      kong_global.set_named_ctx(kong, "plugin", plugin.handler)
    end

    kong_global.set_namespaced_log(kong, plugin.name)
    -- Here is the synchronous call
    plugin.handler[phase](plugin.handler, configuration)
    kong_global.reset_log(kong)

    if ctx then
      ctx.workspace = old_ws
    end
  end
end

In the access_by_lua phase, a coroutine is used so the plug-in is called asynchronously:

  for plugin, plugin_conf in plugins_iterator:iterate("access", ctx) do
    if plugin.handler._go then
      ctx.ran_go_plugin = true
    end

    if not ctx.delayed_response then
      kong_global.set_named_ctx(kong, "plugin", plugin.handler)
      kong_global.set_namespaced_log(kong, plugin.name)

      -- wrap the handler in a Lua coroutine so the plug-in's access function runs asynchronously
      local err = coroutine.wrap(plugin.handler.access)(plugin.handler, plugin_conf)
      if err then
        kong.log.err(err)
        ctx.delayed_response = {
          status_code = 500,
          content     = { message  = "An unexpected error occurred" },
        }
      end

      kong_global.reset_log(kong)
    end
    ctx.workspace = old_ws
  end

7. Caching mechanism

Based on my analysis of Kong's source code, this section makes a brief review of the caching mechanism.

Kong has these operations for caching:

  • Initialize the cache blocks
  • Preload database content into the cache
  • Lazily add data to the cache on first access
  • A timer periodically refreshes the cache
  • Database CRUD operations invalidate cache entries
  • Cache synchronization across workers and cluster nodes

What gets loaded into the cache:

Under the default configuration, Kong loads the full routing table and all Routes into each Worker's memory, and all Services and Plugins into both Worker memory and shared memory. Upstreams and Targets are fetched from the database on demand when the load balancer resolves them, and are then loaded into memory and shared memory.

The entities above are loaded into kong.core_cache, the L1+L2 two-level cache created with the mlcache library.

Consumers are loaded into a separate mlcache instance with a different name.
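For reference, the two-level pattern Kong builds on is the standard lua-resty-mlcache API; a generic sketch (the cache and shared-dict names below are illustrative, not Kong's own):

local mlcache = require "resty.mlcache"

local cache, err = mlcache.new("db_cache", "my_cache_shm", {
  lru_size = 1000,  -- L1: per-worker Lua-land LRU cache
  ttl      = 3600,  -- L2: shared-dict TTL for positive hits
  neg_ttl  = 30,    -- TTL for cached misses
})

local function load_service(id)
  -- L3: only runs when both L1 and L2 miss
  return kong.db.services:select({ id = id })
end

-- service_id would come from the request context (e.g. the matched Route)
local service, err = cache:get("services:" .. service_id, nil, load_service, service_id)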

8. Request lifecycle

This section describes the process of a request processed by Kong.

8.1. ssl_certificate_by_lua stage

local function execute()
  local sn, err = server_name()
  if err then
    log(ERR, "could not retrieve SNI: ", err)
    return ngx.exit(ngx.ERROR)
  end

  local cert_and_key, err = find_certificate(sn)
  if err then
    log(ERR, err)
    return ngx.exit(ngx.ERROR)
  end

  if cert_and_key == default_cert_and_key then
    -- use (already set) fallback certificate
    return
  end

  -- set the certificate for this connection

  local ok, err = clear_certs()
  if not ok then
    log(ERR, "could not clear existing (default) certificates: ", err)
    return ngx.exit(ngx.ERROR)
  end

  ok, err = set_cert(cert_and_key.cert)
  if not ok then
    log(ERR, "could not set configured certificate: ", err)
    return ngx.exit(ngx.ERROR)
  end

  ok, err = set_priv_key(cert_and_key.key)
  if not ok then
    log(ERR, "could not set configured private key: ", err)
    return ngx.exit(ngx.ERROR)
  end
end

Based on the SNI Server Name, the corresponding SSL certificate and private key are looked up and set on the Nginx connection.
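The helpers used above (server_name, clear_certs, set_cert, set_priv_key) come from lua-resty-core's ngx.ssl module; Kong wraps them roughly like this (a sketch of the module usage, not Kong's exact code):

local ssl = require "ngx.ssl"

local server_name  = ssl.server_name     -- SNI sent by the client
local clear_certs  = ssl.clear_certs     -- drop the fallback cert/key on this connection
local set_cert     = ssl.set_cert        -- set a parsed certificate chain
local set_priv_key = ssl.set_priv_key    -- set a parsed private key

The certificate and key objects themselves are produced with ssl.parse_pem_cert() / ssl.parse_pem_priv_key() when the PEM data is loaded from the database.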

8.2. rewrite_by_lua stage

  local ctx = ngx.ctx
  if not ctx.KONG_PROCESSING_START then
    ctx.KONG_PROCESSING_START = ngx.req.start_time() * 1000
  end

  if not ctx.KONG_REWRITE_START then
    ctx.KONG_REWRITE_START = get_now_ms()
  end

  kong_global.set_phase(kong, PHASES.rewrite)
  kong_resty_ctx.stash_ref()

  local is_https = var.https == "on"
  if not is_https then
    log_init_worker_errors(ctx)
  end

  runloop.rewrite.before(ctx)

...

  rewrite = {
    before = function(ctx)
      ctx.host_port = HOST_PORTS[var.server_port] or var.server_port

      -- special handling for proxy-authorization and te headers in case
      -- the plugin(s) want to specify them (store the original)
      ctx.http_proxy_authorization = var.http_proxy_authorization
      ctx.http_te                  = var.http_te
    end,
  },

Initialize the Kong lifecycle context in ngx.ctx and store request information in it.

8.3. access_by_lua stage

8.3.1. Route matching

runloop.access.before calls the router instance for route matching. get_updated_router() is called first to check whether any Route has been updated; if not, the current router instance is returned.

      -- routing request
      local router = get_updated_router()
      -- call router.exec() to find a matching route
      local match_t = router.exec()
      if not match_t then
        return kong.response.exit(404, { message = "no Route matched with those values" })
      end

The router.exec() method eventually calls the find_route() method, which takes the request fields, builds a route cache key, and looks up the matching entry.

  local function find_route(req_method, req_uri, req_host, req_scheme,
                            src_ip, src_port,
                            dst_ip, dst_port,
                            sni, req_headers)
    
    req_method = req_method or ""
    req_uri = req_uri or ""
    req_host = req_host or ""
    req_headers = req_headers or EMPTY_T

    ctx.req_method     = req_method
    ctx.req_uri        = req_uri
    ctx.req_host       = req_host
    ctx.req_headers    = req_headers
    ctx.src_ip         = src_ip or ""
    ctx.src_port       = src_port or ""
    ctx.dst_ip         = dst_ip or ""
    ctx.dst_port       = dst_port or ""
    ctx.sni            = sni or ""
    
    local cache_key = req_method .. "|" .. req_uri .. "|" .. req_host ..
                      "|" .. ctx.src_ip .. "|" .. ctx.src_port ..
                      "|" .. ctx.dst_ip .. "|" .. ctx.dst_port ..
                      "|" .. ctx.sni

    do
      local match_t = cache:get(cache_key)
      if match_t and hits.header_name == nil then
        return match_t
      end
    end

If there is a matching route in the LRU cache, it returns directly.
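This route cache is an ordinary per-worker LRU cache; a hedged sketch of the underlying lua-resty-lrucache API (the size is illustrative):

local lrucache = require "resty.lrucache"
local cache = lrucache.new(500)       -- keep up to 500 match_t entries

cache:set(cache_key, match_t)         -- store a successful match
local hit = cache:get(cache_key)      -- nil on a miss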

Otherwise the router keeps matching, builds the match entry and stores it in the cache.

              ...
              local match_t     = {
                  route           = matched_route.route,
                  service         = matched_route.service,
                  headers         = matched_route.headers,
                  upstream_url_t  = upstream_url_t,
                  upstream_scheme = upstream_url_t.scheme,
                  upstream_uri    = upstream_uri,
                  upstream_host   = upstream_host,
                  prefix          = request_prefix,
                  matches         = {
                    uri_captures  = matches.uri_captures,
                    uri           = matches.uri,
                    host          = matches.host,
                    headers       = matches.headers,
                    method        = matches.method,
                    src_ip        = matches.src_ip,
                    src_port      = matches.src_port,
                    dst_ip        = matches.dst_ip,
                    dst_port      = matches.dst_port,
                    sni           = matches.sni,
                  }
                }

                if band(matched_route.match_rules, MATCH_RULES.HEADER) == 0 then
                  cache:set(cache_key, match_t)
                end
                ...

After a successful match, the associated Route and Service are written to ngx.ctx and shared with the later lifecycle phases.

8.3.2. Request scheduling

runloop.access.after resolves the backend IP, port, scheme and other parameters based on the Route, Service and related data.

-- looks up a balancer for the target.
-- @param target the table with the target details
-- @param no_create (optional) if true, do not attempt to create
-- (for thorough testing purposes)
-- @return balancer if found, `false` if not found, or nil+error on error
local function get_balancer(target, no_create)
  -- NOTE: only called upon first lookup, so `cache_only` limitations
  -- do not apply here
  local hostname = target.host


  -- first go and find the upstream object, from cache or the db
  local upstream, err = get_upstream_by_name(hostname)
  if upstream == false then
    return false -- no upstream by this name
  end
  if err then
    return nil, err -- there was an error
  end

  local balancer = balancers[upstream.id]
  if not balancer then
    if no_create then
      return nil, "balancer not found"
    else
      log(ERR, "balancer not found for ", upstream.name, ", will create it")
      return create_balancer(upstream), upstream
    end
  end

  return balancer, upstream
end

get_balancer() returns the Target of the final request and the load balancer, based on the Service's Host.

  local ip, port, hostname, handle
  if balancer then
    -- have to invoke the ring-balancer
    local hstate = run_hook("balancer:get_peer:pre", target.host)
    ip, port, hostname, handle = balancer:getPeer(dns_cache_only,
                                          target.balancer_handle,
                                          hash_value)
    run_hook("balancer:get_peer:post", hstate)
    if not ip and
      (port == "No peers are available" or port == "Balancer is unhealthy") then
      return nil, "failure to get a peer from the ring-balancer", 503
    end
    hostname = hostname or ip
    target.hash_value = hash_value
    target.balancer_handle = handle

  else
    -- have to do a regular DNS lookup
    local try_list
    local hstate = run_hook("balancer:to_ip:pre", target.host)
    ip, port, try_list = toip(target.host, target.port, dns_cache_only)
    run_hook("balancer:to_ip:post", hstate)
    hostname = target.host
    if not ip then
      log(ERR, "DNS resolution failed: ", port, ". Tried: ", tostring(try_list))
      if port == "dns server error: 3 name error" or
         port == "dns client error: 101 empty record received" then
        return nil, "name resolution failed", 503
      end
    end
  end

Either the load balancer's policy is invoked to obtain the Target's IP address, or a plain DNS query is used. DNS pre-caching was already performed in 2.1.1, so the result can often be read from the cache.

If the Service Host is already an IP address, the load balancing policy is skipped.

  -- plain IP address: return directly
  if target.type ~= "name" then
    -- it's an ip address (v4 or v6), so nothing we can do...
    target.ip = target.host
    target.port = target.port or 80 -- TODO: remove this fallback value
    target.hostname = target.host
    return true
  end

8.4. balancer_by_lua stage

ngx.balancer.set_more_tries() sets the number of retries on error; ngx.balancer.get_last_failure() retrieves the details of the previous failed attempt, and during error handling a passive health check is reported for the upstream node.

  if balancer_data.try_count > 1 then
    -- only call balancer on retry, first one is done in `runloop.access.after`
    -- which runs in the ACCESS context and hence has less limitations than
    -- this BALANCER context where the retries are executed

    -- record failure data
    local previous_try = tries[balancer_data.try_count - 1]
    previous_try.state, previous_try.code = get_last_failure()

    -- Report HTTP status for health checks
    local balancer = balancer_data.balancer
    if balancer then
      if previous_try.state == "failed" then
        if previous_try.code == 504 then
          balancer.report_timeout(balancer_data.balancer_handle)
        else
          balancer.report_tcp_failure(balancer_data.balancer_handle)
        end

      else
        balancer.report_http_status(balancer_data.balancer_handle,
                                    previous_try.code)
      end
    end

    local ok, err, errcode = balancer_execute(balancer_data)
    if not ok then
      ngx_log(ngx_ERR, "failed to retry the dns/balancer resolver for ",
              tostring(balancer_data.host), "' with: ", tostring(err))

      ctx.KONG_BALANCER_ENDED_AT = get_now_ms()
      ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_BALANCER_START
      ctx.KONG_PROXY_LATENCY = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_PROCESSING_START

      return ngx.exit(errcode)
    end

  else
    -- first try, so set the max number of retries
    local retries = balancer_data.retries
    if retries > 0 then
      set_more_tries(retries)
    end
  end

The request is then sent to the finally resolved back-end service; the ngx.balancer.set_current_peer() method sets the upstream address.

  -- set the targets as resolved
  ngx_log(ngx_DEBUG, "setting address (try ", balancer_data.try_count, "): ",
                     balancer_data.ip, ":", balancer_data.port)
  -- Address of final dispatch
  local ok, err = set_current_peer(balancer_data.ip, balancer_data.port, pool_opts)
  if not ok then
    ngx_log(ngx_ERR, "failed to set the current peer (address: ",
            tostring(balancer_data.ip), " port: ", tostring(balancer_data.port),
            "): ", tostring(err))

    ctx.KONG_BALANCER_ENDED_AT = get_now_ms()
    ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_BALANCER_START
    ctx.KONG_PROXY_LATENCY = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_PROCESSING_START

    return ngx.exit(500)
  end
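The set_more_tries, get_last_failure and set_current_peer functions referenced in this stage come from lua-resty-core's ngx.balancer module; the imports look roughly like this (a sketch of the module usage, not Kong's exact code):

local balancer = require "ngx.balancer"

local set_current_peer = balancer.set_current_peer  -- choose the upstream address
local set_more_tries   = balancer.set_more_tries    -- allow N retries on failure
local get_last_failure = balancer.get_last_failure  -- state/code of the previous try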

8.5. header_filter_by_lua stage

This stage runs after Kong receives the response headers from the upstream service.

      local upstream_status_header = constants.HEADERS.UPSTREAM_STATUS
      if singletons.configuration.enabled_headers[upstream_status_header] then
        header[upstream_status_header] = tonumber(sub(var.upstream_status or "", -3))
        if not header[upstream_status_header] then
          log(ERR, "failed to set ", upstream_status_header, " header")
        end
      end

      local hash_cookie = ctx.balancer_data.hash_cookie
      if not hash_cookie then
        return
      end

      local cookie = ck:new()
      local ok, err = cookie:set(hash_cookie)

runloop.header_filter.before adds the upstream node status to the response headers and decides whether to set the cookie used by the load balancer's cookie-based hashing policy.

8.6. body_filter_by_lua stage

This phase is executed while receiving the body data returned by the upstream service; the body is delivered in chunks according to its size, so the phase runs multiple times.

In the OpenResty life cycle, body_filter_by_lua reads the current chunk from ngx.arg[1], and ngx.arg[2] marks EOF.
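A minimal, generic sketch of the usual chunk-buffering pattern in body_filter_by_lua (this is not Kong's code, just the OpenResty idiom):

-- accumulate the response body across multiple body_filter calls
local chunk, eof = ngx.arg[1], ngx.arg[2]

ngx.ctx.buffered = (ngx.ctx.buffered or "") .. chunk
if eof then
  -- the complete body is only available on the final call
  ngx.log(ngx.DEBUG, "upstream body size: ", #ngx.ctx.buffered)
end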

  -- Got all the body
  if kong.ctx.core.response_body then
    arg[1] = kong.ctx.core.response_body
    arg[2] = true
  end

  if not arg[2] then
    return
  end

  -- the whole body has now been received
  -- record the body_filter execution time
  ctx.KONG_BODY_FILTER_ENDED_AT = get_now_ms()
  ctx.KONG_BODY_FILTER_TIME = ctx.KONG_BODY_FILTER_ENDED_AT - ctx.KONG_BODY_FILTER_START

  if ctx.KONG_PROXIED then
    -- time spent receiving the response (header_filter + body_filter)
    -- we could use $upstream_response_time but we need to distinguish the waiting time
    -- from the receiving time in our logging plugins (especially ALF serializer).
    ctx.KONG_RECEIVE_TIME = ctx.KONG_BODY_FILTER_ENDED_AT - (ctx.KONG_HEADER_FILTER_START or
                                                             ctx.KONG_BALANCER_ENDED_AT or
                                                             ctx.KONG_BALANCER_START or
                                                             ctx.KONG_ACCESS_ENDED_AT)

8.7. log_by_lua stage

Call Lua's garbage collector to count Kong's memory usage:

local update_lua_mem
do
  local pid = ngx.worker.pid
  local kong_shm = ngx.shared.kong

  local Lua_MEM_SAMPLE_RATE = 10 -- seconds
  local last = ngx.time()

  local collectgarbage = collectgarbage

  update_lua_mem = function(force)
    local time = ngx.time()

    if force or time - last >= Lua_MEM_SAMPLE_RATE then
      local count = collectgarbage("count")

      local ok, err = kong_shm:safe_set("kong:mem:" .. pid(), count)
      if not ok then
        log(ERR, "could not record Lua VM allocated memory: ", err)
      end

      last = ngx.time()
    end
  end
end

The response result is reported back to the load balancer so it can update the health status of the upstream node:

      -- If response was produced by an upstream (ie, not by a Kong plugin)
      -- Report HTTP status for health checks
      local balancer_data = ctx.balancer_data
      if balancer_data and balancer_data.balancer_handle then
        local status = ngx.status
        if status == 504 then
          balancer_data.balancer.report_timeout(balancer_data.balancer_handle)
        else
          balancer_data.balancer.report_http_status(
            balancer_data.balancer_handle, status)
        end
        -- release the handle, so the balancer can update its statistics
        balancer_data.balancer_handle:release()
      end

9. Admin API

Kong Admin API entry point:

function Kong.admin_content(options)
  local ctx = ngx.ctx
  if not ctx.workspace then
    ctx.workspace = kong.default_workspace
  end

  return serve_content("kong.api", options)
end
local function serve_content(module, options)

  -- CORS cross-origin headers
  header["Access-Control-Allow-Origin"] = options.allow_origin or "*"

  -- start lapis
  lapis.serve(module)
end

About Lapis:

Lapis is a framework for building web applications using MoonScript or Lua that runs inside of a customized version of Nginx called OpenResty.
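For context, a minimal standalone Lapis application looks roughly like this (unrelated to Kong's actual routes):

local lapis = require "lapis"
local app = lapis.Application()

app:match("/hello/:name", function(self)
  return { json = { message = "hello, " .. self.params.name } }
end)

return app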

# api/init.lua
-- load the core (fixed) API routes
for _, v in ipairs({"kong", "health", "cache", "config", "clustering"}) do
  local routes = require("kong.api.routes." .. v)
  api_helpers.attach_routes(app, routes)
end

  local routes = {}

  -- DAO Routes
  for _, dao in pairs(singletons.db.daos) do
    if dao.schema.generate_admin_api ~= false and not dao.schema.legacy then
      routes = Endpoints.new(dao.schema, routes)
    end
  end

Building the routes at initialization:

# api/endpoints.lua
-- Create base route
-- Generates admin api endpoint functions
--
-- Examples:
--
-- /routes
-- /routes/:routes
-- /routes/:routes/service
-- /services/:services/routes
--
-- and
--
-- /services
-- /services/:services
-- /services/:services/routes/:routes
local function generate_endpoints(schema, endpoints)
  -- list route
  -- e.g. /routes
  generate_collection_endpoints(endpoints, schema)

  -- single-entity route
  -- e.g. /routes/:routes
  generate_entity_endpoints(endpoints, schema)

  -- check for associated (foreign) entities,
  -- e.g. a Route references a Service
  for foreign_field_name, foreign_field in schema:each_field() do
    -- Foreign key
    if foreign_field.type == "foreign" and not foreign_field.schema.legacy then
      -- e.g. /routes/:routes/service
      generate_entity_endpoints(endpoints, schema, foreign_field.schema, foreign_field_name, true)

      -- e.g. /services/:services/routes
      generate_collection_endpoints(endpoints, schema, foreign_field.schema, foreign_field_name)

      -- e.g. /services/:services/routes/:routes
      generate_entity_endpoints(endpoints, foreign_field.schema, schema, foreign_field_name)
    end
  end

  return endpoints
end

-- Generates admin api collection endpoint functions
--
-- Examples:
--
-- /routes
-- /services/:services/routes
--
-- and
--
-- /services
local function generate_collection_endpoints(endpoints, schema, foreign_schema, foreign_field_name)
  local collection_path

  -- Foreign key Association
  if foreign_schema then
    collection_path = fmt("/%s/:%s/%s", foreign_schema.admin_api_name or
                                        foreign_schema.name,
                                        foreign_schema.name,
                                        schema.admin_api_nested_name or
                                        schema.admin_api_name or
                                        schema.name)

  else
    -- No foreign key Association
    collection_path = fmt("/%s", schema.admin_api_name or
                                 schema.name)
  end

  endpoints[collection_path] = {
    schema  = schema,
    methods = {
      --OPTIONS = method_not_allowed,
      --HEAD    = method_not_allowed,
      GET     = get_collection_endpoint(schema, foreign_schema, foreign_field_name),
      POST    = post_collection_endpoint(schema, foreign_schema, foreign_field_name),
      --PUT     = method_not_allowed,
      --PATCH   = method_not_allowed,
      --DELETE  = method_not_allowed,
    },
  }
end

Here we only look at the POST request handling:

local function post_collection_endpoint(schema, foreign_schema, foreign_field_name, method)
  return function(self, db, helpers, post_process)
    if foreign_schema then
      local foreign_entity, _, err_t = select_entity(self, db, foreign_schema)
      if err_t then
        return handle_error(err_t)
      end

      if not foreign_entity then
        return not_found()
      end

      self.args.post[foreign_field_name] = foreign_schema:extract_pk_values(foreign_entity)
    end

    -- Processing request, parameter verification, inserting data    
    local entity, _, err_t = insert_entity(self, db, schema, method)
    if err_t then
      return handle_error(err_t)
    end
    
    -- Callback function
    if post_process then
      entity, _, err_t = post_process(entity)
      if err_t then
        return handle_error(err_t)
      end
    end

    return created(entity)
  end
end

The Admin API is only a thin API layer; it is not responsible for the event processing and data synchronization behind it, which are described in the event-handling section of this article.

10. Plug-in development

Here are a few small tricks that can be used in plug-in development.

10.1. Multi-level Schema nesting

It looks ugly, but this is what multi-level Schema nesting looks like:

local schema = {
    name = plugin_name,
    fields = {
        { consumer = typedefs.no_consumer },
        { protocols = typedefs.protocols_http },
        { config = {
          type = "record",
          fields = { {
            rules = {
              type = "array",
              elements = {
                type = "record",
                fields = { {
                  match = {
                    type = "array",
                    elements = {
                      type = "record",
                      fields = {
                        { vars = { type = "array", elements = { 
                            type = "array",
                            elements = { type = "string" }
                        } } }
                      }
                    }
                  }
                } }
              }
            }
          } },
        } }
    }
}

10.2. Custom Schema validator

local expr = require("resty.expr.v1")

local schema_validator = function(conf)
    if conf.rules then
        for _, rule in ipairs(conf.rules) do
            if rule.match and type(rule.match) == "table" then
                for _, m in pairs(rule.match) do
                    local ok, err = expr.new(m.vars)
                    if not ok then
                        return false, "failed to validate the 'vars' expression: " .. err
                    end
                end
            end
        end
    end

    return true
end
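The validator can then be attached to the config field with the schema library's custom_validator attribute; a hedged sketch (field definitions abbreviated, see 10.1 for the full layout):

local schema = {
    name = plugin_name,
    fields = {
        { config = {
            type = "record",
            custom_validator = schema_validator,  -- runs after the basic type checks pass
            fields = {
                -- rules / match / vars fields as in 10.1
            },
        } },
    },
}

return schema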

10.3. Printing a Lua table in logs

kong.log.inspect.on()
kong.log.debug("Lua table: ", t)

10.4. Custom log output

Available in Kong 2.3.0 and above.

local entry = {
    entries = ctx.log_entries,
    id = self.transaction_id,
    action = action_name,
}

kong.log.set_serialize_value("waf", entry)
