nginx source code analysis -- master and worker process model

1, Overall architecture of Nginx

In normal operation nginx runs as multiple processes. The most basic ones are the master process (the monitoring process, also called the main process) and the worker processes, possibly along with cache-related processes.

A relatively complete overall frame structure is shown in the figure:

2, Core process model

The main process that starts nginx acts as the monitoring (master) process, while the child processes it creates with fork() act as the worker processes.

nginx can also be executed in a single process model. In this process model, the main process is the working process, and there is no monitoring process.

The core process model block diagram of Nginx is as follows:

master process

The monitoring process acts as the interface between the whole process group and the user, and monitors the other processes at the same time. It does not deal with network events and is not responsible for executing business logic. By managing the worker processes it only implements restarting the service, smooth upgrades, log file rotation, and applying configuration file changes without stopping the service.

master process overview (from Alibaba group data platform blog):

There is a key sigsuspend() call in the for (;;) infinite loop of the master process. Because of this call the master process is suspended most of the time and only wakes up when it receives a signal.

The master process decides what the ngx_master_process_cycle method does by checking the following seven flag bits:

sig_atomic_t ngx_reap;

sig_atomic_t ngx_terminate;

sig_atomic_t ngx_quit;

sig_atomic_t ngx_reconfigure;

sig_atomic_t ngx_reopen;

sig_atomic_t ngx_change_binary;

sig_atomic_t ngx_noaccept;

Significance of signals received in the process to Nginx framework:

One more flag bit is used: ngx_restart. It is only used internally as a flag in the master workflow and does not correspond to any signal.

Core code (ngx_process_cycle.c):

/*
 * Main loop of the master (monitoring) process.
 *
 * Blocks the signals the master handles, builds and sets the process title,
 * starts the worker and cache manager processes, and then loops forever:
 * each iteration sleeps in sigsuspend() until a signal arrives and then acts
 * on the global flags ngx_reap, ngx_terminate, ngx_quit, ngx_reconfigure,
 * ngx_restart, ngx_reopen, ngx_change_binary and ngx_noaccept (presumably
 * set by the signal handler, which is not part of this function).
 *
 * cycle: the current cycle; it is replaced inside the loop when a
 *        reconfiguration succeeds.
 *
 * The function contains no return statement; the only exit visible here is
 * the call to ngx_master_process_exit(), which presumably does not return.
 */
void
ngx_master_process_cycle(ngx_cycle_t *cycle)
{
    char              *title;
    u_char            *p;
    size_t             size;
    ngx_int_t          i;
    ngx_uint_t         n, sigio;
    sigset_t           set;
    struct itimerval   itv;
    ngx_uint_t         live;
    ngx_msec_t         delay;
    ngx_listening_t   *ls;
    ngx_core_conf_t   *ccf;
 
    //Block every signal the master handles; they stay blocked except while
    //the process is inside sigsuspend() in the loop below
    sigemptyset(&set);
    sigaddset(&set, SIGCHLD);
    sigaddset(&set, SIGALRM);
    sigaddset(&set, SIGIO);
    sigaddset(&set, SIGINT);
    sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));
 
    //A sigprocmask() failure is only logged; execution continues
    if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "sigprocmask() failed");
    }
 
    //From here on set is the empty mask that is handed to sigsuspend()
    sigemptyset(&set);
 
 
    //Title buffer size: the master_process prefix plus, for every argument,
    //its length and one extra byte
    size = sizeof(master_process);
 
    for (i = 0; i < ngx_argc; i++) {
        size += ngx_strlen(ngx_argv[i]) + 1;
    }
 
    //NOTE(review): the allocation result is used below without a NULL check
    title = ngx_pnalloc(cycle->pool, size);
 
    //Title = master_process prefix followed by the space-separated arguments
    p = ngx_cpymem(title, master_process, sizeof(master_process) - 1);
    for (i = 0; i < ngx_argc; i++) {
        *p++ = ' ';
        //NOTE(review): the limit passed is the full buffer size, not the
        //space remaining after p - confirm against ngx_cpystrn()
        p = ngx_cpystrn(p, (u_char *) ngx_argv[i], size);
    }
 
    ngx_setproctitle(title);
 
 
    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
 
    //Start ccf->worker_processes worker processes
    ngx_start_worker_processes(cycle, ccf->worker_processes,
                               NGX_PROCESS_RESPAWN);
    //Start the cache manager processes
    ngx_start_cache_manager_processes(cycle, 0);
 
    ngx_new_binary = 0;
    delay = 0;  //milliseconds between termination retries; 0 = not terminating
    sigio = 0;  //wake-ups to skip before signalling the children again
    live = 1;   //updated from ngx_reap_children(); 0 lets the master exit
 
    for ( ;; ) {//Main loop: one iteration per wake-up from sigsuspend()
        //delay becomes non-zero only in the ngx_terminate branch below.
        //While it is set, arm a one-shot real-time timer for delay
        //milliseconds; every time ngx_sigalrm is seen the delay doubles.
        if (delay) {
            if (ngx_sigalrm) {
                sigio = 0;
                delay *= 2;
                ngx_sigalrm = 0;
            }
 
            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "termination cycle: %d", delay);
 
            //it_interval of zero: the timer fires once and is not reloaded
            itv.it_interval.tv_sec = 0;
            itv.it_interval.tv_usec = 0;
            itv.it_value.tv_sec = delay / 1000;
            itv.it_value.tv_usec = (delay % 1000 ) * 1000;
 
            if (setitimer(ITIMER_REAL, &itv, NULL) == -1) {
                ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                              "setitimer() failed");
            }
        }
 
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "sigsuspend");
 
        sigsuspend(&set);//Sleep with the empty mask until a signal is delivered
 
        ngx_time_update();
 
        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "wake up, sigio %i", sigio);
 
        //ngx_reap set: let ngx_reap_children() handle the child processes;
        //its return value becomes the new value of live
        if (ngx_reap) {
            ngx_reap = 0;
            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "reap children");
 
            live = ngx_reap_children(cycle);//Result decides whether the master may exit
        }
 
        //live == 0 (presumably: no child process is left) together with
        //ngx_terminate or ngx_quit means the master itself exits
        if (!live && (ngx_terminate || ngx_quit)) {
            ngx_master_process_exit(cycle);//Exit master process
        }
 
        //ngx_terminate set: forced shutdown. The first pass sets delay to 50.
        //After signalling the children, the next sigio wake-ups are skipped
        //without signalling again.
        if (ngx_terminate) {
            if (delay == 0) {
                delay = 50;
            }
 
            if (sigio) {
                sigio--;
                continue;
            }
 
            sigio = ccf->worker_processes + 2 /* cache processes */;
 
            //Once the doubled delay exceeds 1000 ms, escalate to SIGKILL;
            //before that send the terminate signal
            if (delay > 1000) {
                ngx_signal_worker_processes(cycle, SIGKILL);
            } else {
                ngx_signal_worker_processes(cycle,
                                       ngx_signal_value(NGX_TERMINATE_SIGNAL));
            }
 
            continue;
        }
 
        //ngx_quit set: graceful shutdown
        if (ngx_quit) {
            ngx_signal_worker_processes(cycle,
                                        ngx_signal_value(NGX_SHUTDOWN_SIGNAL));//Forward the shutdown signal to the child processes
 
            ls = cycle->listening.elts;
            for (n = 0; n < cycle->listening.nelts; n++) {//Close every listening socket held by the master
                if (ngx_close_socket(ls[n].fd) == -1) {
                    ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
                                  ngx_close_socket_n " %V failed",
                                  &ls[n].addr_text);
                }
            }
            //Mark the listening array empty so the sockets are not closed again
            cycle->listening.nelts = 0;
 
            continue;
        }
 
        //ngx_reconfigure set: apply a new configuration.
        //The existing workers are not asked to re-read the configuration;
        //instead a new ngx_cycle_t is initialized, new workers are started
        //with it, and the shutdown signal is then sent to the workers.
        if (ngx_reconfigure) {
            ngx_reconfigure = 0;
 
            //ngx_new_binary is non-zero after ngx_exec_new_binary() (see the
            //ngx_change_binary branch). In that case no new cycle is built:
            //workers are simply started again with the current cycle and
            //ngx_noaccepting is cleared.
            if (ngx_new_binary) {
                ngx_start_worker_processes(cycle, ccf->worker_processes,
                                           NGX_PROCESS_RESPAWN);
                ngx_start_cache_manager_processes(cycle, 0);
                ngx_noaccepting = 0;
 
                continue;
            }
 
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");
 
            //Build a new cycle; on failure fall back to the global ngx_cycle
            //and keep running with the old configuration
            cycle = ngx_init_cycle(cycle);
            if (cycle == NULL) {
                cycle = (ngx_cycle_t *) ngx_cycle;
                continue;
            }
 
            ngx_cycle = cycle;
            ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                                   ngx_core_module);
            //Start the new workers, flagged NGX_PROCESS_JUST_RESPAWN
            ngx_start_worker_processes(cycle, ccf->worker_processes,
                                       NGX_PROCESS_JUST_RESPAWN);
            ngx_start_cache_manager_processes(cycle, 1);
 
            /* allow new processes to start */
            ngx_msleep(100);
 
            live = 1;
            //Send the shutdown signal; presumably ngx_signal_worker_processes()
            //skips the processes that were just spawned - TODO confirm
            ngx_signal_worker_processes(cycle,
                                        ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
        }
        //ngx_restart set: start the worker and cache manager processes again.
        //The flag is not set anywhere in this function; presumably it is set
        //while children are reaped with ngx_noaccepting active - TODO confirm
        if (ngx_restart) {
            ngx_restart = 0;
            ngx_start_worker_processes(cycle, ccf->worker_processes,
                                       NGX_PROCESS_RESPAWN);
            ngx_start_cache_manager_processes(cycle, 0);
            live = 1;
        }
 
        //ngx_reopen set: reopen the files in the master, then forward the
        //reopen signal to the child processes
        if (ngx_reopen) {
            ngx_reopen = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
            ngx_reopen_files(cycle, ccf->user);
            ngx_signal_worker_processes(cycle,
                                        ngx_signal_value(NGX_REOPEN_SIGNAL));
        }
 
        //ngx_change_binary set: execute the new binary; the result is kept in
        //ngx_new_binary, which the ngx_reconfigure branch checks
        if (ngx_change_binary) {
            ngx_change_binary = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "changing binary");
            ngx_new_binary = ngx_exec_new_binary(cycle, ngx_argv);
        }
 
        //ngx_noaccept set: record it in ngx_noaccepting and send the shutdown
        //signal to the child processes
        if (ngx_noaccept) {
            ngx_noaccept = 0;
            ngx_noaccepting = 1;
            ngx_signal_worker_processes(cycle,
                                        ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
        }
    }
}

ngx_start_worker_processes function:

/*
 * Spawn n worker processes and announce each of them over the channel.
 *
 * cycle: cycle handed to every spawned worker and used for logging
 * n:     number of workers to create
 * type:  respawn type forwarded unchanged to ngx_spawn_process()
 */
static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
    ngx_int_t      worker;
    ngx_channel_t  msg;

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");

    //Every message sent from this function carries the same command
    msg.command = NGX_CMD_OPEN_CHANNEL;

    worker = 0;

    while (worker < n) {
        //Create one worker; its index is passed as the opaque data argument
        ngx_spawn_process(cycle, ngx_worker_process_cycle,
                          (void *) (intptr_t) worker, "worker process", type);

        //ngx_spawn_process() stores the slot it used in ngx_process_slot.
        //Describe the new worker: its pid, its index in the global
        //ngx_processes array and the channel[0] descriptor of its socket pair.
        msg.pid = ngx_processes[ngx_process_slot].pid;
        msg.slot = ngx_process_slot;
        msg.fd = ngx_processes[ngx_process_slot].channel[0];

        //Hand that description to ngx_pass_open_channel(), which presumably
        //forwards it to the workers created earlier
        ngx_pass_open_channel(cycle, &msg);

        worker++;
    }
}


ngx_spawn_process Function: 
 
//Fork one child process and record it in the global ngx_processes table.
//
//Parameters:
//cycle:   passed through to proc; its log is used for error reporting
//proc:    function the child runs after fork(), called as proc(cycle, data)
//data:    opaque argument handed to proc
//name:    child process name, used in log messages and stored in the slot
//respawn: a value >= 0 is the index of the slot to reuse; otherwise one of the
//         NGX_PROCESS_* type constants handled in the switch at the end
//
//Returns the child's pid in the parent, or NGX_INVALID_PID if no slot is free
//or if setting up the channel or fork() fails.
ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
    char *name, ngx_int_t respawn)
{
    u_long     on;
    ngx_pid_t  pid;
    ngx_int_t  s;
 
    //Pick the slot: either the one given by the caller or the first slot
    //below ngx_last_process whose pid is -1 (s ends up equal to
    //ngx_last_process when none is found)
    if (respawn >= 0) {
        s = respawn;
 
    } else {
        for (s = 0; s < ngx_last_process; s++) {
            if (ngx_processes[s].pid == -1) {
                break;
            }
        }
 
        //The table is full
        if (s == NGX_MAX_PROCESSES) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "no more than %d processes can be spawned",
                          NGX_MAX_PROCESSES);
            return NGX_INVALID_PID;
        }
    }
 
 
    //Every process type except a detached one gets a channel
    if (respawn != NGX_PROCESS_DETACHED) {
 
        /* Solaris 9 still has no AF_LOCAL */
        //Create a connected pair of AF_UNIX stream sockets as the channel
        //between parent and child (a local socket pair, not TCP)
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "socketpair() failed while spawning \"%s\"", name);
            return NGX_INVALID_PID;
        }
 
        ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                       "channel %d:%d",
                       ngx_processes[s].channel[0],
                       ngx_processes[s].channel[1]);
 
        //Put both ends into non-blocking mode; from here on every failure
        //closes the channel before returning
        if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          ngx_nonblocking_n " failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
 
        if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          ngx_nonblocking_n " failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
 
        //Enable asynchronous (signal-driven) I/O notification on channel[0]
        on = 1;
        if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "ioctl(FIOASYNC) failed while spawning \"%s\"", name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
 
        //Make the current process (ngx_pid) the owner of channel[0]
        if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "fcntl(F_SETOWN) failed while spawning \"%s\"", name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
 
        //Mark both ends close-on-exec
        if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                           name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
 
        if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                           name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
 
        //Set before fork(), so the child inherits channel[1] in ngx_channel
        ngx_channel = ngx_processes[s].channel[1];
 
    } else {
        //Detached processes have no channel
        ngx_processes[s].channel[0] = -1;
        ngx_processes[s].channel[1] = -1;
    }
 
    //Publish the chosen slot; also set before fork() so the child sees it
    ngx_process_slot = s;
 
    //Create child process
    pid = fork();
 
    switch (pid) {
 
    case -1:
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "fork() failed while spawning \"%s\"", name);
        ngx_close_channel(ngx_processes[s].channel, cycle->log);
        return NGX_INVALID_PID;
 
    case 0:
        //Child: refresh the cached pid and run the process function.
        //proc is presumably not expected to return; if it did, the child
        //would continue with the parent's bookkeeping below.
        ngx_pid = ngx_getpid();
        proc(cycle, data);
        break;
 
    default:
        //Parent: continue with the bookkeeping below
        break;
    }
 
    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);
 
    ngx_processes[s].pid = pid;
    ngx_processes[s].exited = 0;
 
    //Reusing an existing slot: the remaining fields of the slot are left
    //as they were
    if (respawn >= 0) {
        return pid;
    }
 
    //New slot: remember how to run the process again and its name
    ngx_processes[s].proc = proc;
    ngx_processes[s].data = data;
    ngx_processes[s].name = name;
    ngx_processes[s].exiting = 0;
 
    //Translate the process type into the respawn / just_spawn / detached flags
    switch (respawn) {
 
    case NGX_PROCESS_NORESPAWN:
        ngx_processes[s].respawn = 0;
        ngx_processes[s].just_spawn = 0;
        ngx_processes[s].detached = 0;
        break;
 
    case NGX_PROCESS_JUST_SPAWN:
        ngx_processes[s].respawn = 0;
        ngx_processes[s].just_spawn = 1;
        ngx_processes[s].detached = 0;
        break;
 
    case NGX_PROCESS_RESPAWN:
        ngx_processes[s].respawn = 1;
        ngx_processes[s].just_spawn = 0;
        ngx_processes[s].detached = 0;
        break;
 
    case NGX_PROCESS_JUST_RESPAWN:
        ngx_processes[s].respawn = 1;
        ngx_processes[s].just_spawn = 1;
        ngx_processes[s].detached = 0;
        break;
 
    case NGX_PROCESS_DETACHED:
        ngx_processes[s].respawn = 0;
        ngx_processes[s].just_spawn = 0;
        ngx_processes[s].detached = 1;
        break;
    }
 
    //The slot was appended at the end of the used range: extend the range
    if (s == ngx_last_process) {
        ngx_last_process++;
    }
 
    return pid;
}

worker process

The main task of a worker process is to carry out the actual request-handling logic. Its main focus is on I/O events, such as data becoming readable or writable on connections to clients or to back-end real servers (in that case nginx acts as an intermediate proxy). The worker process therefore blocks in an I/O multiplexing call such as select() or epoll_wait(), waiting for read/write events. Of course, that wait may also be interrupted by a newly received signal.

How does the master process notify the worker process to do some work? Signals are used.

When a signal is received, the signal processing function ngx_signal_handler() will execute.

The working method of the worker process, ngx_worker_process_cycle, mainly pays attention to four global flag bits:

sig_atomic_t ngx_terminate;// Force shutdown process

sig_atomic_t ngx_quit;// Gracefully close the process (it is set only when the QUIT signal is received; the worker loop sets ngx_exiting to 1 the first time it sees ngx_quit set)

ngx_uint_t ngx_exiting;// Exit process flag bit

sig_atomic_t ngx_reopen;// Reopen all files

Of these, ngx_terminate, ngx_quit and ngx_reopen are set by ngx_signal_handler according to the received signal. The ngx_exiting flag bit is only used inside the ngx_worker_process_cycle method, as a flag while the worker is exiting.

Core code (ngx_process_cycle.c):

/*
 * Main loop of a worker process.
 *
 * cycle: the cycle this worker serves
 * data:  the worker's index, passed as an integer cast to a pointer
 *
 * After initialization the worker loops forever: it processes events and
 * timers, then checks the ngx_terminate, ngx_quit and ngx_reopen flags.
 * The only exits visible here are the calls to ngx_worker_process_exit().
 */
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
    ngx_int_t worker = (intptr_t) data;

    ngx_uint_t         n;
    ngx_connection_t  *conn;

    ngx_process = NGX_PROCESS_WORKER;

    //Per-worker initialization
    ngx_worker_process_init(cycle, worker);

    ngx_setproctitle("worker process");

    //Thread-specific code from the original source is omitted here

    for ( ;; ) {

        //A graceful shutdown is in progress
        if (ngx_exiting) {
            conn = cycle->connections;

            //Flag every open, idle connection for closing and invoke its
            //read handler
            for (n = 0; n < cycle->connection_n; n++) {
                if (conn[n].fd == -1 || !conn[n].idle) {
                    continue;
                }

                conn[n].close = 1;
                conn[n].read->handler(conn[n].read);
            }

            //The timer tree is empty (root is the sentinel): exit now
            if (ngx_event_timer_rbtree.sentinel == ngx_event_timer_rbtree.root)
            {
                ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
                ngx_worker_process_exit(cycle);
            }
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

        //Process events and timers
        ngx_process_events_and_timers(cycle);

        //Forced shutdown: exit right away
        if (ngx_terminate) {
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
            ngx_worker_process_exit(cycle);
        }

        //Graceful shutdown requested
        if (ngx_quit) {
            ngx_quit = 0;

            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                          "gracefully shutting down");
            ngx_setproctitle("worker process is shutting down");

            //Only the first request closes the listening sockets and
            //switches the loop into exiting mode
            if (!ngx_exiting) {
                ngx_close_listening_sockets(cycle);
                ngx_exiting = 1;
            }
        }

        //Reopen the files when asked to
        if (ngx_reopen) {
            ngx_reopen = 0;

            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
            ngx_reopen_files(cycle, -1);
        }
    }
}

Keywords: Linux network Nginx Interview server

Added by Sulphy on Sat, 26 Feb 2022 08:17:12 +0200