Storm source code analysis


2021SC@SDUSC

This post first gives some background on the worker, then analyzes the code.

About Worker

Relationship among worker, executor and task

A worker is a process. A process contains one or more threads, and each of those threads is an executor. An executor processes one or more tasks. A task is an instance of a component (spout or bolt) class of the topology.

A node of the Storm cluster may run one or more worker processes for one or more topologies, and each worker process executes a subset of a topology. A worker belongs to one specific topology and may run one or more executors (executor threads) for one or more components (spout / bolt) of that topology. A running topology consists of multiple processes running on multiple nodes in the Storm cluster.

One worker process executes a subset of one topology (Note: a worker never serves multiple topologies). A worker process starts one or more executor threads to execute the topology's components (spout or bolt). Therefore, a running topology is composed of multiple worker processes on multiple physical machines in the cluster.

The executor is a separate thread started by the worker process. Each executor runs the tasks of only one component (spout or bolt) of one topology (Note: an executor can run one or more tasks; by default Storm creates one task per executor. In each loop iteration the executor thread calls all of its task instances in sequence).

A task is the unit that finally runs the code in a spout or bolt (Note: a task is an instance of a spout or bolt, and the executor thread calls the task's nextTuple or execute method during execution). After a topology is started, the number of tasks of a component (spout or bolt) is fixed, but the number of executor threads used by the component can be adjusted dynamically (for example, one executor thread can execute one or more task instances of the component). This means that for a component, #threads <= #tasks (that is, the number of threads is less than or equal to the number of tasks). By default, the number of tasks equals the number of executor threads, that is, one executor thread runs only one task.
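To make the #threads <= #tasks relationship concrete, here is a minimal sketch in C. It assumes that tasks are spread as evenly as possible over the executors; the function name tasks_for_executor is hypothetical and is not part of Storm, whose real assignment logic lives elsewhere and is not shown in this post.

```c
/*
 * Hypothetical helper (not Storm code): number of tasks run by executor
 * `executor_index` when `num_tasks` tasks are spread as evenly as possible
 * over `num_executors` executor threads (an assumption of this sketch).
 * Returns -1 for invalid input, including #threads > #tasks.
 */
int tasks_for_executor(int num_tasks, int num_executors, int executor_index) {
  if (num_tasks <= 0 || num_executors <= 0 || num_executors > num_tasks ||
      executor_index < 0 || executor_index >= num_executors) {
    return -1;
  }
  int base = num_tasks / num_executors;  /* every executor gets at least this many */
  int extra = num_tasks % num_executors; /* the first `extra` executors get one more */
  return base + (executor_index < extra ? 1 : 0);
}
```

With the default of one task per executor (for example 4 tasks on 4 executors) every executor gets exactly one task; with 5 tasks on 2 executors the first executor would run 3 tasks and the second 2.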

Manage event threads for Worker processes

The Supervisor manages the event threads of the Worker process.

Each worker node runs the Supervisor daemon, which listens for the work assigned to its node and starts and stops worker processes as assigned by Nimbus.

The supervisor regularly fetches the topology information, the task assignments, and various heartbeat information from ZooKeeper, and applies the task assignments on that basis.

During supervisor synchronization, new workers are started or old workers are shut down according to the new task assignment, which rebalances the load.
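The synchronization step can be pictured as a set difference between what is assigned and what is running. The following is only a sketch under stated assumptions: workers are identified by their slot port, and the helper names contains_port and ports_only_in are hypothetical. The supervisor's real logic is not part of worker-launcher.c and is not shown in this post.

```c
#include <stddef.h>

/* Returns 1 if `port` occurs in ports[0..n), else 0. */
static int contains_port(const int *ports, size_t n, int port) {
  for (size_t i = 0; i < n; i++) {
    if (ports[i] == port) {
      return 1;
    }
  }
  return 0;
}

/*
 * Hypothetical helper: copies every element of `a` that is absent from `b`
 * into `out` (which must have room for a_len entries) and returns the count.
 */
size_t ports_only_in(const int *a, size_t a_len,
                     const int *b, size_t b_len, int *out) {
  size_t count = 0;
  for (size_t i = 0; i < a_len; i++) {
    if (!contains_port(b, b_len, a[i])) {
      out[count++] = a[i];
    }
  }
  return count;
}
```

Calling ports_only_in(assigned, ..., running, ...) would give the workers to start, and swapping the two arrays would give the workers to shut down.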


Code analysis: worker-launcher.c

setup_permissions()

static int setup_permissions(FTSENT* entry, uid_t euser, int user_write, boolean setgid_on_dir) {
  if (lchown(entry->fts_path, euser, launcher_gid) != 0) {
    fprintf(ERRORFILE, "ERROR: Failure to exec app initialization process - %s, fts_path=%s\n",
            strerror(errno), entry->fts_path);
    return -1;
  }
  mode_t mode = entry->fts_statp->st_mode;
  // Preserve user read and execute and set group read and write.
  mode_t new_mode = (mode & (S_IRUSR | S_IXUSR)) | S_IRGRP | S_IWGRP;
  if (user_write) {
    new_mode = new_mode | S_IWUSR;
  }
  // If the entry is a directory, Add group execute and setGID bits.
  if ((mode & S_IFDIR) == S_IFDIR) {
    new_mode = new_mode | S_IXGRP;
    if (setgid_on_dir) {
      new_mode = new_mode | S_ISGID;
    }
  }
  if (chmod(entry->fts_path, new_mode) != 0) {
    fprintf(ERRORFILE, "ERROR: Failure to exec app initialization process - %s, fts_path=%s\n",
            strerror(errno), entry->fts_path);
    return -1;
  }
  return 0;
}

Sets the owner and permissions of a single file-tree entry, optionally making it writable by the user. The entry is first handed over to euser and the launcher's group with lchown(), then its mode is rewritten with chmod().

For directories the resulting permission is r(w)xrws---: the file group (which should be Storm's user group) has full access to the directory, and the file user (the topology owner's user) can read and execute and, for some directories, write. The setGID bit is set so that any file created in the directory stays accessible to Storm's user for cleanup. If setgid_on_dir is FALSE, the setGID bit (S_ISGID) is not added to the directory's mode.
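The mode arithmetic can be isolated into a pure function to see which bits come out. This is a sketch, not the launcher's code: compute_entry_mode is a hypothetical name, and it uses the S_ISDIR() macro instead of the mask comparison in the listing (for regular files and directories the two agree).

```c
#include <sys/types.h>
#include <sys/stat.h>

/*
 * Hypothetical helper mirroring the mode calculation in setup_permissions():
 * keep the user's read/execute bits, grant group read/write, optionally add
 * user write, and for directories add group execute plus (optionally) setGID.
 */
mode_t compute_entry_mode(mode_t mode, int user_write, int setgid_on_dir) {
  mode_t new_mode = (mode & (S_IRUSR | S_IXUSR)) | S_IRGRP | S_IWGRP;
  if (user_write) {
    new_mode |= S_IWUSR;
  }
  if (S_ISDIR(mode)) {
    new_mode |= S_IXGRP;
    if (setgid_on_dir) {
      new_mode |= S_ISGID;
    }
  }
  return new_mode;
}
```

For a directory that starts as 0755, user_write = 1 and setgid_on_dir = 1 give 02770 (rwxrws---); with both flags 0 the result is 0570 (r-xrwx---). A regular file that starts as 0644 becomes 0460, or 0660 when user_write is set.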

mkdirs()

int mkdirs(const char* path, mode_t perm) {
  struct stat sb;
  char * npath;
  char * p;
  if (stat(path, &sb) == 0) {
    return check_dir(path, sb.st_mode, perm, 1);
  }
  npath = strdup(path);
  if (npath == NULL) {
    fprintf(LOGFILE, "ERROR: Not enough memory to copy path string");
    return -1;
  }
  /* Skip leading slashes. */
  p = npath;
  while (*p == '/') {
    p++;
  }

  while (NULL != (p = strchr(p, '/'))) {
    *p = '\0';
    if (create_validate_dir(npath, perm, path, 0) == -1) {
      free(npath);
      return -1;
    }
    *p++ = '/'; /* restore slash */
    while (*p == '/')
      p++;
  }

  /* Create the final directory component. */
  if (create_validate_dir(npath, perm, path, 1) == -1) {
    free(npath);
    return -1;
  }
  free(npath);
  return 0;
}

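Ensures that the given path and all of its parent directories exist with the required permissions. If the path already exists, it is only validated with check_dir(). Otherwise the function walks the path one component at a time, temporarily replacing each '/' with '\0' so that npath holds the current prefix, and calls create_validate_dir() for every prefix and finally for the full path.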
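To see which prefixes the loop in mkdirs() visits, the same pointer walk can be run without touching the file system. This is a sketch: list_prefixes, record_prefix and PREFIX_MAX are hypothetical names, and the copy uses malloc() and memcpy() where the original uses strdup().

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define PREFIX_MAX 256 /* assumed buffer size for this sketch */

/* Stores `prefix` in the next free slot (if any) and bumps the counter. */
static void record_prefix(const char *prefix, char out[][PREFIX_MAX],
                          int max_out, int *count) {
  if (*count < max_out) {
    snprintf(out[*count], PREFIX_MAX, "%s", prefix);
  }
  (*count)++;
}

/*
 * Hypothetical helper: lists the path prefixes in the order mkdirs() would
 * pass them to create_validate_dir(). Returns the number of prefixes, or -1
 * if memory cannot be allocated.
 */
int list_prefixes(const char *path, char out[][PREFIX_MAX], int max_out) {
  size_t len = strlen(path);
  char *npath = malloc(len + 1);
  char *p;
  int count = 0;
  if (npath == NULL) {
    return -1;
  }
  memcpy(npath, path, len + 1);

  p = npath;
  while (*p == '/') { /* skip leading slashes */
    p++;
  }
  while ((p = strchr(p, '/')) != NULL) {
    *p = '\0'; /* npath now ends at the current component */
    record_prefix(npath, out, max_out, &count);
    *p++ = '/'; /* restore slash */
    while (*p == '/') { /* collapse repeated slashes */
      p++;
    }
  }
  record_prefix(npath, out, max_out, &count); /* final component */
  free(npath);
  return count;
}
```

For the path "/a/b//c" this yields three prefixes: "/a", "/a/b" and "/a/b//c". The repeated slash is skipped rather than treated as an empty component.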

create_validate_dir()

int create_validate_dir(const char* npath, mode_t perm, const char* path,
                        int finalComponent) {
  struct stat sb;
  if (stat(npath, &sb) != 0) {
    if (mkdir(npath, perm) != 0) {
      if (errno != EEXIST || stat(npath, &sb) != 0) {
        fprintf(LOGFILE, "ERROR: Can't create directory %s - %s\n", npath,
                strerror(errno));
        return -1;
      }
      // The directory npath should exist.
      if (check_dir(npath, sb.st_mode, perm, finalComponent) == -1) {
        return -1;
      }
    }
  } else {
    if (check_dir(npath, sb.st_mode, perm, finalComponent) == -1) {
      return -1;
    }
  }
  return 0;
}

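Creates the directory if it does not exist. If mkdir() fails with EEXIST because another process created the directory in the meantime (a race condition), the now existing directory is checked with check_dir() instead of reporting an error. The finalComponent argument (0 or 1) indicates whether this is the last component of the path; it is passed on to check_dir(), because the final component is the one whose permissions need to be checked.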
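The race handling can be shown in a smaller form. The sketch below is a mkdir-first variant, not the launcher's code: ensure_dir is a hypothetical name, and because check_dir() is not shown in this post, the sketch only verifies that the existing entry is a directory.

```c
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>

/*
 * Hypothetical helper: try to create `path`; if it already exists (possibly
 * because another process won the race), accept it only if it is a directory.
 * Returns 0 on success and -1 on failure.
 */
int ensure_dir(const char *path, mode_t perm) {
  struct stat sb;
  if (mkdir(path, perm) == 0) {
    return 0; /* we created it */
  }
  if (errno != EEXIST) {
    return -1; /* a real error, e.g. missing parent or no permission */
  }
  if (stat(path, &sb) != 0) {
    return -1; /* it existed a moment ago but cannot be inspected now */
  }
  return S_ISDIR(sb.st_mode) ? 0 : -1;
}
```

Treating EEXIST as "someone else created it, now validate it" is what makes concurrent calls safe, in contrast to a stat()-then-mkdir() sequence that reports an error when it loses the race.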

setup_worker_tmp_permissions()

int setup_worker_tmp_permissions(const char *worker_dir) {
  char* worker_tmp = concatenate("%s/tmp", "worker tmp dir", 1, worker_dir);
  if (worker_tmp != NULL) {
    int exit_code = setup_dir_permissions(worker_tmp, 1, FALSE);
    if (exit_code != 0) {
      fprintf(ERRORFILE, "ERROR: setup_dir_permissions on %s failed\n", worker_tmp);
      fflush(ERRORFILE);
    } 
    return exit_code;
  } else {
    return -1;
  }
}

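Removes the setGID bit from the worker-id/tmp directory so that Java profiling can work. It does this by calling setup_dir_permissions() on the tmp directory with user write enabled and setgid_on_dir set to FALSE. This is not required for non-container workers, but it is best to be consistent.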
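The concatenate() helper is not shown in this post, so as an illustration only, the "%s/tmp" path could be built with snprintf() as follows. The name worker_tmp_path and the fixed-buffer interface are assumptions of this sketch, not the launcher's API.

```c
#include <stddef.h>
#include <stdio.h>

/*
 * Hypothetical helper: writes "<worker_dir>/tmp" into `out`.
 * Returns 0 on success and -1 if the result does not fit in out_len bytes.
 */
int worker_tmp_path(const char *worker_dir, char *out, size_t out_len) {
  int n = snprintf(out, out_len, "%s/tmp", worker_dir);
  if (n < 0 || (size_t)n >= out_len) {
    return -1; /* encoding error or truncated output */
  }
  return 0;
}
```

Checking the snprintf() return value against the buffer size guards against silently operating on a truncated path, which matters for a tool that changes permissions.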

Keywords: Big Data storm

Added by capella07 on Tue, 07 Dec 2021 00:49:11 +0200