HBase source code analysis: MemStore flush processing 2021SC@SDUSC

Preface

Following on from the problems raised in the previous article, this article studies the flush processing flow of MemStore on HRegionServer. It focuses on two questions: how an HRegion is selected for flushing to relieve MemStore pressure, and how an HRegion flush is initiated.

1. How to select an HRegion to flush to relieve MemStore pressure

As mentioned in the previous article, the flush handler thread takes a FlushQueueEntry from flushQueue. If that entry is empty (null) or a WakeupFlushThread, and isAboveLowWaterMark() reports that the global MemStore size is above the low water mark, the thread calls flushOneForGlobalPressure(). This method picks one HRegion according to the policy below and flushes its MemStore, reducing the global MemStore size and preventing failures such as OOM (OutOfMemoryError).
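The gating condition described above can be sketched as a small Java class. This is a minimal illustration, not HBase code: GlobalMemStorePressure and shouldFlushOne() are hypothetical names, the water marks are passed in as plain byte counts (HBase derives them from configured fractions of the heap), and the >= comparisons are an assumption.

```java
// Hypothetical sketch: the water-mark checks that gate flushOneForGlobalPressure().
// Limits are given directly in bytes; the values in main() are made up for illustration.
class WaterMarkDemo {
    public static void main(String[] args) {
        GlobalMemStorePressure p = new GlobalMemStorePressure(400L, 380L);
        System.out.println(p.shouldFlushOne(true, 390L));  // true: above the low mark
        System.out.println(p.shouldFlushOne(true, 370L));  // false: below the low mark
        System.out.println(p.shouldFlushOne(false, 390L)); // false: entry names a specific region
    }
}

class GlobalMemStorePressure {
    final long highWaterMark; // upper limit for all MemStores on the server
    final long lowWaterMark;  // global-pressure flushing starts at or above this

    GlobalMemStorePressure(long highWaterMark, long lowWaterMark) {
        this.highWaterMark = highWaterMark;
        this.lowWaterMark = lowWaterMark;
    }

    boolean isAboveHighWaterMark(long globalMemStoreSize) {
        return globalMemStoreSize >= highWaterMark;
    }

    boolean isAboveLowWaterMark(long globalMemStoreSize) {
        return globalMemStoreSize >= lowWaterMark;
    }

    // entryIsEmptyOrWakeup: the polled entry was null or a WakeupFlushThread marker
    boolean shouldFlushOne(boolean entryIsEmptyOrWakeup, long globalMemStoreSize) {
        return entryIsEmptyOrWakeup && isAboveLowWaterMark(globalMemStoreSize);
    }
}
```

Keeping the check as a pure function of the polled entry and the current global size makes the policy easy to test in isolation from the flush thread.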

flushOneForGlobalPressure()

Next, let's analyze the flushOneForGlobalPressure() method. The code is as follows:

private boolean flushOneForGlobalPressure() {
	  
    // Get the online regions on this RegionServer, sorted by memstore size in descending order, as regionsBySize
    SortedMap<Long, HRegion> regionsBySize =
        server.getCopyOfOnlineRegionsSortedBySize();
 
    // Construct the excluded Region collection excludedRegions
    Set<HRegion> excludedRegions = new HashSet<HRegion>();
 
    boolean flushedOne = false; // Set to true once a region has been flushed
    while (!flushedOne) { // Keep trying until a region is flushed; return false if none can be chosen
      
      // Find the biggest region that doesn't have too many storefiles
      // (might be null!)
      // Select the region with the largest memstore that does not have too many store files: the preferred candidate, bestFlushableRegion
      HRegion bestFlushableRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, true);
      
      // Find the biggest region, total, even if it might have too many flushes.
      // Select the region with the largest memstore even if it has too many store files: the fallback candidate, bestAnyRegion
      HRegion bestAnyRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, false);
 
      // Above the memory threshold but no region can be flushed: return false
      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }
 
      HRegion regionToFlush;
      
      // Choose the region to flush:
      // if bestAnyRegion's memstore is more than twice the size of bestFlushableRegion's, choose bestAnyRegion
      if (bestFlushableRegion != null &&
          bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " +
            "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
            "store files, but is " +
            StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
            " vs best flushable region's " +
            StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
            ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else { // Otherwise prefer bestFlushableRegion, falling back to bestAnyRegion if it is null
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }
 
      // Sanity check: the selected region's memstoreSize must be greater than 0
      Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
 
      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
      
      // Call the flushRegion() method to flush the MemStore for a single Region
      flushedOne = flushRegion(regionToFlush, true);
      if (!flushedOne) { // Flush failed: exclude this region so it is not selected again
        LOG.info("Excluding unflushable region " + regionToFlush +
          " - trying to find a different region to flush.");
        excludedRegions.add(regionToFlush);
      }
    }
    return true;
  }
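The code above walks regionsBySize from the largest MemStore to the smallest. A minimal sketch of such a descending map, built with a TreeMap and a reversed comparator, is shown below. sortBySizeDescending() and the plain name/size arrays are hypothetical stand-ins for getCopyOfOnlineRegionsSortedBySize() and HRegion; this is one way to obtain the ordering, not the actual implementation.

```java
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of regionsBySize: memstore size -> region name, largest first.
class RegionsBySizeDemo {
    public static void main(String[] args) {
        SortedMap<Long, String> bySize = sortBySizeDescending(
            new String[] {"r1", "r2", "r3"}, new long[] {64L, 256L, 128L});
        System.out.println(bySize); // {256=r2, 128=r3, 64=r1}
    }

    static SortedMap<Long, String> sortBySizeDescending(String[] names, long[] sizes) {
        // Reverse-order comparator so iteration starts with the biggest memstore
        SortedMap<Long, String> map = new TreeMap<>(Collections.reverseOrder());
        for (int i = 0; i < names.length; i++) {
            // In this sketch, two regions with the same size share a key; the later one wins
            map.put(sizes[i], names[i]);
        }
        return map;
    }
}
```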

The processing logic of this method is as follows:

1. Get the online regions on the RegionServer, sorted by MemStore size in descending order, as regionsBySize;

2. Construct the set of excluded regions, excludedRegions;

3. Set the flag flushedOne to false;

4. Loop until flushedOne is true (a region has been flushed), or until no selectable region remains (in which case the method returns false):

4.1. Iterate over regionsBySize and select the region with the largest MemStore that does not have too many StoreFiles, as the preferred candidate, bestFlushableRegion:

4.1.1. If the region is in excludedRegions, skip it;

4.1.2. If the region is already flushing, or writes to it are not enabled, skip it;

4.1.3. If StoreFile counts are being checked and the region has too many StoreFiles, skip it;

4.1.4. Otherwise, return this region (null is returned if every region was skipped);

4.2. Iterate over regionsBySize again and select the region with the largest MemStore regardless of its StoreFile count, as the fallback candidate, bestAnyRegion:

4.2.1. If the region is in excludedRegions, skip it;

4.2.2. If the region is already flushing, or writes to it are not enabled, skip it;

4.2.3. Otherwise, return this region;

4.3. If global MemStore usage is above the threshold but bestAnyRegion is null (no region can be flushed), log an error and return false;

4.4. Choose the region to flush:

4.4.1. If bestFlushableRegion is not null and bestAnyRegion's MemStore is more than twice as large, choose bestAnyRegion even though it has too many StoreFiles; otherwise, under memory pressure the server would make many small flushes and trigger many compactions;

4.4.2. Otherwise, choose bestFlushableRegion, or bestAnyRegion if bestFlushableRegion is null;

4.5. Sanity check: the selected region's memstoreSize must be greater than 0;

4.6. Call flushRegion() to flush the MemStore of that single region;

4.7. If the flush fails, add the region to excludedRegions so that it is not selected again in the next iteration.
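Steps 4.1 to 4.7 can be condensed into the following Java sketch. SimpleRegion, biggest(), pick(), flushOne() and the maxStoreFiles limit are hypothetical simplifications of HRegion, getBiggestMemstoreRegion(), the store-file check and flushRegion(). The region list is assumed to be sorted largest-first already, and step 4.5 is not modeled. The sketch illustrates the selection policy and is not the HBase implementation.

```java
import java.util.*;
import java.util.function.Predicate;

// Hypothetical sketch of the region-selection policy in flushOneForGlobalPressure().
class GlobalPressurePickerDemo {
    public static void main(String[] args) {
        List<SimpleRegion> bySize = List.of(              // assumed sorted largest-first
            new SimpleRegion("big", 300, 20, false, true), // too many store files
            new SimpleRegion("mid", 100, 3, false, true),
            new SimpleRegion("small", 50, 2, false, true));
        System.out.println(pick(bySize, new HashSet<>(), 10).name); // big: 300 > 2 * 100
        // If "big" cannot be flushed, it is excluded and the loop falls back to "mid"
        System.out.println(flushOne(bySize, 10, r -> !r.name.equals("big")).name); // mid
    }

    static final class SimpleRegion {
        final String name;
        final long memstoreSize;
        final int storeFiles;
        final boolean flushing;
        final boolean writesEnabled;

        SimpleRegion(String name, long memstoreSize, int storeFiles,
                     boolean flushing, boolean writesEnabled) {
            this.name = name;
            this.memstoreSize = memstoreSize;
            this.storeFiles = storeFiles;
            this.flushing = flushing;
            this.writesEnabled = writesEnabled;
        }
    }

    // Steps 4.1/4.2: first eligible region in largest-first order, or null
    static SimpleRegion biggest(List<SimpleRegion> bySize, Set<SimpleRegion> excluded,
                                boolean checkStoreFileCount, int maxStoreFiles) {
        for (SimpleRegion r : bySize) {
            if (excluded.contains(r)) continue;                  // previously failed
            if (r.flushing || !r.writesEnabled) continue;        // busy or read-only
            if (checkStoreFileCount && r.storeFiles > maxStoreFiles) continue;
            return r;
        }
        return null;
    }

    // Steps 4.3/4.4: returns null when no region can be flushed
    static SimpleRegion pick(List<SimpleRegion> bySize, Set<SimpleRegion> excluded,
                             int maxStoreFiles) {
        SimpleRegion flushable = biggest(bySize, excluded, true, maxStoreFiles);
        SimpleRegion any = biggest(bySize, excluded, false, maxStoreFiles);
        if (any == null) return null;
        if (flushable != null && any.memstoreSize > 2 * flushable.memstoreSize) return any;
        return flushable != null ? flushable : any;
    }

    // Steps 4.6/4.7: try regions until one flushes; tryFlush stands in for flushRegion()
    static SimpleRegion flushOne(List<SimpleRegion> bySize, int maxStoreFiles,
                                 Predicate<SimpleRegion> tryFlush) {
        Set<SimpleRegion> excluded = new HashSet<>();
        while (true) {
            SimpleRegion r = pick(bySize, excluded, maxStoreFiles);
            if (r == null) return null;     // nothing left to try
            if (tryFlush.test(r)) return r; // flushed one region
            excluded.add(r);                // do not pick it again
        }
    }
}
```

With a limit of 10 store files, "big" wins because 300 > 2 * 100. If its MemStore were only 150 bytes, "mid" would be preferred, which avoids flushing a region that is already waiting on compactions.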

The above is how an HRegion is selected, according to a fixed policy, and its MemStore flushed to relieve MemStore pressure.

2. How an HRegion flush is initiated

What remains is flushing the selected HRegion, which goes through the same flushRegion() logic that is used to initiate any HRegion flush.

flushRegion()

Let's first look at the one-parameter flushRegion() method. It performs some necessary checks on fqe, the FlushRegionEntry that wraps the HRegion to be flushed, and decides whether to flush immediately or postpone the flush. The first time a flush is postponed, it also requests a split or, failing that, a system compaction if needed. The processing logic is as follows:
1. If the region is not a meta region and it has too many StoreFiles:

1.1. Check the blocking time with isMaximumWait(). If the entry has already waited longer than the maximum blocking time, log a message and go to step 2 to flush;

1.2. If this is the first postponement, log a message and request a split of the HRegion; if the split request is not accepted, request a system compaction instead;

1.3. Put fqe back into flushQueue with a delay of 900 ms (configurable), so that it is taken out of the queue and processed again only after the delay expires;

1.4. Return true even though the region has not been flushed yet, because the flush has only been postponed;

2. Call the two-parameter flushRegion() method to have the HRegion perform the flush.
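The decision in steps 1 and 2 can be modeled as a pure function. In this minimal sketch, DeferFlushDemo, Action, decide() and requeueCount (the number of earlier postponements) are hypothetical names. The 90000 ms maximum blocking wait and the "one hundredth of the blocking wait" rule for the 900 ms re-queue delay are assumptions used for illustration.

```java
// Hypothetical sketch of the decision made by the one-parameter flushRegion().
// The 90000 ms blocking wait in main() is an assumed value, used to show the
// blockingWaitTime / 100 re-queue delay (900 ms) mentioned above.
class DeferFlushDemo {
    public static void main(String[] args) {
        System.out.println(requeueDelayMs(90_000L));                  // 900
        System.out.println(decide(false, true, 1_000L, 90_000L, 0));  // DEFER_AND_REQUEST_SPLIT_OR_COMPACTION
        System.out.println(decide(false, true, 95_000L, 90_000L, 5)); // FLUSH_NOW
    }

    enum Action { FLUSH_NOW, DEFER_AND_REQUEST_SPLIT_OR_COMPACTION, DEFER }

    static Action decide(boolean isMetaRegion, boolean tooManyStoreFiles, long waitedMs,
                         long blockingWaitTimeMs, int requeueCount) {
        if (isMetaRegion || !tooManyStoreFiles) return Action.FLUSH_NOW; // go straight to step 2
        if (waitedMs > blockingWaitTimeMs) return Action.FLUSH_NOW;      // step 1.1: waited long enough
        if (requeueCount <= 0) {
            return Action.DEFER_AND_REQUEST_SPLIT_OR_COMPACTION;         // step 1.2: first postponement
        }
        return Action.DEFER;                                             // steps 1.3-1.4
    }

    static long requeueDelayMs(long blockingWaitTimeMs) {
        return blockingWaitTimeMs / 100;
    }
}
```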

isMaximumWait() checks the blocking time by testing whether the time elapsed since the entry was created exceeds the given maximum wait. The code is as follows:

public boolean isMaximumWait(final long maximumWait) {
    return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
  }
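The re-queue-with-delay behavior from step 1.3, together with isMaximumWait(), maps naturally onto java.util.concurrent.DelayQueue. The sketch below is a model under stated assumptions and not HBase code: DelayedEntry, its injectable LongSupplier clock (standing in for EnvironmentEdgeManager.currentTime()), requeue() and getRequeueCount() are hypothetical. Note that createTime is never reset, so the maximum-wait check measures the total time since the first request.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of a FlushRegionEntry-like element kept in a DelayQueue.
// The LongSupplier clock stands in for EnvironmentEdgeManager.currentTime().
class DelayedFlushEntryDemo {
    public static void main(String[] args) {
        long[] now = {0L};                            // fake clock, in milliseconds
        DelayedEntry e = new DelayedEntry("region-a", () -> now[0]);
        now[0] = 1_000L;
        System.out.println(e.isMaximumWait(900L));    // true: waited 1000 ms > 900 ms

        DelayQueue<DelayedEntry> queue = new DelayQueue<>();
        queue.add(e.requeue(900L));                   // postpone by 900 ms
        System.out.println(queue.poll());             // null: delay not yet expired
        now[0] = 1_900L;
        System.out.println(queue.poll().regionName);  // region-a
    }
}

class DelayedEntry implements Delayed {
    final String regionName;
    final long createTime;          // fixed at creation; requeue() does not reset it
    private final LongSupplier clock;
    private long whenToExpire;      // the queue releases the entry at this time
    private int requeueCount = 0;

    DelayedEntry(String regionName, LongSupplier clock) {
        this.regionName = regionName;
        this.clock = clock;
        this.createTime = clock.getAsLong();
        this.whenToExpire = createTime; // available immediately
    }

    // Same test as the isMaximumWait() excerpt: strictly more than maximumWait elapsed
    boolean isMaximumWait(long maximumWait) {
        return (clock.getAsLong() - createTime) > maximumWait;
    }

    // Re-arm the delay relative to "now" and count the postponement
    DelayedEntry requeue(long whenMs) {
        this.whenToExpire = clock.getAsLong() + whenMs;
        this.requeueCount++;
        return this;
    }

    int getRequeueCount() {
        return requeueCount;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(whenToExpire - clock.getAsLong(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}
```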

Next, let's analyze the two-parameter flushRegion() method. The code is as follows:

private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
    long startTime = 0;
    synchronized (this.regionsInQueue) {
      
      // First remove the region's entry from regionsInQueue
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null) {
        // The flush start time is the entry's creation time
        startTime = fqe.createTime;
      }
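      if (fqe != null && emergencyFlush) {
        // For an emergency flush, also remove the entry from the delay queue;
        // for a normal flush it was already taken out via flushQueue.poll()
        flushQueue.remove(fqe);
      }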
    }
    
    // No queued entry supplied a start time: use the current time
    if (startTime == 0) {
      startTime = EnvironmentEdgeManager.currentTime();
    }
    
    // Take the read lock: it can be held concurrently with other readers but is mutually exclusive with the write lock
    lock.readLock().lock();
    try {
      
      // Notify the flush request listeners of the flush type
      notifyFlushRequest(region, emergencyFlush);
      
      // Call HRegion.flushcache() to flush the MemStore
      HRegion.FlushResult flushResult = region.flushcache();
      
      // Decide what to do next based on the flush result

      // Whether a compaction is needed
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      
      // Whether the region should be split
      boolean shouldSplit = region.checkSplit() != null;

      // A split takes priority; otherwise request a system compaction if needed
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
      
      // On success, compute the flush duration and record it in the RegionServer metrics
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        server.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
    } catch (DroppedSnapshotException ex) {
      // The flush failed in a critical section: the WAL must be replayed, so abort the server
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" +
        (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      // Release read lock
      lock.readLock().unlock();
      
      // Wake up other blocked threads
      wakeUpIfBlocking();
    }
    return true;
  }
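The locking discipline in the code above (read lock around the flush, release and wake-up in finally) can be illustrated with java.util.concurrent.locks.ReentrantReadWriteLock. This is a minimal sketch that assumes the flusher's lock behaves like a ReentrantReadWriteLock. FlushLockDemo, flushUnderReadLock() and the call-counting wakeUpIfBlocking() are hypothetical stand-ins; the real wake-up notifies blocked threads rather than counting.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the lock discipline in the two-parameter flushRegion():
// the flush body runs under the shared read lock, and the finally block always
// releases it and then wakes up blocked threads.
class FlushLockDemo {
    public static void main(String[] args) {
        FlushLockDemo demo = new FlushLockDemo();
        demo.flushUnderReadLock(() ->
            System.out.println("write lock acquired during flush? "
                + demo.lock.writeLock().tryLock()));        // false: a reader holds the lock
        System.out.println("wake-ups: " + demo.wakeUps);    // 1
    }

    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    int wakeUps = 0;

    // flushBody stands in for notifyFlushRequest(), flushcache() and the follow-up requests
    void flushUnderReadLock(Runnable flushBody) {
        lock.readLock().lock();          // shared with other readers, exclusive with the writer
        try {
            flushBody.run();
        } finally {
            lock.readLock().unlock();    // released even if flushBody throws
            wakeUpIfBlocking();
        }
    }

    // Placeholder for wakeUpIfBlocking(); here it only counts calls
    void wakeUpIfBlocking() {
        wakeUps++;
    }
}
```

Putting both the unlock and the wake-up in finally means an exception from the flush can neither leak the read lock nor leave blocked threads waiting.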

The general logic of the two-parameter flushRegion() method is as follows:
1. First update the regionsInQueue map and the flushQueue queue:

1.1. Remove the region's entry from regionsInQueue; this is done whether or not the flush is an emergency flush;

1.2. If an entry was found, use its creation time as the flush start time, startTime;

1.3. For an emergency flush, the corresponding fqe must also be removed from flushQueue; for a non-emergency flush, fqe has already been removed by flushQueue.poll();

2. If startTime is still 0 (no entry was found), use the current time as startTime;

3. Acquire the read lock. The read lock can be held at the same time as other threads' read locks, but is mutually exclusive with a thread holding the write lock (a separate article will analyze how locks are used in various HBase processes);

4. Notify the flush request listeners of the flush type;

5. Call HRegion.flushcache() to flush the MemStore and obtain the flush result;

6. Decide what to do next based on the flush result:

6.1. Use the flush result to decide whether a compaction is needed (flag shouldCompact);

6.2. Call HRegion.checkSplit() to decide whether a split is needed (flag shouldSplit);

6.3. Based on the two flags, request a split if needed; otherwise request a system compaction if needed;

7. If the flush succeeded, get the end time, compute the elapsed time, and record it in the RegionServer flush-time metrics;

8. Finally, in the finally block (so also after an exception), release the read lock and wake up other blocked threads. If the flush throws DroppedSnapshotException, the server is aborted because the WAL must be replayed; if it throws another IOException, the error is logged and false is returned when the file system check fails.
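Steps 2 and 6.3 reduce to two small decisions: where the start time comes from, and which follow-up request (if any) is made after the flush. This is a minimal sketch. AfterFlushDemo, FollowUp, followUp() and resolveStartTime() are hypothetical names, and the two boolean inputs are assumed to come from region.checkSplit() != null and flushResult.isCompactionNeeded().

```java
// Hypothetical sketch of steps 2 and 6 of the two-parameter flushRegion():
// choosing the flush start time and the follow-up request after flushcache().
class AfterFlushDemo {
    public static void main(String[] args) {
        System.out.println(followUp(true, true));          // REQUEST_SPLIT
        System.out.println(followUp(false, true));         // REQUEST_SYSTEM_COMPACTION
        System.out.println(followUp(false, false));        // NONE
        System.out.println(resolveStartTime(0L, 5_000L));  // 5000: no queued entry
    }

    enum FollowUp { REQUEST_SPLIT, REQUEST_SYSTEM_COMPACTION, NONE }

    // shouldSplit ~ region.checkSplit() != null; shouldCompact ~ flushResult.isCompactionNeeded()
    static FollowUp followUp(boolean shouldSplit, boolean shouldCompact) {
        if (shouldSplit) return FollowUp.REQUEST_SPLIT;           // a split takes priority
        if (shouldCompact) return FollowUp.REQUEST_SYSTEM_COMPACTION;
        return FollowUp.NONE;
    }

    // 0 means "no FlushRegionEntry was found", as in the excerpt
    static long resolveStartTime(long queuedCreateTime, long now) {
        return queuedCreateTime != 0 ? queuedCreateTime : now;
    }
}
```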

notifyFlushRequest() notifies the registered flush request listeners of the flush type as follows:

private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {
    // The default type is FlushType.NORMAL
    FlushType type = FlushType.NORMAL;

    // For an emergency flush, the type depends on whether global MemStore usage is above the
    // high water mark: FlushType.ABOVE_HIGHER_MARK if so, otherwise FlushType.ABOVE_LOWER_MARK
    if (emergencyFlush) {
      type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
    }

    // Pass the flush type and region to each registered listener
    for (FlushRequestListener listener : flushRequestListeners) {
      listener.flushRequested(type, region);
    }
  }
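The type selection and fan-out above can be separated into a pure function plus a loop, which makes the three outcomes easy to check. In this minimal sketch, the region is a plain String and FlushNotifyDemo, flushType() and the simplified FlushRequestListener interface are hypothetical. Only the three FlushType names used in the excerpt are modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of notifyFlushRequest(): the region is a plain String and the
// listener interface is simplified; the three FlushType names come from the excerpt.
class FlushNotifyDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<FlushRequestListener> listeners = List.of((type, region) -> log.add(region + ":" + type));
        notifyFlushRequest(listeners, "region-a", true, false);
        notifyFlushRequest(listeners, "region-b", false, true);
        System.out.println(log); // [region-a:ABOVE_LOWER_MARK, region-b:NORMAL]
    }

    enum FlushType { NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK }

    interface FlushRequestListener {
        void flushRequested(FlushType type, String region);
    }

    static FlushType flushType(boolean emergencyFlush, boolean aboveHighWaterMark) {
        if (!emergencyFlush) return FlushType.NORMAL;   // regular request
        return aboveHighWaterMark ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
    }

    static void notifyFlushRequest(List<FlushRequestListener> listeners, String region,
                                   boolean emergencyFlush, boolean aboveHighWaterMark) {
        FlushType type = flushType(emergencyFlush, aboveHighWaterMark);
        for (FlushRequestListener listener : listeners) {
            listener.flushRequested(type, region);      // same type for every listener
        }
    }
}
```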

FlushResult is a static nested class of HRegion that contains a Result enum with the following flush results:
1. FLUSHED_NO_COMPACTION_NEEDED: the flush succeeded and no compaction is needed;

2. FLUSHED_COMPACTION_NEEDED: the flush succeeded and a compaction should also be performed;

3. CANNOT_FLUSH_MEMSTORE_EMPTY: the flush was not performed because the MemStore is empty;

4. CANNOT_FLUSH: the flush could not be performed.

A flush counts as successful if the result is FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED (isFlushSucceeded()); a compaction is needed only if the result is FLUSHED_COMPACTION_NEEDED (isCompactionNeeded()).
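The two checks can be mirrored in a few lines of Java. This is a minimal sketch: FlushResultDemo and its nested FlushResult are hypothetical stand-ins that model only the Result value, while the real HRegion.FlushResult may carry more information.

```java
// Hypothetical sketch mirroring the four Result values and the two checks described
// above; the real HRegion.FlushResult may carry more information than this.
class FlushResultDemo {
    public static void main(String[] args) {
        for (Result r : Result.values()) {
            FlushResult fr = new FlushResult(r);
            System.out.println(r + " succeeded=" + fr.isFlushSucceeded()
                + " compactionNeeded=" + fr.isCompactionNeeded());
        }
    }

    enum Result {
        FLUSHED_NO_COMPACTION_NEEDED,
        FLUSHED_COMPACTION_NEEDED,
        CANNOT_FLUSH_MEMSTORE_EMPTY,
        CANNOT_FLUSH
    }

    static final class FlushResult {
        final Result result;

        FlushResult(Result result) {
            this.result = result;
        }

        // Either FLUSHED_* value counts as a successful flush
        boolean isFlushSucceeded() {
            return result == Result.FLUSHED_NO_COMPACTION_NEEDED
                || result == Result.FLUSHED_COMPACTION_NEEDED;
        }

        // Only FLUSHED_COMPACTION_NEEDED asks for a compaction
        boolean isCompactionNeeded() {
            return result == Result.FLUSHED_COMPACTION_NEEDED;
        }
    }
}
```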

Summary

That is all for this article. With it, the analysis of the MemStore flush processing flow on HRegionServer is complete.

Keywords: Database HBase

Added by jay7981 on Wed, 08 Dec 2021 04:15:59 +0200