[Source code analysis] NVIDIA HugeCTR, GPU version parameter server -- (9) -- Local hash table

0x00 summary

In this series, we introduce HugeCTR, an industry-oriented recommendation system training framework optimized for large-scale CTR models with model-parallel embeddings and data-parallel dense networks. This article introduces LocalizedSlotSparseEmbeddingHash, the embedding based on localized slots and a local hash table.

This series draws on HugeCTR source code reading; thanks to the author of that masterpiece.

Other articles in this series are as follows:

[Source code analysis] NVIDIA HugeCTR, GPU version parameter server -- (1)

[Source code analysis] NVIDIA HugeCTR, GPU version parameter server -- (2)

[Source code analysis] NVIDIA HugeCTR, GPU version parameter server -- (3)

[Source code analysis] NVIDIA HugeCTR, GPU version parameter server -- (4)

[Source code analysis] NVIDIA HugeCTR, GPU version parameter server -- (5) -- embedded hash table

[Source code analysis] NVIDIA HugeCTR, GPU version parameter server -- (6) -- distributed hash table

[Source code analysis] NVIDIA HugeCTR, GPU version parameter server -- (7) -- distributed hash table forward propagation

[Source code analysis] NVIDIA HugeCTR, GPU version parameter server -- (8) -- distributed hash table backward propagation

0x01 previous review

From the previous analysis, we can see that the overall process of an embedding table lookup is as follows.

0x02 definition

The LocalizedSlotSparseEmbeddingHash class inherits from the Embedding class, the base class that implements all embedding layers. In LocalizedSlotSparseEmbeddingHash, each slot of the embedding table is assigned to a single GPU, hence "localized slots". For example, slot 0 is on GPU 0, slot 1 on GPU 1, slot 2 on GPU 0, slot 3 on GPU 1, and so on. In contrast, in DistributedSlotSparseEmbeddingHash the keys of each slot are distributed across multiple GPUs.

The embedding table is encapsulated in a hash table. The key of the hash table is called hash_table_key; the value, called hash_table_value_index, is the row number of the embedding feature in the embedding table; the embedding feature itself is called hash_table_value.
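To make these three names concrete, here is a minimal host-side sketch of the two-level lookup (my own illustration using std::unordered_map; the real implementation is a concurrent GPU hash table):

#include <cstddef>
#include <unordered_map>
#include <vector>

// Naming convention of the sketch, mirroring the text above:
//   hash_table_key         -> hash_table_value_index (row number in the embedding table)
//   hash_table_value_index -> one row of hash_table_value (the embedding vector itself)
struct ToyLocalizedTable {
  std::unordered_map<long long, size_t> key_to_index;  // hash_table_key -> hash_table_value_index
  std::vector<float> hash_table_value;                 // row-major [num_rows, embedding_vec_size]
  size_t embedding_vec_size = 0;

  const float *lookup(long long hash_table_key) const {
    size_t value_index = key_to_index.at(hash_table_key);        // first level: key -> row number
    return &hash_table_value[value_index * embedding_vec_size];  // second level: row number -> vector
  }
};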

LocalizedSlotSparseEmbeddingHash implements all the operations required to train the embedding layer, including forward propagation and backward propagation. Forward propagation corresponds to the API forward. Back propagation is split into two stages: backward and update_params. The class also provides operations for uploading the hash table (hash table key, hash table value index and hash table value) from a host file to the GPU (load_parameters) and for downloading the hash table from the GPU to a host file (dump_parameters).

template <typename TypeHashKey, typename TypeEmbeddingComp>
class LocalizedSlotSparseEmbeddingHash : public IEmbedding {
  using NvHashTable = HashTable<TypeHashKey, size_t>;

 private:
  EmbeddingData<TypeHashKey, TypeEmbeddingComp> embedding_data_;
  std::vector<LocalizedFilterKeyStorage<TypeHashKey>> filter_keys_storages_;

  std::vector<std::shared_ptr<NvHashTable>> hash_tables_; /**< Hash table.  */

  // define tensors
  Tensors2<float> hash_table_value_tensors_; /**< Hash table value. */
  std::vector<Tensors2<float>> value_table_tensors_;

  Tensors2<size_t> hash_table_slot_id_tensors_; /**< the tensors for storing slot ids */
  Tensors2<size_t> hash_value_index_tensors_;   /**< Hash value index. The index is corresponding to
                                                     the line number of the value. */
  Tensors2<TypeEmbeddingComp>
      embedding_feature_tensors_;             /**< the output tensor of the forward(). */
  Tensors2<TypeEmbeddingComp> wgrad_tensors_; /**< the input tensor of the backward(). */

  std::vector<EmbeddingOptimizer<TypeHashKey, TypeEmbeddingComp>> embedding_optimizers_;
  size_t max_vocabulary_size_;
  size_t max_vocabulary_size_per_gpu_;   /**< Max vocabulary size for each GPU. */
  std::vector<size_t> slot_num_per_gpu_; /* slot_num per GPU */
  std::vector<size_t> slot_size_array_;

  SparseEmbeddingFunctors functors_;

  Tensors2<TypeEmbeddingComp> all2all_tensors_; /**< the temp buffer to store all2all results */

  Tensors2<TypeEmbeddingComp> utest_all2all_tensors_;
  Tensors2<TypeEmbeddingComp> utest_reorder_tensors_;
  Tensors2<TypeEmbeddingComp> utest_backward_temp_tensors_;
  Tensors2<TypeEmbeddingComp> utest_forward_temp_tensors_;
};

0x03 build

3.1 call

HugeCTR/src/parsers/create_embedding.cpp contains the following call:

case Embedding_t::LocalizedSlotSparseEmbeddingHash: {
  const SparseEmbeddingHashParams embedding_params = {batch_size,
                                                      batch_size_eval,
                                                      max_vocabulary_size_per_gpu,
                                                      slot_size_array,
                                                      embedding_vec_size,
                                                      sparse_input.max_feature_num_per_sample,
                                                      sparse_input.slot_num,
                                                      combiner,  // combiner: 0-sum, 1-mean
                                                      embedding_opt_params};

  embeddings.emplace_back(new LocalizedSlotSparseEmbeddingHash<TypeKey, TypeFP>(
      sparse_input.train_sparse_tensors, sparse_input.evaluate_sparse_tensors, embedding_params,
      resource_manager));

  break;
}

3.2 constructor

The constructor of LocalizedSlotSparseEmbeddingHash is as follows; see the comments below for the specific logic.

template <typename TypeHashKey, typename TypeEmbeddingComp>
LocalizedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::LocalizedSlotSparseEmbeddingHash(
    const SparseTensors<TypeHashKey> &train_keys, const SparseTensors<TypeHashKey> &evaluate_keys,
    const SparseEmbeddingHashParams &embedding_params,
    const std::shared_ptr<ResourceManager> &resource_manager)
    : embedding_data_(Embedding_t::LocalizedSlotSparseEmbeddingHash, train_keys, evaluate_keys,
                      embedding_params, resource_manager),
      slot_size_array_(embedding_params.slot_size_array) {
  try {
    // Set the maximum amount of data per GPU
    if (slot_size_array_.empty()) {
      max_vocabulary_size_per_gpu_ = embedding_data_.embedding_params_.max_vocabulary_size_per_gpu;
      max_vocabulary_size_ = embedding_data_.embedding_params_.max_vocabulary_size_per_gpu *
                             embedding_data_.get_resource_manager().get_global_gpu_count();
    } else {
      max_vocabulary_size_per_gpu_ =
          cal_max_voc_size_per_gpu(slot_size_array_, embedding_data_.get_resource_manager());
      max_vocabulary_size_ = 0;
      for (size_t slot_size : slot_size_array_) {
        max_vocabulary_size_ += slot_size;
      }
    }

    CudaDeviceContext context;
    // Traverse local GPU
    for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
      // Set current context
      context.set_device(embedding_data_.get_local_gpu(id).get_device_id());

      // Number of slots per GPU
      size_t gid = embedding_data_.get_local_gpu(id).get_global_id();
      size_t slot_num_per_gpu =
          embedding_data_.embedding_params_.slot_num /
              embedding_data_.get_resource_manager().get_global_gpu_count() +
          ((gid < embedding_data_.embedding_params_.slot_num %
                      embedding_data_.get_resource_manager().get_global_gpu_count())
               ? 1
               : 0);
      slot_num_per_gpu_.push_back(slot_num_per_gpu);
      // new GeneralBuffer objects
      const std::shared_ptr<GeneralBuffer2<CudaAllocator>> &buf = embedding_data_.get_buffer(id);
      embedding_optimizers_.emplace_back(max_vocabulary_size_per_gpu_,
                                         embedding_data_.embedding_params_, buf);

      // The next step is to allocate memory for various variables
      // new hash table value vectors
      if (slot_size_array_.empty()) {
        Tensor2<float> tensor;
        buf->reserve(
            {max_vocabulary_size_per_gpu_, embedding_data_.embedding_params_.embedding_vec_size},
            &tensor);
        hash_table_value_tensors_.push_back(tensor);
      } else {
        const std::shared_ptr<BufferBlock2<float>> &block = buf->create_block<float>();
        Tensors2<float> tensors;
        size_t vocabulary_size_in_current_gpu = 0;
        for (size_t i = 0; i < slot_size_array_.size(); i++) {
          if ((i % embedding_data_.get_resource_manager().get_global_gpu_count()) == gid) {
            Tensor2<float> tensor;
            block->reserve(
                {slot_size_array_[i], embedding_data_.embedding_params_.embedding_vec_size},
                &tensor);
            tensors.push_back(tensor);
            vocabulary_size_in_current_gpu += slot_size_array_[i];
          }
        }
        value_table_tensors_.push_back(tensors);
        if (max_vocabulary_size_per_gpu_ > vocabulary_size_in_current_gpu) {
          Tensor2<float> padding_tensor_for_optimizer;
          block->reserve({max_vocabulary_size_per_gpu_ - vocabulary_size_in_current_gpu,
                          embedding_data_.embedding_params_.embedding_vec_size},
                         &padding_tensor_for_optimizer);
        }
        hash_table_value_tensors_.push_back(block->as_tensor());
      }
      {
        Tensor2<TypeHashKey> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_batch_size(true),
                      embedding_data_.embedding_params_.max_feature_num},
                     &tensor);
        embedding_data_.train_value_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeHashKey> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_batch_size(false),
                      embedding_data_.embedding_params_.max_feature_num},
                     &tensor);
        embedding_data_.evaluate_value_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeHashKey> tensor;
        buf->reserve(
            {embedding_data_.embedding_params_.get_batch_size(true) * slot_num_per_gpu + 1},
            &tensor);
        embedding_data_.train_row_offsets_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeHashKey> tensor;
        buf->reserve(
            {embedding_data_.embedding_params_.get_batch_size(false) * slot_num_per_gpu + 1},
            &tensor);
        embedding_data_.evaluate_row_offsets_tensors_.push_back(tensor);
      }
      { embedding_data_.train_nnz_array_.push_back(std::make_shared<size_t>(0)); }
      { embedding_data_.evaluate_nnz_array_.push_back(std::make_shared<size_t>(0)); }
      // new hash table value_index that get() from HashTable
      {
        Tensor2<size_t> tensor;
        buf->reserve({1, embedding_data_.embedding_params_.get_universal_batch_size() *
                             embedding_data_.embedding_params_.max_feature_num},
                     &tensor);
        hash_value_index_tensors_.push_back(tensor);
      }

      // new embedding features reduced by hash table values(results of forward)
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve(
            {embedding_data_.embedding_params_.get_universal_batch_size() * slot_num_per_gpu,
             embedding_data_.embedding_params_.embedding_vec_size},
            &tensor);
        embedding_feature_tensors_.push_back(tensor);
      }

      // new wgrad used by backward
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_batch_size(true) * slot_num_per_gpu,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        wgrad_tensors_.push_back(tensor);
      }

      // the tensors for storing slot ids
      // TODO: init to -1 ?
      {
        Tensor2<size_t> tensor;
        buf->reserve({max_vocabulary_size_per_gpu_, 1}, &tensor);
        hash_table_slot_id_tensors_.push_back(tensor);
      }
      // temp tensors for all2all
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.get_universal_batch_size_per_gpu() *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        all2all_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_universal_batch_size() *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        utest_forward_temp_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.get_batch_size_per_gpu(true) *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        utest_all2all_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.get_batch_size_per_gpu(true) *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        utest_reorder_tensors_.push_back(tensor);
      }
      {
        Tensor2<TypeEmbeddingComp> tensor;
        buf->reserve({embedding_data_.embedding_params_.get_batch_size(true) *
                          embedding_data_.embedding_params_.slot_num,
                      embedding_data_.embedding_params_.embedding_vec_size},
                     &tensor);
        utest_backward_temp_tensors_.push_back(tensor);
      }
      {
        size_t max_nnz = embedding_data_.embedding_params_.get_universal_batch_size() *
                         embedding_data_.embedding_params_.max_feature_num;
        size_t rowoffset_count = embedding_data_.embedding_params_.slot_num *
                                     embedding_data_.embedding_params_.get_universal_batch_size() +
                                 1;

        filter_keys_storages_.emplace_back(buf, max_nnz, rowoffset_count);
      }
    }

    hash_tables_.resize(embedding_data_.get_resource_manager().get_local_gpu_count());
#pragma omp parallel for num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
    for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
      // Initialize internal hash table
      CudaDeviceContext context(embedding_data_.get_local_gpu(id).get_device_id());
      // construct HashTable object: used to store hash table <key, value_index>
      hash_tables_[id].reset(new NvHashTable(max_vocabulary_size_per_gpu_));
      embedding_data_.get_buffer(id)->allocate();
    }

    // Initialize optimizer
    for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
      context.set_device(embedding_data_.get_local_gpu(id).get_device_id());
      embedding_optimizers_[id].initialize(embedding_data_.get_local_gpu(id));

    }  // end of for(int id = 0; id < embedding_data_.get_local_gpu_count(); id++)

    if (!embedding_data_.embedding_params_.slot_size_array.empty()) {
      std::vector<TypeHashKey> embedding_offsets;
      TypeHashKey slot_sizes_prefix_sum = 0;
      for (size_t i = 0; i < embedding_data_.embedding_params_.slot_size_array.size(); i++) {
        embedding_offsets.push_back(slot_sizes_prefix_sum);
        slot_sizes_prefix_sum += embedding_data_.embedding_params_.slot_size_array[i];
      }
      for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); ++id) {
        CudaDeviceContext context(embedding_data_.get_local_gpu(id).get_device_id());

        CK_CUDA_THROW_(
            cudaMemcpy(embedding_data_.embedding_offsets_[id].get_ptr(), embedding_offsets.data(),
                       embedding_offsets.size() * sizeof(TypeHashKey), cudaMemcpyHostToDevice));
      }
    }
    // sync
    functors_.sync_all_gpus(embedding_data_.get_resource_manager());

  } catch (const std::runtime_error &rt_err) {
    std::cerr << rt_err.what() << std::endl;
    throw;
  }

  return;
}

3.3 how slots are assigned to GPUs

Next, let's look at how it is determined which slots end up on which GPU. init_params invokes init_embedding, which completes the build.

  /**
   * Initialize the embedding table
   */
  void init_params() override {
    // do hash table value initialization
    if (slot_size_array_.empty()) {  // if no slot_sizes provided, use the old method to init
      init_embedding(max_vocabulary_size_per_gpu_,
                     embedding_data_.embedding_params_.embedding_vec_size,
                     hash_table_value_tensors_);

    } else {
      if (slot_size_array_.size() == embedding_data_.embedding_params_.slot_num) {
#ifndef DATA_READING_TEST
        init_embedding(slot_size_array_, embedding_data_.embedding_params_.embedding_vec_size,
                       value_table_tensors_, hash_table_slot_id_tensors_);

#endif
      } else {
        throw std::runtime_error(
            std::string("[HCDEBUG][ERROR] Runtime error: the size of slot_sizes != slot_num\n"));
      }
    }
  }

init_embedding creates an embedding table on each GPU.

template <typename TypeHashKey, typename TypeEmbeddingComp>
void LocalizedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::init_embedding(
    const std::vector<size_t> &slot_sizes, size_t embedding_vec_size,
    std::vector<Tensors2<float>> &hash_table_value_tensors,
    Tensors2<size_t> &hash_table_slot_id_tensors) {
  
  // Get the number of GPUs on this node and the number of global GPUs
  size_t local_gpu_count = embedding_data_.get_resource_manager().get_local_gpu_count();
  size_t total_gpu_count = embedding_data_.get_resource_manager().get_global_gpu_count();

  for (size_t id = 0; id < local_gpu_count; id++) { // Traverse local GPU
    // global id is used here to set
    size_t device_id = embedding_data_.get_local_gpu(id).get_device_id();
    size_t global_id = embedding_data_.get_local_gpu(id).get_global_id();
    functors_.init_embedding_per_gpu(global_id, total_gpu_count, slot_sizes, embedding_vec_size,
                                     hash_table_value_tensors[id], hash_table_slot_id_tensors[id],
                                     embedding_data_.get_local_gpu(id));
  }

  for (size_t id = 0; id < local_gpu_count; id++) {
    CK_CUDA_THROW_(cudaStreamSynchronize(embedding_data_.get_local_gpu(id).get_stream()));
  }

  return;
}

Let's analyze init_embedding_per_gpu. Slots are in fact simply assigned with a modulo (%) operation. For example, with 10 slots and 3 GPUs, the slot IDs are 0~9 and the GPU IDs are 0~2. slot_id % 3 over the 10 slot IDs gives 0,1,2,0,1,2,0,1,2,0, so the 10 slots are allocated to the 3 GPUs as follows:

  • GPU 0 : 0, 3, 6, 9

  • GPU 1 : 1, 4, 7

  • GPU 2 : 2, 5, 8

Therefore, the number of slots per GPU is not necessarily equal, as the small sketch below verifies.
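The following small standalone sketch (my own illustration, not HugeCTR code) reproduces both the % assignment and the slot_num_per_gpu formula used in the constructor for the 10-slot / 3-GPU example:

#include <cstdio>
#include <vector>

int main() {
  const size_t slot_num = 10, gpu_num = 3;

  // Which GPU owns each slot: slot_id % gpu_num
  std::vector<std::vector<size_t>> slots_per_gpu(gpu_num);
  for (size_t slot_id = 0; slot_id < slot_num; slot_id++) {
    slots_per_gpu[slot_id % gpu_num].push_back(slot_id);
  }

  // slot_num_per_gpu formula from the constructor
  for (size_t gid = 0; gid < gpu_num; gid++) {
    size_t slot_num_per_gpu = slot_num / gpu_num + ((gid < slot_num % gpu_num) ? 1 : 0);
    printf("GPU %zu owns %zu slots:", gid, slot_num_per_gpu);
    for (size_t s : slots_per_gpu[gid]) printf(" %zu", s);
    printf("\n");  // GPU 0: 0 3 6 9; GPU 1: 1 4 7; GPU 2: 2 5 8
  }
  return 0;
}

With that in mind, init_embedding_per_gpu itself is as follows: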

void SparseEmbeddingFunctors::init_embedding_per_gpu(size_t gid, size_t total_gpu_count,
                                                     const std::vector<size_t> &slot_sizes,
                                                     size_t embedding_vec_size,
                                                     Tensors2<float> &embedding_tables,
                                                     Tensor2<size_t> &slot_ids,
                                                     const GPUResource &gpu_resource) {
  CudaDeviceContext context(gpu_resource.get_device_id());
  size_t *slot_ids_ptr = slot_ids.get_ptr();
  size_t key_offset = 0;
  size_t value_index_offset = 0;
  for (size_t i = 0, j = 0; i < slot_sizes.size(); i++) { // Traverse slot
    size_t slot_size = slot_sizes[i];
    if ((i % total_gpu_count) == gid) { // Only slots assigned to this GPU
      // The body runs only when (i % total_gpu_count) equals gid
      float up_bound = sqrt(1.f / slot_size);
      HugeCTR::UniformGenerator::fill(
          embedding_tables[j++], -up_bound, up_bound, gpu_resource.get_sm_count(),
          gpu_resource.get_replica_variant_curand_generator(), gpu_resource.get_stream());
      // Configure slot id
      memset_const(slot_ids_ptr, i, slot_size, gpu_resource.get_stream());

      value_index_offset += slot_size;
      slot_ids_ptr += slot_size;
    }
    key_offset += slot_size;
  }
}
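One more detail worth noting in the code above: each slot's embedding table is initialized uniformly in [-sqrt(1/slot_size), sqrt(1/slot_size)] (for example, a slot with 100 categories is initialized in [-0.1, 0.1]), and memset_const then writes the global slot id i into the segment of slot_ids that covers that slot's rows, so every row of hash_table_value records which slot it belongs to.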

0x04 forward propagation

4.1 General

Let's first summarize the steps of forward propagation:

  • First, filter_keys_per_gpu is used to configure EmbeddingData.

  • Second, forward_per_gpu performs the lookup, i.e. functors_.forward_per_gpu looks up this GPU's hash table to obtain dense vectors.

  • all2all_forward then lets each GPU hold complete samples. The ultimate goal is similar to the distributed version's idea: after the exchange, each GPU holds only a few complete samples, and different GPUs hold different samples. Therefore the parts of the current sample that live in slots on other GPUs must be copied to this GPU; in other words, from the all2all results, each GPU keeps only the other slots of its own samples.

  • forward_reorder adjusts the internal order of the data on each GPU (described in detail later).

  • store_slot_id stores the slot ids. The slot id corresponding to each parameter row must be saved because different GPUs hold different slots; when the hash table is later dumped and reloaded, we need to know which slot each row belongs to.

The specific codes are as follows:

/**
   * The forward propagation of embedding layer.
   */
  void forward(bool is_train, int eval_batch = -1) override {
#pragma omp parallel num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
    {
      size_t i = omp_get_thread_num();
      CudaDeviceContext context(embedding_data_.get_local_gpu(i).get_device_id());

      if (embedding_data_.embedding_params_.is_data_parallel) {
        filter_keys_per_gpu(is_train, i, embedding_data_.get_local_gpu(i).get_global_id(),
                            embedding_data_.get_resource_manager().get_global_gpu_count());
      }
      functors_.forward_per_gpu(
          embedding_data_.embedding_params_.get_batch_size(is_train), slot_num_per_gpu_[i],
          embedding_data_.embedding_params_.embedding_vec_size,
          embedding_data_.embedding_params_.combiner, is_train,
          embedding_data_.get_row_offsets_tensors(is_train)[i],
          embedding_data_.get_value_tensors(is_train)[i],
          *embedding_data_.get_nnz_array(is_train)[i], *hash_tables_[i],
          hash_table_value_tensors_[i], hash_value_index_tensors_[i], embedding_feature_tensors_[i],
          embedding_data_.get_local_gpu(i).get_stream());
    }

    // At this point, embedding_feature_tensors_ holds the embedding vectors looked up from this GPU's hash table
// do all-to-all
#ifndef ENABLE_MPI
    if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
      functors_.all2all_forward(embedding_data_.get_batch_size_per_gpu(is_train), slot_num_per_gpu_,
                                embedding_data_.embedding_params_.embedding_vec_size,
                                embedding_feature_tensors_, all2all_tensors_,
                                embedding_data_.get_resource_manager());
    } else {
      CK_CUDA_THROW_(cudaMemcpyAsync(
          all2all_tensors_[0].get_ptr(), embedding_feature_tensors_[0].get_ptr(),
          embedding_data_.get_batch_size_per_gpu(is_train) * slot_num_per_gpu_[0] *
              embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
          cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
    }
#else
    if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
      functors_.all2all_forward(embedding_data_.get_batch_size_per_gpu(is_train),
                                embedding_data_.embedding_params_.slot_num,
                                embedding_data_.embedding_params_.embedding_vec_size,
                                embedding_feature_tensors_, all2all_tensors_,
                                embedding_data_.get_resource_manager());
    } else {
      CK_CUDA_THROW_(cudaMemcpyAsync(
          all2all_tensors_[0].get_ptr(), embedding_feature_tensors_[0].get_ptr(),
          (size_t)embedding_data_.get_batch_size_per_gpu(is_train) * slot_num_per_gpu_[0] *
              embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
          cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
    }
#endif

    // reorder
    functors_.forward_reorder(embedding_data_.get_batch_size_per_gpu(is_train),
                              embedding_data_.embedding_params_.slot_num,
                              embedding_data_.embedding_params_.embedding_vec_size,
                              all2all_tensors_, embedding_data_.get_output_tensors(is_train),
                              embedding_data_.get_resource_manager());

    // store slot ids
    functors_.store_slot_id(embedding_data_.embedding_params_.get_batch_size(is_train),
                            embedding_data_.embedding_params_.slot_num, slot_num_per_gpu_,
                            embedding_data_.get_row_offsets_tensors(is_train),
                            hash_value_index_tensors_, hash_table_slot_id_tensors_,
                            embedding_data_.get_resource_manager());

    return;
  }

Let's take the following figure as an example. Assume a total of 2 samples and 4 slots, with embedding_vec_size = 8 and batch_size_per_gpu = 2. An important question here is how to determine which slot lives on which GPU.

slot_id % 2 for slot IDs 0~3 gives 0, 1, 0, 1, so the 4 slots are allocated to the 2 GPUs as:

  • GPU 0 : slot 0,slot 2;
  • GPU 1 : slot 1,slot 3;

Note that the slots on each GPU are not consecutive, which is why a reorder is needed later. Because the slots are not assigned in simple ascending order, the values below are not distributed in simple ascending order either, but as:

  • GPU 0 : 1,3,5,7;

  • GPU 1 : 2,4,6,8;

Why are the values distributed like this? It will become clear by the end of the forward propagation walkthrough.

4.2 alltoall

The forward_per_gpu function has already been introduced earlier, so let's look directly at the all2all operation.

As mentioned earlier, after each GPU finishes its local lookup, the dense vectors are stored in embedding_feature_tensors_. This is a one-dimensional buffer. For the distributed type its length is batch_size * slot_num * embedding_vec_size; for the localized type here it is batch_size * slot_num_per_gpu[i] * embedding_vec_size, because each GPU only holds the vectors of its own slots (for all samples).

So the next step is to exchange embedding_feature_tensors_ among the GPUs; each GPU then receives only the part it should receive.

template <typename Type>
void SparseEmbeddingFunctors::all2all_forward(size_t batch_size_per_gpu,
                                              const std::vector<size_t> &slot_num_per_gpu,
                                              size_t embedding_vec_size,
                                              const Tensors2<Type> &send_tensors,
                                              Tensors2<Type> &recv_tensors,
                                              const ResourceManager &resource_manager) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();

  // Fill in partition table, ith Topo GPU to jth Topo GPU
  std::vector<std::vector<size_t>> table(local_gpu_count, std::vector<size_t>(local_gpu_count));
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t element_per_send = batch_size_per_gpu * slot_num_per_gpu[i] * embedding_vec_size;
    for (size_t j = 0; j < local_gpu_count; j++) {
      table[i][j] = element_per_send;
    }
  }

  std::vector<const Type *> src(local_gpu_count);
  std::vector<Type *> dst(local_gpu_count);
  for (size_t id = 0; id < local_gpu_count; id++) {
    src[id] = send_tensors[id].get_ptr();
    dst[id] = recv_tensors[id].get_ptr();
  }
  std::vector<std::vector<const Type *>> src_pos(local_gpu_count,
                                                 std::vector<const Type *>(local_gpu_count));
  std::vector<std::vector<Type *>> dst_pos(local_gpu_count, std::vector<Type *>(local_gpu_count));
  
  // Set offset of source data
  // Calculate the src offset pointer from each GPU to each other
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t src_offset = 0;
    for (size_t j = 0; j < local_gpu_count; j++) {
      src_pos[i][j] = src[i] + src_offset;
      src_offset += table[i][j];
    }
  }
  // Set the offset of the target data
  // Calculate the dst offset pointer from each GPU to each other
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t dst_offset = 0;
    for (size_t j = 0; j < local_gpu_count; j++) {
      dst_pos[i][j] = dst[i] + dst_offset;
      dst_offset += table[j][i];
    }
  }

  // need to know the Type
  ncclDataType_t type;
  switch (sizeof(Type)) {
    case 2:
      type = ncclHalf;
      break;
    case 4:
      type = ncclFloat;
      break;
    default:
      CK_THROW_(Error_t::WrongInput, "Error: Type not support by now");
  }

  // Do the all2all transfer
  CK_NCCL_THROW_(ncclGroupStart());
  for (size_t i = 0; i < local_gpu_count; i++) {
    const auto &local_gpu = resource_manager.get_local_gpu(i);
    for (size_t j = 0; j < local_gpu_count; j++) {
      CK_NCCL_THROW_(ncclSend(src_pos[i][j], table[i][j], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
      CK_NCCL_THROW_(ncclRecv(dst_pos[i][j], table[j][i], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
    }
  }
  CK_NCCL_THROW_(ncclGroupEnd());

  return;
}

Compared with MPI_Allgather, the difference of MPI_Alltoall is:

  • MPI_Allgather: every process ends up with the same data, the concatenation of what all processes contributed.
  • MPI_Alltoall: every process ends up with different data; process i receives the i-th block from every other process.

For example:

rank=0, Send 0 1 2
rank=1, Send 3 4 5
rank=2, Send 6 7 8

Then each rank receives:

rank=0, Receives 0 3 6
rank=1, Receives 1 4 7
rank=2, Receives 2 5 8

For our example, the exchange is currently as follows:

GPU0 sends: 1,3,5,7
GPU1 sends: 2,4,6,8

GPU0 receives: 1,3,2,4
GPU1 receives: 5,7,6,8

The result is as follows; the "..." indicates that all2all_tensors_ is longer than these 4 items.
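To see the send/receive layout concretely, here is a small CPU-side simulation of the same exchange (my own illustration; the real code uses ncclSend/ncclRecv). It assumes batch_size = 2, so batch_size_per_gpu = 1 on the 2 GPUs, and shrinks every embedding vector to a single number so that the values match the figures:

#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
  // GPU0 owns slots 0 and 2; GPU1 owns slots 1 and 3.
  // Each send buffer is laid out sample-major over the local slots.
  const size_t gpu_num = 2, batch_size_per_gpu = 1, slot_num_per_gpu = 2, vec = 1;
  std::vector<std::vector<int>> send = {{1, 3, 5, 7},   // GPU0: sample1{slot0,slot2}, sample2{slot0,slot2}
                                        {2, 4, 6, 8}};  // GPU1: sample1{slot1,slot3}, sample2{slot1,slot3}
  std::vector<std::vector<int>> recv(gpu_num);

  // table[i][j]: number of elements GPU i sends to GPU j (uniform here)
  const size_t element_per_send = batch_size_per_gpu * slot_num_per_gpu * vec;  // = 2

  for (size_t j = 0; j < gpu_num; j++) {    // receiver
    for (size_t i = 0; i < gpu_num; i++) {  // sender
      size_t src_offset = j * element_per_send;  // chunk of sender i destined for receiver j
      for (size_t k = 0; k < element_per_send; k++) {
        recv[j].push_back(send[i][src_offset + k]);
      }
    }
  }

  for (size_t j = 0; j < gpu_num; j++) {
    printf("GPU%zu receives:", j);
    for (int v : recv[j]) printf(" %d", v);
    printf("\n");  // GPU0 receives: 1 3 2 4   GPU1 receives: 5 7 6 8
  }
  return 0;
}

Each receiver simply concatenates, in sender order, the chunk of every sender's buffer that belongs to its own samples, which is exactly what the src_pos/dst_pos offset tables compute.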

4.3 Reorder

We can see that each GPU now holds complete samples (every sample on a GPU has all of its slots), but there is a problem with the internal order of each sample's data: the slots are not in ascending order. Let's adjust and refine the figure above (the legend shows simplified contents of the actual variables, purely for demonstration).

Next, Reorder copies the data from all2all_tensors_ to embedding_data_.get_output_tensors(is_train), adjusting the order during the copy. The purpose is to turn the order slot 0, slot 2, slot 1, slot 3 into slot 0, slot 1, slot 2, slot 3.

template <typename TypeEmbeddingComp>
void SparseEmbeddingFunctors::forward_reorder(size_t batch_size_per_gpu, size_t slot_num,
                                              size_t embedding_vec_size, size_t total_gpu_count,
                                              const Tensors2<TypeEmbeddingComp> &src_tensors,
                                              Tensors2<TypeEmbeddingComp> &dst_tensors,
                                              const ResourceManager &resource_manager) {
  CudaDeviceContext context;
  size_t local_gpu_count = resource_manager.get_local_gpu_count();

  for (size_t id = 0; id < local_gpu_count; id++) { // Traverse local GPU
    const auto &local_gpu = resource_manager.get_local_gpu(id); 
    context.set_device(local_gpu->get_device_id());

    // Copy
    do_forward_reorder(batch_size_per_gpu, slot_num, embedding_vec_size, total_gpu_count,
                       src_tensors[id].get_ptr(), dst_tensors[id].get_ptr(),
                       local_gpu->get_stream());
  }
}

The do_forward_reorder code is as follows; it relies on forward_reorder_kernel for the actual logic.

template <typename TypeEmbeddingComp>
void do_forward_reorder(size_t batch_size_per_gpu, size_t slot_num, size_t embedding_vec_size,
                        size_t total_gpu_count, const TypeEmbeddingComp *input,
                        TypeEmbeddingComp *output, cudaStream_t stream) {
  const size_t grid_size = batch_size_per_gpu;
  const size_t block_size = embedding_vec_size;
  forward_reorder_kernel<<<grid_size, block_size, 0, stream>>>(
      batch_size_per_gpu, slot_num, embedding_vec_size, total_gpu_count, input, output);
}

4.3.1 ideas

The specific logic is:

  • gpu_num is the number of GPUs globally. The calculation below is done from the global perspective, because after all2all the data already has a global layout.
  • Get the sample id of the current sample on the current GPU (this is in fact bid; each bid corresponds to one sample) and process only that sample id, which ensures that only this GPU's samples are handled. For example, for the second sample, sample_id = 1.
  • Get the starting position of the first slot of the current sample, e.g. 1 * 4 * 8 = 32.
  • Get the size of the embedding vector corresponding to one slot, i.e. the stride from slot to slot = 8.
  • Traverse the slots of this sample, from 0 to slot_num, in order to copy them from all2all_tensors_ to embedding_data_.get_output_tensors; for each slot we therefore need to find its starting position inside the all2all buffer.
  • For each slot, figure out which GPU the slot came from.
    • Traverse the GPUs before that one. Since the all2all buffer is grouped by source GPU, offset_pre ends up being the number of vectors contributed by all GPUs that come before this slot's GPU.
      • The key line is gpu_id = slot_id % gpu_num, which determines "from which GPU's chunk this slot is found".
      • In our example, each GPU sent two slots together during all2all; reorder now retrieves them one slot at a time, and gpu_id is the key used to locate them.
    • Get the number of slots owned by that GPU.
    • Get the offset of the current sample inside that GPU's chunk.
    • Get the starting position of this sample's data for this slot.
    • Get the target location of the current slot in embedding_data_.get_output_tensors.
    • Copy this sample_id's data for this slot.

The code is as follows:

// reorder operation after all2all in forward propagation
template <typename TypeEmbeddingComp>
__global__ void forward_reorder_kernel(int batch_size_per_gpu, int slot_num, int embedding_vec_size,
                                       int gpu_num, const TypeEmbeddingComp *input,
                                       TypeEmbeddingComp *output) {
  // blockDim.x = embedding_vec_size; // each thread corresponding to one element of embedding
  // vector gridDim.x = batch_size / gpu_num = samples_per_gpu; // each block corresponding to one
  // sample on each GPU Each thread needs to process slot_num slots

  int tid = threadIdx.x;
  int bid = blockIdx.x;

  // Sample id on the current GPU; processing per sample id ensures only this GPU's samples are handled
  int sample_id = bid;  // sample_id on the current GPU, such as the second sample, sample_id = 1

  if ((bid < batch_size_per_gpu) && (tid < embedding_vec_size)) {
    // The starting position of the first slot of the current sample, such as 1 * 4 * 8 = 32
    int dst_offset =
        sample_id * slot_num * embedding_vec_size;  // offset for the first slot of one sample
    // The size of the embedding vector corresponding to a slot is the stripe = 8 between slots
    int dst_stride = embedding_vec_size;            // stride from slot to slot

    // Traverse the slots of the sample, ranging from 0 to slot num, in order to copy these slots from all2all to embedding_data_.get_output_tensors
    // Therefore, you need to find the starting position of the slot of this sample in all2all
    for (int slot_id = 0; slot_id < slot_num; slot_id++) { 
      int gpu_id = slot_id % gpu_num; // Key code to determine which gpu the slot is on
      int offset_pre = 0;  // offset in previous gpus
      
      // The purpose of traversing the GPU is to find the location of the previous GPU, because the slot is allocated according to the GPU
      // offset_pre finally gets the number of slots on the GPU before this slot
      for (int id = 0; id < gpu_id; id++) { 
        int slot_num_per_gpu = slot_num / gpu_num + ((id < (slot_num % gpu_num)) ? 1 : 0);
        int stride = batch_size_per_gpu * slot_num_per_gpu;
        offset_pre += stride; // Find the front position
      }
      // Number of slots owned by that GPU
      int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < (slot_num % gpu_num)) ? 1 : 0);
      // The offset of the current sample in the current GPU
      int offset_cur = sample_id * slot_num_per_gpu;  // offset in current gpu
      // Starting position of this sample's data for this slot inside the all2all buffer
      // (offset_cur + offset_pre + slot_id / gpu_num) is the number of vectors before this slot
      int src_addr = (offset_cur + offset_pre + (int)(slot_id / gpu_num)) * embedding_vec_size;
      
      // Target location of the current slot in embedding_data_.get_output_tensors
      int dst_addr = dst_offset + dst_stride * slot_id;
      // Copy this sample_id's data for this slot
      output[dst_addr + tid] = input[src_addr + tid];
    }
  }
}
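To check the addressing, here is a host-side copy of the kernel's index math (my own sketch), run on the buffer GPU0 received in the running example, with embedding_vec_size treated as 1 and one sample per GPU:

#include <cstdio>
#include <vector>

int main() {
  const int batch_size_per_gpu = 1, slot_num = 4, gpu_num = 2;
  std::vector<int> input = {1, 3, 2, 4};  // what GPU0 received from all2all
  std::vector<int> output(batch_size_per_gpu * slot_num);

  for (int sample_id = 0; sample_id < batch_size_per_gpu; sample_id++) {
    for (int slot_id = 0; slot_id < slot_num; slot_id++) {
      int gpu_id = slot_id % gpu_num;  // which GPU's chunk this slot came from
      int offset_pre = 0;              // elements contributed by earlier GPUs
      for (int id = 0; id < gpu_id; id++) {
        int slot_num_per_gpu = slot_num / gpu_num + ((id < slot_num % gpu_num) ? 1 : 0);
        offset_pre += batch_size_per_gpu * slot_num_per_gpu;
      }
      int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < slot_num % gpu_num) ? 1 : 0);
      int offset_cur = sample_id * slot_num_per_gpu;
      int src_addr = offset_cur + offset_pre + slot_id / gpu_num;
      int dst_addr = sample_id * slot_num + slot_id;
      output[dst_addr] = input[src_addr];
    }
  }
  for (int v : output) printf("%d ", v);  // prints: 1 2 3 4
  printf("\n");
  return 0;
}

The output is 1 2 3 4, i.e. the sample's slots are back in ascending slot order.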

4.3.2 diagram

This figure is for demonstration only, so the logic is simplified: embedding_feature_tensors_ and all2all_tensors_ are actually one-dimensional buffers, abstracted here as two-dimensional arrays.

4.4 slot id

Finally, the slot ids need to be stored. The slot id corresponding to each parameter row must be saved because different GPUs hold different slots; when the hash table is dumped and later reloaded, we need to know which slot each row belongs to.

// store slot_id by row_offset and value_index
template <typename TypeKey, typename TypeValueIndex>
__global__ void store_slot_id_kernel(size_t batch_size,
                                     int slot_num,  // total slot number in hash table
                                     int slot_num_per_gpu,
                                     int gpu_num,  // total gpu number
                                     int gpu_id,   // global gpu device id
                                     const TypeKey *row_offset, const TypeValueIndex *value_index,
                                     TypeValueIndex *slot_id) {
  size_t gid = blockIdx.x * blockDim.x + threadIdx.x;

  if (gid < (batch_size * slot_num_per_gpu)) {
    int sid = gid % slot_num_per_gpu;
    sid = gpu_id + sid * gpu_num;  // global slot id
    if (sid < slot_num) {
      TypeKey offset = row_offset[gid];
      int value_num = row_offset[gid + 1] - offset;
      for (int i = 0; i < value_num; i++) {
        TypeValueIndex index = value_index[offset + i];  // row number
        slot_id[index] = sid;
      }
    }
  }
}

}  // namespace

template <typename TypeKey>
void SparseEmbeddingFunctors::store_slot_id(size_t batch_size, size_t slot_num,
                                            const std::vector<size_t> &slot_num_per_gpu,
                                            const Tensors2<TypeKey> &row_offset_tensors,
                                            const Tensors2<size_t> &value_index_tensors,
                                            Tensors2<size_t> &slot_id_tensors,
                                            const ResourceManager &resource_manager) {
  CudaDeviceContext context;
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  size_t total_gpu_count = resource_manager.get_global_gpu_count();

  for (size_t id = 0; id < local_gpu_count; id++) {
    if (slot_num_per_gpu[id] == 0) {
      continue;
    }

    const auto &local_gpu = resource_manager.get_local_gpu(id);
    size_t local_device_id = local_gpu->get_device_id();
    size_t global_id = local_gpu->get_global_id();
    const size_t block_size = 64;
    const size_t grid_size = (batch_size * slot_num_per_gpu[id] + block_size - 1) / block_size;

    context.set_device(local_device_id);
    store_slot_id_kernel<<<grid_size, block_size, 0, local_gpu->get_stream()>>>(
        batch_size, slot_num, slot_num_per_gpu[id], total_gpu_count, global_id,
        row_offset_tensors[id].get_ptr(), value_index_tensors[id].get_ptr(),
        slot_id_tensors[id].get_ptr());
  }
}
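To see what slot ids actually get written in the running example, here is a tiny sketch (my own) of the sid = gpu_id + sid * gpu_num mapping used in store_slot_id_kernel:

#include <cstdio>

int main() {
  // store_slot_id_kernel recovers the global slot id of a local slot position via
  //   global_slot_id = gpu_id + local_slot_id * gpu_num
  const int gpu_num = 2, slot_num_per_gpu = 2;
  for (int gpu_id = 0; gpu_id < gpu_num; gpu_id++) {
    for (int local_slot = 0; local_slot < slot_num_per_gpu; local_slot++) {
      printf("GPU %d, local slot %d -> global slot %d\n",
             gpu_id, local_slot, gpu_id + local_slot * gpu_num);
    }
  }
  // Prints: GPU 0 -> global slots 0 and 2; GPU 1 -> global slots 1 and 3,
  // i.e. exactly the inverse of the slot_id % gpu_num assignment.
  return 0;
}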

4.5 output matrix

The following function shows the size of the output dense matrix, which is batch_size_per_gpu * slot_num * embedding_vec_size.

// only used for results check
/**
 * Get the forward() results from GPUs and copy them to the host pointer
 * embedding_feature. This function is only used for unit test.
 * @param embedding_feature the host pointer for storing the forward()
 * results.
 */
void get_forward_results(bool is_train, Tensor2<TypeEmbeddingComp> &embedding_feature) {
  size_t memcpy_size = embedding_data_.get_batch_size_per_gpu(is_train) *
                       embedding_data_.embedding_params_.slot_num *
                       embedding_data_.embedding_params_.embedding_vec_size;

  functors_.get_forward_results(memcpy_size, embedding_data_.get_output_tensors(is_train),
                                embedding_feature, utest_forward_temp_tensors_,
                                embedding_data_.get_resource_manager());

  return;
}

get_batch_size_per_gpu is defined as follows:

size_t get_batch_size_per_gpu(bool is_train) const {
  return embedding_params_.get_batch_size(is_train) / resource_manager_->get_global_gpu_count();
}
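As a quick worked check: with batch_size_per_gpu = 2, slot_num = 4 and embedding_vec_size = 8, memcpy_size would be 2 * 4 * 8 = 64 elements of TypeEmbeddingComp per GPU; in other words, after forward propagation every GPU holds all slot_num slots for its own batch_size_per_gpu samples.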

0x05 backward propagation

Because forward propagation performs all2all and then reorder, backward propagation must first perform the reverse operations (backward_reorder, then all2all_backward), and only then do backward.

Although we know that backward_reorder and all2all_backward are simply the reverse operations of forward propagation, the code is still mind-bending; it is easier to follow together with the figure.

  /**
   * The first stage of backward propagation of embedding layer,
   * which computes the wgrad by the dgrad from the top layer.
   */
  void backward() override {
    // Read dgrad from output_tensors -> compute wgrad

    // reorder
    functors_.backward_reorder(embedding_data_.get_batch_size_per_gpu(true),
                               embedding_data_.embedding_params_.slot_num,
                               embedding_data_.embedding_params_.embedding_vec_size,
                               embedding_data_.get_output_tensors(true), all2all_tensors_,
                               embedding_data_.get_resource_manager());

		// do all2all
#ifndef ENABLE_MPI
    if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
      functors_.all2all_backward(embedding_data_.get_batch_size_per_gpu(true), slot_num_per_gpu_,
                                 embedding_data_.embedding_params_.embedding_vec_size,
                                 all2all_tensors_, embedding_feature_tensors_,
                                 embedding_data_.get_resource_manager());

    } else {
      CudaDeviceContext context(embedding_data_.get_local_gpu(0).get_device_id());
      CK_CUDA_THROW_(cudaMemcpyAsync(
          embedding_feature_tensors_[0].get_ptr(), all2all_tensors_[0].get_ptr(),
          embedding_data_.get_batch_size_per_gpu(true) * slot_num_per_gpu_[0] *
              embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
          cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
    }
#else
    if (embedding_data_.get_resource_manager().get_global_gpu_count() > 1) {
      functors_.all2all_backward(
          embedding_data_.get_batch_size_per_gpu(true), embedding_data_.embedding_params_.slot_num,
          embedding_data_.embedding_params_.embedding_vec_size, all2all_tensors_,
          embedding_feature_tensors_, embedding_data_.get_resource_manager());

    } else {
      CudaDeviceContext context(embedding_data_.get_local_gpu(0).get_device_id());
      CK_CUDA_THROW_(cudaMemcpyAsync(
          embedding_feature_tensors_[0].get_ptr(), all2all_tensors_[0].get_ptr(),
          embedding_data_.get_batch_size_per_gpu(true) * slot_num_per_gpu_[0] *
              embedding_data_.embedding_params_.embedding_vec_size * sizeof(TypeEmbeddingComp),
          cudaMemcpyDeviceToDevice, embedding_data_.get_local_gpu(0).get_stream()));
    }
#endif

    // do backward
    functors_.backward(embedding_data_.embedding_params_.get_batch_size(true), slot_num_per_gpu_,
                       embedding_data_.embedding_params_.embedding_vec_size,
                       embedding_data_.embedding_params_.combiner,
                       embedding_data_.get_row_offsets_tensors(true), embedding_feature_tensors_,
                       wgrad_tensors_, embedding_data_.get_resource_manager());

    return;
  }

5.1 Reorder backward

The purpose of backward reorder is to scatter the gradients held on each GPU into the appropriate locations of all2all_tensors_. In the following figure, each slot corresponds to one gradient embedding vector; train_output_tensors_ now holds the gradients, and each GPU currently holds the complete gradients of two samples.

The specific code is as follows. There are two bid values on each GPU, corresponding to sample 1 and sample 2 respectively:

// reorder operation before all2all in backward propagation
template <typename TypeEmbeddingComp>
__global__ void backward_reorder_kernel(int batch_size_per_gpu, int slot_num,
                                        int embedding_vec_size, int gpu_num,
                                        const TypeEmbeddingComp *input, TypeEmbeddingComp *output) {
  // blockDim.x = embedding_vec_size; // each thread corresponding to one element of embedding
  // vector gridDim.x = batch_size / gpu_num = samples_per_gpu; // each block corresponding to one
  // sample on each GPU Each thread needs to process slot_num slots

  int tid = threadIdx.x;
  int bid = blockIdx.x;

  int sample_id = bid;  // sample_id on the current GPU

  if ((bid < batch_size_per_gpu) && (tid < embedding_vec_size)) {
    // Source: starting position of this sample's gradients; sample1 starts at 0, sample2 at 1*4*embedding_vec_size
    int src_offset = sample_id * slot_num * embedding_vec_size; 
    int src_stride = embedding_vec_size; // Stride from slot to slot

    for (int slot_id = 0; slot_id < slot_num; slot_id++) { // The value is 0 ~ 3
      int gpu_id = slot_id % gpu_num; // The value is 0 ~ 1
      int offset_pre = 0;  // offset in previous gpus
      for (int id = 0; id < gpu_id; id++) {
        // The value is 2
        int slot_num_per_gpu = slot_num / gpu_num + ((id < (slot_num % gpu_num)) ? 1 : 0);
        // The value is 2 * 2
        int stride = batch_size_per_gpu * slot_num_per_gpu;
        // Total size of the portions destined for earlier GPUs: 0 for the GPU0-bound part, 4 for the GPU1-bound part
        offset_pre += stride; 
      }
      
      // Target location: find the starting location of this sample in the current GPU
      // slot_num_per_gpu = 2
      int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < (slot_num % gpu_num)) ? 1 : 0);
      // 2*sample_id
      int offset_cur = sample_id * slot_num_per_gpu;  // offset in current gpu
      // Note: in the figure each embedding vector is drawn as a single slot-sized cell,
      // i.e. as if embedding_vec_size were 1, so in figure units the target addresses are:
      // part bound for GPU0: dst = sample_id*2 + 0 + slot_id/gpu_num, i.e. sample1 -> 0~1, sample2 -> 2~3
      // part bound for GPU1: dst = sample_id*2 + 4 + slot_id/gpu_num, i.e. sample1 -> 4~5, sample2 -> 6~7
      int dst_addr = (offset_cur + offset_pre + (int)(slot_id / gpu_num)) * embedding_vec_size;

      // Source position: starting position of this sample's gradient for this slot
      // Again in figure units (embedding_vec_size treated as 1):
      // src_offset = sample_id * slot_num, so src_addr = sample_id * slot_num + slot_id
      // Hence on each GPU sample1 covers positions 0~3 and sample2 covers 4~7
      int src_addr = src_offset + src_stride * slot_id;
      output[dst_addr + tid] = input[src_addr + tid]; // Copy this sample's gradient to its proper position in all2all_tensors_
    }
  }
}
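Mirroring the forward check, here is a host-side copy of this kernel's index math (my own sketch, using one sample per GPU and embedding_vec_size treated as 1) showing how one sample's gradients get regrouped by destination GPU:

#include <cstdio>
#include <vector>

int main() {
  const int batch_size_per_gpu = 1, slot_num = 4, gpu_num = 2;
  std::vector<int> grad = {10, 20, 30, 40};  // d(slot0), d(slot1), d(slot2), d(slot3) on one GPU
  std::vector<int> all2all_buf(batch_size_per_gpu * slot_num);

  for (int sample_id = 0; sample_id < batch_size_per_gpu; sample_id++) {
    for (int slot_id = 0; slot_id < slot_num; slot_id++) {
      int gpu_id = slot_id % gpu_num;  // which GPU this gradient must be sent back to
      int offset_pre = 0;
      for (int id = 0; id < gpu_id; id++) {
        int slot_num_per_gpu = slot_num / gpu_num + ((id < slot_num % gpu_num) ? 1 : 0);
        offset_pre += batch_size_per_gpu * slot_num_per_gpu;
      }
      int slot_num_per_gpu = slot_num / gpu_num + ((gpu_id < slot_num % gpu_num) ? 1 : 0);
      int dst_addr = sample_id * slot_num_per_gpu + offset_pre + slot_id / gpu_num;
      int src_addr = sample_id * slot_num + slot_id;
      all2all_buf[dst_addr] = grad[src_addr];
    }
  }
  for (int v : all2all_buf) printf("%d ", v);  // prints: 10 30 20 40
  printf("\n");
  return 0;
}

The result 10 30 20 40 puts the gradients of slots 0 and 2 first (to be sent back to GPU0) and those of slots 1 and 3 second (to be sent back to GPU1), which is exactly the grouping all2all_backward expects.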

5.2 All2all backward

Here comes the exchange. The essence is the same as at the start of forward propagation: each GPU sends its buffer in chunks and receives only the chunks it should. In the end each GPU holds only the gradients belonging to its own slots, and the final layout matches the original embedding_feature_tensors_ exactly, in samples, slots and values.

The specific codes are as follows:

/**
 * nccl all2all communication for backward
 * @param batch_size_per_gpu batch size per GPU
 * @param slot_num slot number
 * @param embedding_vec_size embedding vector size
 * @param send_tensors the send tensors of multi GPUs.
 * @param recv_tensors the recv tensors of multi GPUs.
 * @param device_resources all gpus device resources.
 */
template <typename Type>
void SparseEmbeddingFunctors::all2all_backward(size_t batch_size_per_gpu, size_t slot_num,
                                               size_t embedding_vec_size,
                                               const Tensors2<Type> &send_tensors,
                                               Tensors2<Type> &recv_tensors,
                                               const ResourceManager &resource_manager) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  size_t total_gpu_count = resource_manager.get_global_gpu_count();
  size_t num_proc = resource_manager.get_num_process();

  std::vector<const Type *> src(local_gpu_count);
  std::vector<Type *> dst(local_gpu_count);
  for (size_t id = 0; id < local_gpu_count; id++) {
    src[id] = send_tensors[id].get_ptr(); // send_tensors is a list with one tensor per GPU
    dst[id] = recv_tensors[id].get_ptr(); // recv_tensors is a list with one tensor per GPU
  }

  std::vector<std::vector<size_t>> send_table(local_gpu_count,
                                              std::vector<size_t>(total_gpu_count));
  std::vector<std::vector<size_t>> recv_table(local_gpu_count,
                                              std::vector<size_t>(total_gpu_count));

  // Fill in receiving partition table, ith Topo GPU receive from jth global GPU
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t global_id = resource_manager.get_local_gpu(i)->get_global_id();
    size_t slot_num_per_gpu =
        slot_num / total_gpu_count + ((global_id < (slot_num % total_gpu_count)) ? 1 : 0);
    size_t element_per_recv = batch_size_per_gpu * slot_num_per_gpu * embedding_vec_size;

    for (size_t j = 0; j < total_gpu_count; j++) {
      recv_table[i][j] = element_per_recv;
    }
  }

  // Fill in sending partition table, ith Topo GPU send to jth global GPU
  for (size_t j = 0; j < total_gpu_count; j++) {
    size_t global_id = j;
    size_t slot_num_per_gpu =
        slot_num / total_gpu_count + ((global_id < (slot_num % total_gpu_count)) ? 1 : 0);
    size_t element_per_send = batch_size_per_gpu * slot_num_per_gpu * embedding_vec_size;

    for (size_t i = 0; i < local_gpu_count; i++) {
      send_table[i][j] = element_per_send;
    }
  }

  std::vector<std::vector<const Type *>> src_pos(local_gpu_count,
                                                 std::vector<const Type *>(total_gpu_count));
  std::vector<std::vector<Type *>> dst_pos(local_gpu_count, std::vector<Type *>(total_gpu_count));
  // Calculate the src offset pointer from each GPU to each other
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t src_offset = 0;
    for (size_t j = 0; j < total_gpu_count; j++) {
      src_pos[i][j] = src[i] + src_offset;
      src_offset += send_table[i][j];
    }
  }
  // Calculate the dst offset pointer from each GPU to each other
  for (size_t i = 0; i < local_gpu_count; i++) {
    size_t dst_offset = 0;
    for (size_t j = 0; j < total_gpu_count; j++) {
      dst_pos[i][j] = dst[i] + dst_offset;
      dst_offset += recv_table[i][j];
    }
  }

  // need to know the Type
  ncclDataType_t type;
  switch (sizeof(Type)) {
    case 2:
      type = ncclHalf;
      break;
    case 4:
      type = ncclFloat;
      break;
    default:
      CK_THROW_(Error_t::WrongInput, "Error: Type not support by now");
  }

  // Do the all2all transfer
  CK_NCCL_THROW_(ncclGroupStart());
  for (size_t i = 0; i < local_gpu_count; i++) {
    const auto &local_gpu = resource_manager.get_local_gpu(i);
    for (size_t j = 0; j < total_gpu_count; j++) {
      CK_NCCL_THROW_(ncclSend(src_pos[i][j], send_table[i][j], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
      CK_NCCL_THROW_(ncclRecv(dst_pos[i][j], recv_table[i][j], type, j, local_gpu->get_nccl(),
                              local_gpu->get_stream()));
    }
  }
  CK_NCCL_THROW_(ncclGroupEnd());

  return;
}
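A note on the two partition tables, as read from the code above: recv_table[i][j] is sized by local GPU i's own slot count, because GPU i is getting the gradients of its own slots back from every GPU, while send_table[i][j] is sized by the global destination GPU j's slot count, because GPU i sends away the gradient chunks that belong to GPU j's slots. The tables are also laid out as local_gpu_count x total_gpu_count, so the same code covers the multi-node (MPI) case.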

5.3 backward

Now each GPU holds gradients laid out exactly like its embedding_feature_tensors_, so backward can be carried out. This part has been introduced before, so we won't repeat it.

// do backward
functors_.backward(embedding_data_.embedding_params_.get_batch_size(true), slot_num_per_gpu_,
                   embedding_data_.embedding_params_.embedding_vec_size,
                   embedding_data_.embedding_params_.combiner,
                   embedding_data_.get_row_offsets_tensors(true), embedding_feature_tensors_,
                   wgrad_tensors_, embedding_data_.get_resource_manager());

0x06 storage

Here is a brief analysis. When storing, rank 0 is responsible for writing files.

Error_t Session::download_params_to_files_(std::string weights_file,
                                           std::string dense_opt_states_file,
                                           const std::vector<std::string>& embedding_files,
                                           const std::vector<std::string>& sparse_opt_state_files) {
  try {
    {
      // Storage parameters
      int i = 0;
      for (auto& embedding_file : embedding_files) {
        embeddings_[i]->dump_parameters(embedding_file);
        i++;
      }
    }

    {
      // Storage optimizer
      int i = 0;
      for (auto& sparse_opt_state_file : sparse_opt_state_files) {
        std::ofstream out_stream_opt(sparse_opt_state_file, std::ofstream::binary);
        embeddings_[i]->dump_opt_states(out_stream_opt);
        out_stream_opt.close();
        i++;
      }
    }

    // The rank 0 node is responsible for writing files
    if (resource_manager_->is_master_process()) {
      std::ofstream out_stream_weight(weights_file, std::ofstream::binary);
      networks_[0]->download_params_to_host(out_stream_weight);

      std::ofstream out_dense_opt_state_weight(dense_opt_states_file, std::ofstream::binary);
      networks_[0]->download_opt_states_to_host(out_dense_opt_state_weight);

      std::string no_trained_params = networks_[0]->get_no_trained_params_in_string();
      if (no_trained_params.length() != 0) {
        std::string ntp_file = weights_file + ".ntp.json";
        std::ofstream out_stream_ntp(ntp_file, std::ofstream::out);
        out_stream_ntp.write(no_trained_params.c_str(), no_trained_params.length());
        out_stream_ntp.close();
      }
      out_stream_weight.close();
      out_dense_opt_state_weight.close();
    }

  } catch (const internal_runtime_error& rt_err) {
    std::cerr << rt_err.what() << std::endl;
    return rt_err.get_error();
  } catch (const std::exception& err) {
    std::cerr << err.what() << std::endl;
    return Error_t::UnspecificError;
  }
  return Error_t::Success;
}

Take the optimizer states as an example: the other worker nodes send their data to the rank 0 node, and rank 0 writes it out after receiving it.

template <typename TypeEmbeddingComp>
void SparseEmbeddingFunctors::dump_opt_states(
    std::ofstream& stream, const ResourceManager& resource_manager,
    std::vector<Tensors2<TypeEmbeddingComp>>& opt_states) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();

  CudaDeviceContext context;
  for (auto& opt_state : opt_states) {
    size_t total_size = 0;
    for (size_t id = 0; id < local_gpu_count; id++) {
      total_size += opt_state[id].get_size_in_bytes();
    }
    size_t max_size = total_size;

#ifdef ENABLE_MPI
    bool is_master_process = resource_manager.is_master_process();
    CK_MPI_THROW_(MPI_Reduce(is_master_process ? MPI_IN_PLACE : &max_size, &max_size,
                             sizeof(size_t), MPI_CHAR, MPI_MAX,
                             resource_manager.get_master_process_id(), MPI_COMM_WORLD));
#endif

    std::unique_ptr<char[]> h_opt_state(new char[max_size]);
    size_t offset = 0;
    for (size_t id = 0; id < local_gpu_count; id++) {
      size_t local_size = opt_state[id].get_size_in_bytes();
      auto& local_gpu = resource_manager.get_local_gpu(id);
      context.set_device(local_gpu->get_device_id());
      CK_CUDA_THROW_(cudaMemcpyAsync(h_opt_state.get() + offset, opt_state[id].get_ptr(),
                                     local_size, cudaMemcpyDeviceToHost, local_gpu->get_stream()));
      offset += local_size;
    }
    sync_all_gpus(resource_manager);

    int pid = resource_manager.get_process_id();
    if (resource_manager.is_master_process()) {
      // rank 0 is responsible for writing
      stream.write(h_opt_state.get(), total_size);
    }
#ifdef ENABLE_MPI
    else {
      // Other worker nodes send data to the rank 0 node
      int tag = (pid << 8) | 0xBA;
      CK_MPI_THROW_(MPI_Send(h_opt_state.get(), total_size, MPI_CHAR,
                             resource_manager.get_master_process_id(), tag, MPI_COMM_WORLD));
    }

    if (resource_manager.is_master_process()) {
      for (int r = 1; r < resource_manager.get_num_process(); r++) {
        int tag = (r << 8) | 0xBA;
        int recv_size = 0;
        MPI_Status status;
        CK_MPI_THROW_(MPI_Probe(r, tag, MPI_COMM_WORLD, &status));
        CK_MPI_THROW_(MPI_Get_count(&status, MPI_CHAR, &recv_size));
        // The rank 0 node received data
        CK_MPI_THROW_(MPI_Recv(h_opt_state.get(), recv_size, MPI_CHAR, r, tag, MPI_COMM_WORLD,
                               MPI_STATUS_IGNORE));
        stream.write(h_opt_state.get(), recv_size);
      }
    }

#endif
    MESSAGE_("Done");
  }
}
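A brief note on the MPI pattern above, as read from the code: each non-master process packs its local GPUs' optimizer states into a host buffer and sends it with the tag (pid << 8) | 0xBA; the master process first writes its own buffer, then probes ranks 1..N-1 in order, obtains each message size with MPI_Get_count, receives it into the same host buffer and appends it to the output stream, so the dumped file is ordered by rank.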

0xFF reference

https://developer.nvidia.com/blog/introducing-merlin-hugectr-training-framework-dedicated-to-recommender-systems/

https://developer.nvidia.com/blog/announcing-nvidia-merlin-application-framework-for-deep-recommender-systems/

https://developer.nvidia.com/blog/accelerating-recommender-systems-training-with-nvidia-merlin-open-beta/

HugeCTR source code reading

How does the embedding layer back propagate

https://web.eecs.umich.edu/~justincj/teaching/eecs442/notes/linear-backprop.html

Sparse matrix storage format summary + storage efficiency comparison: COO,CSR,DIA,ELL,HYB

Making something out of nothing: on the Embedding idea in Recommendation Algorithm

Principle of the tf.nn.embedding_lookup function

What exactly does TensorFlow's embedding_lookup interface do?

[Technical notes] How embedding is done in large-scale industry recommendation scenarios
