2021SC@SDUSC HBase project code analysis - How HFile writes Cells

2021SC@SDUSC

1. Foreword

In this article, we briefly walk through the main process by which an HFile writes Cells.

2. Analysis

One of the places where Cells are written into an HFile is the performFlush() method of StoreFlusher, which runs when the MemStore is flushed:

  /**
   * Performs memstore flush, writing data from scanner into sink.
   * 
   * @param scanner Scanner to get data from.
   * @param sink Sink to write data to. Could be StoreFile.Writer.
   * @param smallestReadPoint Smallest read point used for the flush.
   */
  protected void performFlush(InternalScanner scanner,
      Compactor.CellSink sink, long smallestReadPoint) throws IOException {
    int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    List<Cell> kvs = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs, compactionKVMax);
      if (!kvs.isEmpty()) {
        for (Cell c : kvs) {
          // If we know that this KV is going to be included always, then let us
          // set its memstoreTS to 0. This will help us save space when writing to
          // disk.

          sink.append(c);
        }
        kvs.clear();
      }
    } while (hasMore);
  }

It appends each Cell to sink, which is a Compactor.CellSink. The caller of performFlush(), DefaultStoreFlusher.flushSnapshot(), first calls HStore.createWriterInTmp() to create a StoreFile.Writer instance named writer, and then passes this writer into performFlush() as the sink parameter:

        // Write the map out to the disk
        writer = store.createWriterInTmp(
            cellsCount, store.getFamily().getCompression(), false, true, true);
        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
        IOException e = null;
        try {
          performFlush(scanner, writer, smallestReadPoint);
        } catch (IOException ioe) {
          e = ioe;
          // throw the exception out
          throw ioe;
        } finally {
          if (e != null) {
            writer.close();
          } else {
            finalizeWriter(writer, cacheFlushId, status);
          }
        }
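The flush path above reduces to two ideas: drain the scanner in batches of at most compactionKVMax Cells into the sink, and, if an IOException escapes, only close the writer, otherwise finalize it. The sketch below models both with plain strings standing in for Cells. BatchScanner, Sink, FlushSketch, RecordingWriter and fromList are hypothetical names for illustration, not HBase APIs; the closed/finalized flags merely record which branch of the finally block ran.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for InternalScanner: adds up to 'limit' cells to 'out'
// and returns true while more cells remain.
interface BatchScanner {
  boolean next(List<String> out, int limit) throws IOException;
}

// Hypothetical stand-in for Compactor.CellSink.
interface Sink {
  void append(String cell) throws IOException;
}

class FlushSketch {
  // Hypothetical writer that records which path the flush took.
  static class RecordingWriter implements Sink {
    final List<String> written = new ArrayList<>();
    boolean closed;     // failure path (like writer.close())
    boolean finalized;  // success path (like finalizeWriter(...))

    @Override
    public void append(String cell) {
      written.add(cell);
    }
  }

  // Same loop shape as performFlush(): fetch a batch, append it, clear, repeat.
  static void performFlush(BatchScanner scanner, Sink sink, int compactionKVMax)
      throws IOException {
    List<String> kvs = new ArrayList<>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs, compactionKVMax);
      for (String c : kvs) {
        sink.append(c);
      }
      kvs.clear();
    } while (hasMore);
  }

  // Same error-handling shape as flushSnapshot(): close on failure, finalize on success.
  static void flush(BatchScanner scanner, RecordingWriter writer, int compactionKVMax)
      throws IOException {
    IOException e = null;
    try {
      performFlush(scanner, writer, compactionKVMax);
    } catch (IOException ioe) {
      e = ioe;
      throw ioe;
    } finally {
      if (e != null) {
        writer.closed = true;
      } else {
        writer.finalized = true;
      }
    }
  }

  // Scanner over an in-memory list that hands out at most 'limit' cells per call.
  static BatchScanner fromList(List<String> cells) {
    Iterator<String> it = cells.iterator();
    return (out, limit) -> {
      while (out.size() < limit && it.hasNext()) {
        out.add(it.next());
      }
      return it.hasNext();
    };
  }
}
```

Reusing one kvs list and clearing it after each batch bounds how many Cells are held in memory at once, which is the point of passing compactionKVMax to scanner.next().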

So how is this StoreFile.Writer created? Let's look at HStore.createWriterInTmp():

    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())
            .withFilePath(fs.createTempName())
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .build();

At the end of StoreFile.WriterBuilder.build(), a new StoreFile.Writer instance is created, and its constructor in turn creates an HFile.Writer instance named writer:

      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withComparator(comparator)
          .withFavoredNodes(favoredNodes)
          .withFileContext(fileContext)
          .create();
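Taken together, these two snippets form a builder that collects options, plus an outer writer whose constructor immediately opens the inner HFile writer through a factory. The sketch below reproduces only that structure. InnerWriter, InnerWriterFactory, OuterWriter and WriterBuilderSketch are hypothetical classes, withBlockSize() is a hypothetical option, and the 64 KB value is just an example default chosen for the sketch.

```java
import java.util.Objects;

// Hypothetical inner writer standing in for HFile.Writer.
class InnerWriter {
  final String path;
  final int blockSize;

  InnerWriter(String path, int blockSize) {
    this.path = path;
    this.blockSize = blockSize;
  }
}

// Hypothetical factory standing in for HFile.WriterFactory.
interface InnerWriterFactory {
  InnerWriter create(String path, int blockSize);
}

// Hypothetical outer writer standing in for StoreFile.Writer: constructing it
// also creates the inner writer, just as StoreFile.Writer's constructor does.
class OuterWriter {
  final InnerWriter writer;
  final String bloomType;

  OuterWriter(InnerWriterFactory factory, String path, int blockSize, String bloomType) {
    this.writer = factory.create(path, blockSize);
    this.bloomType = bloomType;
  }
}

// Hypothetical builder standing in for StoreFile.WriterBuilder.
class WriterBuilderSketch {
  private final InnerWriterFactory factory;
  private String path;
  private int blockSize = 64 * 1024;  // example value for this sketch only
  private String bloomType = "NONE";

  WriterBuilderSketch(InnerWriterFactory factory) {
    this.factory = factory;
  }

  WriterBuilderSketch withFilePath(String path) { this.path = path; return this; }
  WriterBuilderSketch withBlockSize(int blockSize) { this.blockSize = blockSize; return this; }
  WriterBuilderSketch withBloomType(String bloomType) { this.bloomType = bloomType; return this; }

  // build() checks required options, then constructs the outer writer.
  OuterWriter build() {
    Objects.requireNonNull(path, "file path is required");
    return new OuterWriter(factory, path, blockSize, bloomType);
  }
}
```

For example, new WriterBuilderSketch(InnerWriter::new).withFilePath("/tmp/flush-1").build() yields an OuterWriter whose inner writer already exists, which mirrors how a flush ends up holding an HFile writer as soon as createWriterInTmp() returns.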

This HFile.Writer instance is obtained through the WriterFactory in HFile:

  /**
   * Returns the factory to be used to create {@link HFile} writers
   */
  public static final WriterFactory getWriterFactory(Configuration conf,
      CacheConfig cacheConf) {
    int version = getFormatVersion(conf);
    switch (version) {
    case 2:
      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
    case 3:
      return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
    default:
      throw new IllegalArgumentException("Cannot create writer for HFile " +
          "format version " + version);
    }
  }
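getWriterFactory() is a plain switch on the format version returned by getFormatVersion(conf), and it fails fast on any version it has no writer for. A self-contained sketch of the same dispatch follows; FactorySketch, FactoryV2Sketch, FactoryV3Sketch and WriterFactories are hypothetical classes standing in for WriterFactory, WriterFactoryV2 and WriterFactoryV3, and the version is passed in directly instead of being read from a Configuration.

```java
// Hypothetical factory hierarchy mirroring HFile.WriterFactory and its V2/V3 subclasses.
abstract class FactorySketch {
  abstract int version();
}

class FactoryV2Sketch extends FactorySketch {
  int version() { return 2; }
}

class FactoryV3Sketch extends FactorySketch {
  int version() { return 3; }
}

class WriterFactories {
  // Same shape as getWriterFactory(): choose a factory by format version and
  // reject any version there is no writer for.
  static FactorySketch forVersion(int version) {
    switch (version) {
      case 2:
        return new FactoryV2Sketch();
      case 3:
        return new FactoryV3Sketch();
      default:
        throw new IllegalArgumentException(
            "Cannot create writer for HFile format version " + version);
    }
  }
}
```

Throwing IllegalArgumentException for an unknown version means a misconfigured format version surfaces when the first writer is requested, rather than producing a file that readers cannot parse.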

HBase provides two WriterFactory implementations: HFileWriterV2.WriterFactoryV2 and HFileWriterV3.WriterFactoryV3. Taking HFileWriterV2.WriterFactoryV2 as an example, the HFile.Writer instance it creates is actually an HFileWriterV2:

    @Override
    public Writer createWriter(FileSystem fs, Path path, 
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false);// HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, 
          comparator, context);
    }
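Note that WriterFactoryV2.createWriter() calls setIncludesTags(false) on the HFileContext object it receives rather than on a copy, so any other code holding that same context object will also see tags switched off. A minimal sketch of that side effect, using a hypothetical ContextSketch class in place of HFileContext and hypothetical createV2/createV3 methods in place of the two factories (the V3 method here simply leaves the flag alone):

```java
// Hypothetical stand-in for HFileContext, reduced to the tags flag.
class ContextSketch {
  private boolean includesTags;

  ContextSketch(boolean includesTags) { this.includesTags = includesTags; }

  boolean isIncludesTags() { return includesTags; }

  void setIncludesTags(boolean includesTags) { this.includesTags = includesTags; }
}

class TagAwareFactories {
  // Like WriterFactoryV2.createWriter(): V2 cannot store tags, so it turns the
  // flag off on the caller's own context object before creating the writer.
  static String createV2(ContextSketch context) {
    context.setIncludesTags(false);
    return "HFileWriterV2";
  }

  // In this sketch, the V3 path leaves the caller's setting untouched.
  static String createV3(ContextSketch context) {
    return "HFileWriterV3";
  }
}
```

Because the flag is changed in place, a caller that built the context with tags enabled and then went through the V2 factory will read isIncludesTags() as false afterwards.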

Therefore, when performFlush() calls append() on the sink in its loop, it is actually calling StoreFile.Writer.append(), which updates the Bloom filters, delegates the write to its HFile writer, and tracks timestamps:

    public void append(final Cell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }
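StoreFile.Writer.append() is therefore a thin decorator around the HFile writer. The sketch below models that shape with hypothetical classes (SimpleCell, InnerCellWriter, StoreFileWriterSketch). A HashSet of row keys stands in for the general Bloom filter, although a real Bloom filter is probabilistic and far smaller; the delete-family Bloom filter is omitted; and min/max fields stand in for the timestamp tracking done by trackTimestamps().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical minimal cell: row key, timestamp, value.
class SimpleCell {
  final String row;
  final long timestamp;
  final String value;

  SimpleCell(String row, long timestamp, String value) {
    this.row = row;
    this.timestamp = timestamp;
    this.value = value;
  }
}

// Hypothetical inner writer standing in for HFileWriterV2: it only stores cells.
class InnerCellWriter {
  final List<SimpleCell> cells = new ArrayList<>();

  void append(SimpleCell cell) { cells.add(cell); }
}

// Hypothetical decorator standing in for StoreFile.Writer.
class StoreFileWriterSketch {
  final InnerCellWriter writer = new InnerCellWriter();
  final Set<String> rowFilter = new HashSet<>();  // exact-set stand-in for a row Bloom filter
  long minTimestamp = Long.MAX_VALUE;
  long maxTimestamp = Long.MIN_VALUE;

  void append(SimpleCell cell) {
    rowFilter.add(cell.row);                                // like appendGeneralBloomfilter(cell)
    writer.append(cell);                                    // like writer.append(cell)
    minTimestamp = Math.min(minTimestamp, cell.timestamp);  // like trackTimestamps(cell)
    maxTimestamp = Math.max(maxTimestamp, cell.timestamp);
  }

  // A set never gives false positives; a real Bloom filter can.
  boolean mightContainRow(String row) { return rowFilter.contains(row); }
}
```

Keeping the filter and time-range bookkeeping in the outer writer leaves the inner writer responsible only for the block format, which is why the interesting per-Cell logic lives in HFileWriterV2.append().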

The writer field used in StoreFile.Writer.append() is the HFileWriterV2 instance created above, so in the V2 case the Cell ultimately reaches HFileWriterV2.append(). Let's focus on that method:

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell Cell to add. Cannot be empty nor null.
   * @throws IOException
   */
  @Override
  public void append(final Cell cell) throws IOException {
    
    byte[] value = cell.getValueArray();
    int voffset = cell.getValueOffset();
    int vlength = cell.getValueLength();
    
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    checkValue(value, voffset, vlength);
    if (!dupKey) {
      checkBlockBoundary();
    }
    if (!fsBlockWriter.isWriting()) {
      newBlock();
    }
    fsBlockWriter.write(cell);
    totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += vlength;
    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }
 
    // TODO: What if cell is 10MB and we write infrequently?  We'll hold on to the cell here
    // indefinitely?
    lastCell = cell;
    
    entryCount++;
    
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
  }

The logic is as follows:

  1. Get the value array, its offset (voffset) and its length (vlength) from the Cell
  2. Call checkKey() to verify that keys are appended in order. dupKey is true if this Cell has the same key as the previous one, and false if the key has changed
  3. Call checkValue() to make sure the value is not null
  4. Only when the key has changed, call checkBlockBoundary() to detect the block boundary, so consecutive Cells with the same key stay in the same data block
  5. If fsBlockWriter is not currently writing a block (isWriting() returns false), call newBlock() to start a new data block
  6. Call fsBlockWriter.write() to write the Cell
  7. Add the key and value lengths to totalKeyLength and totalValueLength
  8. If this is the first Cell in the current block, record it in firstCellInBlock
  9. Record the Cell as lastCell
  10. Increment entryCount
  11. Update maxMemstoreTS with the Cell's sequence ID
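The steps above can be condensed into a small, self-contained model. Everything in it is hypothetical (MiniHFileWriter and its nested Cell are not HBase classes): keys are Strings compared with String.compareTo() instead of a KVComparator, a "block" is just a list of cells, and key length is counted in characters rather than serialized bytes. What it does keep is the order of operations in HFileWriterV2.append(); the numbered comments refer to the steps in the list above.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of HFileWriterV2.append(); not HBase code.
class MiniHFileWriter {
  static final class Cell {
    final String key;
    final byte[] value;
    final long seqId;

    Cell(String key, byte[] value, long seqId) {
      this.key = key;
      this.value = value;
      this.seqId = seqId;
    }
  }

  final int blockSize;                                    // like hFileContext.getBlocksize()
  final List<List<Cell>> finishedBlocks = new ArrayList<>();
  List<Cell> currentBlock;                                // null means "not writing a block"
  int bytesInBlock;                                       // like fsBlockWriter.blockSizeWritten()
  Cell firstCellInBlock;
  Cell lastCell;
  long entryCount;
  long totalKeyLength;
  long totalValueLength;
  long maxMemstoreTS;

  MiniHFileWriter(int blockSize) {
    this.blockSize = blockSize;
  }

  void append(Cell cell) throws IOException {
    boolean dupKey = checkKey(cell);                      // 2. ordering check
    if (cell.value == null) {                             // 3. like checkValue()
      throw new IOException("Value cannot be null");
    }
    if (!dupKey) {                                        // 4. boundary check only on a new key
      checkBlockBoundary();
    }
    if (currentBlock == null) {                           // 5. start a block if needed
      newBlock();
    }
    currentBlock.add(cell);                               // 6. write the cell
    int keyLength = cell.key.length();                    // chars stand in for key bytes
    bytesInBlock += keyLength + cell.value.length;
    totalKeyLength += keyLength;                          // 7. accumulate lengths
    totalValueLength += cell.value.length;
    if (firstCellInBlock == null) {                       // 8. first cell of this block
      firstCellInBlock = cell;
    }
    lastCell = cell;                                      // 9. remember the last cell
    entryCount++;                                         // 10. count entries
    maxMemstoreTS = Math.max(maxMemstoreTS, cell.seqId);  // 11. track max sequence id
  }

  // Returns true for a duplicate of the previous key; rejects out-of-order keys.
  private boolean checkKey(Cell cell) throws IOException {
    if (lastCell == null) {
      return false;
    }
    int cmp = cell.key.compareTo(lastCell.key);
    if (cmp < 0) {
      throw new IOException("Key out of order: " + cell.key + " after " + lastCell.key);
    }
    return cmp == 0;
  }

  private void checkBlockBoundary() {
    if (currentBlock == null || bytesInBlock < blockSize) {
      return;
    }
    finishBlock();
    newBlock();
  }

  private void finishBlock() {
    finishedBlocks.add(currentBlock);
    currentBlock = null;
    firstCellInBlock = null;
  }

  private void newBlock() {
    currentBlock = new ArrayList<>();
    bytesInBlock = 0;
  }

  // Finishes the last, possibly partial, block.
  void close() {
    if (currentBlock != null && !currentBlock.isEmpty()) {
      finishBlock();
    }
  }
}
```

With a 10-byte threshold, appending "a", "a", "b" with 4-byte values puts both "a" Cells in the first block even though the second one fills it, because the boundary check is skipped for the duplicate key; the check before "b" then closes that block.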

The code of the checkBlockBoundary() method for detecting the block boundary is as follows:

  /**
   * At a block boundary, write all the inline blocks and opens new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
    if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;
    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

It compares the number of bytes fsBlockWriter has written so far with the block size configured in hFileContext. If fewer bytes have been written, it returns immediately. Otherwise the block has reached or exceeded the threshold, and the following operations are performed:

  1. Call finishBlock() to finish the current block
  2. Call writeInlineBlocks(false) to write out inline blocks
  3. Call newBlock() to start a new block
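Because the size check runs before a Cell is written rather than after, a finished data block can be larger than the configured block size. The sketch below computes resulting block sizes from a list of serialized Cell sizes under the same rule. BlockBoundarySketch is a hypothetical helper; it assumes every Cell has a distinct key (so the check runs before every Cell) and ignores inline blocks and block headers.

```java
import java.util.ArrayList;
import java.util.List;

class BlockBoundarySketch {
  // Splits serialized cell sizes into data-block sizes using the rule in
  // checkBlockBoundary(): before writing the next cell, finish the current
  // block if it already holds at least blockSize bytes.
  static List<Long> blockSizes(long[] cellSizes, long blockSize) {
    List<Long> blocks = new ArrayList<>();
    long written = 0;                       // like fsBlockWriter.blockSizeWritten()
    for (long size : cellSizes) {
      if (written > 0 && written >= blockSize) {
        blocks.add(written);                // finishBlock(); writeInlineBlocks(false); newBlock();
        written = 0;
      }
      written += size;                      // fsBlockWriter.write(cell)
    }
    if (written > 0) {
      blocks.add(written);                  // in this sketch, the last block is finished at close
    }
    return blocks;
  }
}
```

For example, with four 30000-byte Cells and a 65536-byte threshold, blockSizes returns [90000, 30000]: after two Cells the block holds 60000 bytes, still under the threshold, so a third Cell is written and the first block overshoots by 24464 bytes.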

If there are any errors above, please point them out.

Keywords: HBase

Added by killswitchfilter on Wed, 29 Dec 2021 13:23:28 +0200