Preface
Following on from the questions raised earlier, this article studies the MemStore flush flow on HRegionServer. It focuses on two questions: how an HRegion is selected for flushing to relieve MemStore pressure, and how an HRegion flush is initiated.
1. How an HRegion is selected for flushing to relieve MemStore pressure
As mentioned earlier, the flush handler thread takes a FlushQueueEntry from flushQueue. Suppose that entry is null or a WakeupFlushThread, and isAboveLowWaterMark() reports that the global MemStore size is above the low watermark. In that case, the thread calls flushOneForGlobalPressure(). This method selects one HRegion according to a fixed policy and flushes its MemStore. Doing so reduces the global MemStore size and prevents failures such as OOM.
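The following minimal Java sketch makes the trigger condition concrete by modeling the handler's dispatch decision. It is not HBase code. FlushDispatchSketch, its Action enum, and the single lowWaterMark field are hypothetical. Treating "above the low watermark" as `>=` is also an assumption of the sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the flush handler's dispatch decision; not HBase code.
class FlushDispatchSketch {
    enum Action { FLUSH_ONE_FOR_GLOBAL_PRESSURE, FLUSH_QUEUED_REGION, NOTHING }

    // Global MemStore size in bytes, shared by all regions (illustrative)
    private final AtomicLong globalMemStoreSize = new AtomicLong();
    private final long lowWaterMark;

    FlushDispatchSketch(long lowWaterMark) {
        this.lowWaterMark = lowWaterMark;
    }

    void addMemStoreBytes(long bytes) {
        globalMemStoreSize.addAndGet(bytes);
    }

    // Assumed semantics: "above" means greater than or equal to the low watermark
    boolean isAboveLowWaterMark() {
        return globalMemStoreSize.get() >= lowWaterMark;
    }

    // entryIsNullOrWakeup models "fqe == null || fqe instanceof WakeupFlushThread"
    Action dispatch(boolean entryIsNullOrWakeup) {
        if (!entryIsNullOrWakeup) {
            return Action.FLUSH_QUEUED_REGION;          // a real entry: flush that region
        }
        return isAboveLowWaterMark()
                ? Action.FLUSH_ONE_FOR_GLOBAL_PRESSURE  // pick a region by policy
                : Action.NOTHING;                       // no global pressure
    }

    public static void main(String[] args) {
        FlushDispatchSketch d = new FlushDispatchSketch(100);
        d.addMemStoreBytes(60);
        System.out.println(d.dispatch(true));  // NOTHING
        d.addMemStoreBytes(50);
        System.out.println(d.dispatch(true));  // FLUSH_ONE_FOR_GLOBAL_PRESSURE
        System.out.println(d.dispatch(false)); // FLUSH_QUEUED_REGION
    }
}
```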
flushOneForGlobalPressure()
Next, let's analyze the flushOneForGlobalPressure() method. The code is as follows:
private boolean flushOneForGlobalPressure() {
  // Get the online regions on this RegionServer, sorted by MemStore size in descending order
  SortedMap<Long, HRegion> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
  // Regions excluded from selection (e.g. because a flush attempt on them failed)
  Set<HRegion> excludedRegions = new HashSet<HRegion>();

  boolean flushedOne = false; // set once a region has been flushed successfully
  while (!flushedOne) {
    // Loop until a region is flushed or no candidate region remains
    // Find the biggest region that doesn't have too many storefiles
    // (might be null!)
    // This is the preferred candidate: bestFlushableRegion
    HRegion bestFlushableRegion = getBiggestMemstoreRegion(
        regionsBySize, excludedRegions, true);
    // Find the biggest region, total, even if it might have too many flushes.
    // This is the fallback candidate, even if it has too many StoreFiles: bestAnyRegion
    HRegion bestAnyRegion = getBiggestMemstoreRegion(
        regionsBySize, excludedRegions, false);

    // Above the memory threshold, but no region can be flushed: return false
    if (bestAnyRegion == null) {
      LOG.error("Above memory mark but there are no flushable regions!");
      return false;
    }

    HRegion regionToFlush;
    // Choose bestAnyRegion if its MemStore is more than twice as large as bestFlushableRegion's
    if (bestFlushableRegion != null &&
        bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
      // Even if it's not supposed to be flushed, pick a region if it's more than twice
      // as big as the best flushable one - otherwise when we're under pressure we make
      // lots of little flushes and cause lots of compactions, etc, which just makes
      // life worse!
      if (LOG.isDebugEnabled()) {
        LOG.debug("Under global heap pressure: " +
          "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
          "store files, but is " +
          StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
          " vs best flushable region's " +
          StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
          ". Choosing the bigger.");
      }
      regionToFlush = bestAnyRegion;
    } else {
      // Otherwise prefer bestFlushableRegion, falling back to bestAnyRegion
      if (bestFlushableRegion == null) {
        regionToFlush = bestAnyRegion;
      } else {
        regionToFlush = bestFlushableRegion;
      }
    }

    // Sanity check: the selected region's memstoreSize must be greater than 0
    Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);

    LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
    // Flush the MemStore of this single region
    flushedOne = flushRegion(regionToFlush, true);
    if (!flushedOne) {
      // On failure, exclude the region so it is not selected again in the next iteration
      LOG.info("Excluding unflushable region " + regionToFlush +
        " - trying to find a different region to flush.");
      excludedRegions.add(regionToFlush);
    }
  }
  return true;
}
The processing logic of this method is as follows:
1. Get the online regions on the RegionServer, sorted by MemStore size in descending order, as regionsBySize;
2. Create the set of excluded regions, excludedRegions;
3. Initialize the flag flushedOne to false;
4. Loop until flushedOne is true, that is, until a region has been flushed successfully. If no candidate region remains, return false:
4.1. Iterate over regionsBySize and pick the region with the largest MemStore that does not have too many StoreFiles. This is the preferred candidate, bestFlushableRegion:
4.1.1. If the region is in excludedRegions, skip it;
4.1.2. If the region is already flushing, or writes are not enabled on it, skip it;
4.1.3. If the StoreFile count must be checked and the region has too many StoreFiles, skip it;
4.1.4. Otherwise, return this region;
4.2. Iterate over regionsBySize again and pick the region with the largest MemStore, even if it has too many StoreFiles. This is the fallback candidate, bestAnyRegion:
4.2.1. If the region is in excludedRegions, skip it;
4.2.2. If the region is already flushing, or writes are not enabled on it, skip it;
4.2.3. Otherwise, return this region;
4.3. Suppose the global MemStore is above the memory threshold but no region can be flushed. In that case, log an error and return false;
4.4. Choose the region to flush:
4.4.1. If bestFlushableRegion exists and bestAnyRegion's MemStore is more than twice as large as bestFlushableRegion's, choose bestAnyRegion;
4.4.2. Otherwise, prefer bestFlushableRegion, falling back to bestAnyRegion if bestFlushableRegion is null;
4.5. Check the state: the selected region's memstoreSize must be greater than 0;
4.6. Call flushRegion() to flush the MemStore of that single region;
4.7. If the flush fails, add the region to excludedRegions so that it is not selected again in the next iteration.
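The selection policy in steps 4.1 to 4.7 can be modeled in a few lines of plain Java. This is a simplified, hypothetical model rather than HBase's implementation. Region is a stand-in that holds only the fields the policy looks at. biggest() plays the role of getBiggestMemstoreRegion(). The caller supplies the flush attempt as a Predicate.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified model of the region-selection policy; not HBase code.
class RegionPickerSketch {
    // Minimal stand-in for an HRegion: only the fields the policy looks at
    static final class Region {
        final String name;
        final long memstoreSize;
        final boolean flushing;
        final boolean writesEnabled;
        final boolean tooManyStoreFiles;

        Region(String name, long memstoreSize, boolean flushing,
               boolean writesEnabled, boolean tooManyStoreFiles) {
            this.name = name;
            this.memstoreSize = memstoreSize;
            this.flushing = flushing;
            this.writesEnabled = writesEnabled;
            this.tooManyStoreFiles = tooManyStoreFiles;
        }
    }

    // Steps 4.1 and 4.2: regionsBySize must be sorted by memstoreSize, largest first
    static Region biggest(List<Region> regionsBySize, Set<Region> excluded,
                          boolean checkStoreFileCount) {
        for (Region r : regionsBySize) {
            if (excluded.contains(r)) continue;                       // excluded earlier
            if (r.flushing || !r.writesEnabled) continue;             // busy or not writable
            if (checkStoreFileCount && r.tooManyStoreFiles) continue; // bestFlushable only
            return r;
        }
        return null;
    }

    // Steps 4.3 and 4.4: returns null when no region can be flushed
    static Region choose(List<Region> regionsBySize, Set<Region> excluded) {
        Region bestFlushable = biggest(regionsBySize, excluded, true);
        Region bestAny = biggest(regionsBySize, excluded, false);
        if (bestAny == null) {
            return null;
        }
        if (bestFlushable != null && bestAny.memstoreSize > 2 * bestFlushable.memstoreSize) {
            return bestAny; // more than twice as big: flush it despite its StoreFiles
        }
        return bestFlushable != null ? bestFlushable : bestAny;
    }

    // Steps 4.5 to 4.7: retry with other regions until one flush succeeds
    static Region flushOne(List<Region> regionsBySize, Predicate<Region> tryFlush) {
        Set<Region> excluded = new HashSet<>();
        while (true) {
            Region r = choose(regionsBySize, excluded);
            if (r == null) {
                return null;                 // nothing left to try
            }
            if (r.memstoreSize <= 0) {
                throw new IllegalStateException("empty MemStore selected: " + r.name);
            }
            if (tryFlush.test(r)) {
                return r;                    // flushed successfully
            }
            excluded.add(r);                 // exclude it and try the next candidate
        }
    }

    public static void main(String[] args) {
        List<Region> regions = new ArrayList<>();
        regions.add(new Region("a", 300, false, true, true));  // biggest, too many StoreFiles
        regions.add(new Region("b", 100, false, true, false));
        regions.add(new Region("c", 50, false, true, false));
        System.out.println(choose(regions, new HashSet<>()).name);            // a (300 > 2 * 100)
        System.out.println(flushOne(regions, r -> !r.name.equals("a")).name); // b (a's flush fails)
    }
}
```

The demo shows the 2x rule at work. Region a has too many StoreFiles but is three times the size of b, so it is chosen. If its flush fails, a is excluded and b is tried next.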
That is the policy for selecting an HRegion whose MemStore is flushed to relieve MemStore pressure.
2. How an HRegion flush is initiated
The remaining case, flushing a specified HRegion, comes down to the same question: how an HRegion flush is initiated.
flushRegion()
Let's first look at the one-argument flushRegion() method. Its argument fqe is the FlushRegionEntry that wraps the HRegion to be flushed. The method runs some necessary checks on fqe and decides whether to flush immediately or to put the entry back on the queue for later. On the first delay, it also requests a split, or a system compaction if needed. The logic is as follows:
1. If the region is not a meta region and it has too many StoreFiles:
1.1. Use isMaximumWait() to check how long the flush has been blocked. If the wait has exceeded the allowed time, log a message and go to step 2 to perform the flush;
1.2. If this is the first delay, log a message and request a split of the HRegion. If the split request is not accepted, request a system compaction instead;
1.3. Put fqe back into flushQueue with a delay of 900 ms (configurable). The entry is taken out of the queue again only after that delay expires;
1.4. Return true. The region is treated as if it had been flushed, even though the flush has only been postponed and its outcome is not yet known;
2. Call the two-argument flushRegion() method to make the HRegion perform the flush.
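The branching in steps 1 and 2 can be condensed into a pure decision function, which makes the order of the checks easy to see. This is a hypothetical sketch. The Decision names are invented, and the caller is assumed to supply the elapsed wait time. Detecting "the first delay" with a requeue counter of zero is an assumption about how the real entry tracks it. The 90 000 ms maximum wait in the demo is only an example value.

```java
// Hypothetical sketch of the decision made by the one-argument flushRegion(); not HBase code.
class DelayDecisionSketch {
    enum Decision { FLUSH_NOW, REQUEST_SPLIT_OR_COMPACTION_THEN_REQUEUE, REQUEUE }

    static Decision decide(boolean isMetaRegion, boolean tooManyStoreFiles,
                           long waitedMs, long maximumWaitMs, int requeueCount) {
        if (isMetaRegion || !tooManyStoreFiles) {
            return Decision.FLUSH_NOW;  // condition 1 does not apply: go straight to step 2
        }
        if (waitedMs > maximumWaitMs) {
            return Decision.FLUSH_NOW;  // 1.1: blocked long enough, flush anyway
        }
        if (requeueCount == 0) {
            return Decision.REQUEST_SPLIT_OR_COMPACTION_THEN_REQUEUE; // 1.2, then 1.3
        }
        return Decision.REQUEUE;        // 1.3 only; the caller then returns true (1.4)
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, 0, 90_000, 0));       // FLUSH_NOW (meta region)
        System.out.println(decide(false, true, 0, 90_000, 0));      // REQUEST_SPLIT_OR_COMPACTION_THEN_REQUEUE
        System.out.println(decide(false, true, 900, 90_000, 1));    // REQUEUE
        System.out.println(decide(false, true, 90_001, 90_000, 5)); // FLUSH_NOW (waited too long)
    }
}
```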
To check the blocking time, isMaximumWait() tests whether the current time minus the entry's creation time exceeds the given maximum wait. The code is as follows:
public boolean isMaximumWait(final long maximumWait) {
  return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
}
Next, let's analyze the two-argument flushRegion() method. The code is as follows:
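The text says a requeued entry is taken out of flushQueue only after its delay expires. That is the contract of java.util.concurrent.DelayQueue, so requeueing with a 900 ms delay simply moves the entry's expiry time forward. The sketch below shows that mechanism with a hypothetical FlushEntrySketch class. It assumes the real entry behaves similarly, but it is not that entry's source. The clock is injected as a LongSupplier so the timing can be tested without sleeping.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a delayed flush entry; not HBase's FlushRegionEntry.
class FlushEntrySketch implements Delayed {
    private final LongSupplier clock; // current time in milliseconds
    final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;

    FlushEntrySketch(LongSupplier clock) {
        this.clock = clock;
        this.createTime = clock.getAsLong();
        this.whenToExpire = createTime; // available immediately when first queued
    }

    // Same comparison as isMaximumWait() above: strictly greater than
    boolean isMaximumWait(long maximumWait) {
        return (clock.getAsLong() - createTime) > maximumWait;
    }

    // Move the expiry time forward before the entry is added back to the queue
    FlushEntrySketch requeue(long delayMs) {
        whenToExpire = clock.getAsLong() + delayMs;
        requeueCount++;
        return this;
    }

    int getRequeueCount() {
        return requeueCount;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(whenToExpire - clock.getAsLong(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) {
        long[] now = {0};
        DelayQueue<FlushEntrySketch> queue = new DelayQueue<>();
        FlushEntrySketch e = new FlushEntrySketch(() -> now[0]);
        queue.add(e.requeue(900));                // delay the entry by 900 ms
        System.out.println(queue.poll());         // null: delay not yet expired
        now[0] = 900;
        System.out.println(queue.poll() == e);    // true: delay has elapsed
        System.out.println(e.isMaximumWait(900)); // false: 900 is not > 900
    }
}
```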
private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
  long startTime = 0;
  synchronized (this.regionsInQueue) {
    // Always remove the region's entry from regionsInQueue first
    FlushRegionEntry fqe = this.regionsInQueue.remove(region);
    // Use the start time of the FlushRegionEntry if available
    if (fqe != null) {
      startTime = fqe.createTime;
    }
    // For an emergency flush, also remove the entry from flushQueue;
    // otherwise it has already been taken out via flushQueue.poll()
    if (fqe != null && emergencyFlush) {
      flushQueue.remove(fqe);
    }
  }
  // No FlushRegionEntry was found: use the current time as the flush start time
  if (startTime == 0) {
    startTime = EnvironmentEdgeManager.currentTime();
  }
  // Acquire the read lock: shared with other read-lock holders, exclusive with the write lock
  lock.readLock().lock();
  try {
    // Notify the registered listeners of the flush request and its type
    notifyFlushRequest(region, emergencyFlush);
    // Flush the HRegion's MemStore
    HRegion.FlushResult flushResult = region.flushcache();
    // Decide what to do next based on the flush result
    boolean shouldCompact = flushResult.isCompactionNeeded();
    // We just want to check the size
    boolean shouldSplit = region.checkSplit() != null;
    // A split takes precedence; otherwise request a system compaction if needed
    if (shouldSplit) {
      this.server.compactSplitThread.requestSplit(region);
    } else if (shouldCompact) {
      server.compactSplitThread.requestSystemCompaction(
          region, Thread.currentThread().getName());
    }
    // On success, record the flush duration in the RegionServer metrics
    if (flushResult.isFlushSucceeded()) {
      long endTime = EnvironmentEdgeManager.currentTime();
      server.metricsRegionServer.updateFlushTime(endTime - startTime);
    }
  } catch (DroppedSnapshotException ex) {
    // A WAL replay is required: abort the server
    server.abort("Replay of WAL required. Forcing server shutdown", ex);
    return false;
  } catch (IOException ex) {
    LOG.error("Cache flush failed" +
      (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
      RemoteExceptionHandler.checkIOException(ex));
    if (!server.checkFileSystem()) {
      return false;
    }
  } finally {
    // Release the read lock
    lock.readLock().unlock();
    // Wake up other blocked threads
    wakeUpIfBlocking();
  }
  return true;
}
The general logic of the two-argument flushRegion() method is as follows:
1. First, update regionsInQueue and flushQueue inside a block synchronized on regionsInQueue:
1.1. Remove the region's entry from regionsInQueue. This is done whether or not the flush is an emergency flush;
1.2. If an entry was found, use its creation time as the flush start time startTime;
1.3. If this is an emergency flush, also remove the corresponding fqe from flushQueue. For a non-emergency flush, the fqe has already been removed by flushQueue.poll();
2. If startTime is still 0 (no entry was found), use the current time as startTime;
3. Acquire the read lock. Several threads can hold the read lock at the same time, but it is mutually exclusive with a thread holding the write lock. (A later article will analyze how locks are used in various HBase processes.);
4. Notify the registered listeners of the flush request and its flush type;
5. Call the HRegion's flushcache() method to flush the MemStore and obtain the flush result;
6. Decide what to do next based on the flush result:
6.1. Determine from the flush result whether a compaction is needed, i.e. the flag shouldCompact;
6.2. Call the HRegion's checkSplit() method to determine whether a split is needed, i.e. the flag shouldSplit;
6.3. Based on the two flags, request a split if one is needed. Only if no split is needed, request a system compaction when one is needed;
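7. If the flush succeeded, record the end time, compute the elapsed time, and update the RegionServer's flush-time metric (metricsRegionServer.updateFlushTime());
8. If flushcache() throws a DroppedSnapshotException, a WAL replay is required, so abort the RegionServer and return false. For any other IOException, log the error and return false if the file system check fails;
9. Finally, in the finally block, release the read lock and wake up other blocked threads (wakeUpIfBlocking()).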
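The locking and follow-up logic, from acquiring the read lock through the finally block, follows a common Java pattern. The method takes a shared read lock, does the work, and releases the lock in a finally block. The hypothetical sketch below mirrors that shape with ReentrantReadWriteLock. The two BooleanSupplier arguments stand in for flushcache()/isCompactionNeeded() and checkSplit(), and the counter stands in for wakeUpIfBlocking(). None of these names are HBase APIs.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the read-lock pattern in flushRegion(region, emergencyFlush).
class FlushUnderReadLockSketch {
    enum FollowUp { SPLIT, COMPACT, NONE }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    final AtomicInteger wakeUps = new AtomicInteger(); // stand-in for wakeUpIfBlocking()

    // A split takes precedence; a compaction is requested only when no split is needed
    static FollowUp followUp(boolean shouldSplit, boolean shouldCompact) {
        if (shouldSplit) {
            return FollowUp.SPLIT;
        }
        return shouldCompact ? FollowUp.COMPACT : FollowUp.NONE;
    }

    // flushReturnsCompactionNeeded plays flushcache() + isCompactionNeeded();
    // splitNeeded plays checkSplit() != null
    FollowUp flush(BooleanSupplier flushReturnsCompactionNeeded, BooleanSupplier splitNeeded) {
        // Shared lock: many flushes may hold it at once; a write-lock holder excludes them
        lock.readLock().lock();
        try {
            boolean shouldCompact = flushReturnsCompactionNeeded.getAsBoolean();
            boolean shouldSplit = splitNeeded.getAsBoolean();
            return followUp(shouldSplit, shouldCompact);
        } finally {
            lock.readLock().unlock(); // released even if the flush throws
            wakeUps.incrementAndGet();
        }
    }

    boolean isReadLocked() {
        return lock.getReadLockCount() > 0;
    }

    public static void main(String[] args) {
        FlushUnderReadLockSketch f = new FlushUnderReadLockSketch();
        System.out.println(f.flush(() -> true, () -> true));          // SPLIT
        System.out.println(f.flush(() -> true, () -> false));         // COMPACT
        System.out.println(f.isReadLocked() + " " + f.wakeUps.get()); // false 2
    }
}
```

Releasing the lock in finally is what guarantees that a failed flush cannot leave the read lock held and block a later write-lock holder.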
The listeners are notified of the flush type as follows:
private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {
  // The default type is FlushType.NORMAL
  FlushType type = FlushType.NORMAL;
  // For an emergency flush, the type depends on whether the global MemStore is above the
  // high watermark (FlushType.ABOVE_HIGHER_MARK) or only above the low watermark
  // (FlushType.ABOVE_LOWER_MARK)
  if (emergencyFlush) {
    type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
  }
  // Notify each registered listener of the region and the flush type
  for (FlushRequestListener listener : flushRequestListeners) {
    listener.flushRequested(type, region);
  }
}
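Notification is a plain observer pattern. The sketch below reproduces the type selection with hypothetical stand-ins. FlushType and FlushRequestListener reuse the names from the code above, but the region is a String and the high-watermark check is a fixed boolean. CopyOnWriteArrayList for the listener list is a choice made for the sketch, not something the source states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of notifyFlushRequest(); a String stands in for HRegion.
class FlushNotifySketch {
    enum FlushType { NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK }

    interface FlushRequestListener {
        void flushRequested(FlushType type, String regionName);
    }

    // Copy-on-write so listeners can be registered while a notification is in progress
    private final List<FlushRequestListener> listeners = new CopyOnWriteArrayList<>();
    private final boolean aboveHighWaterMark; // fixed here instead of a live size check

    FlushNotifySketch(boolean aboveHighWaterMark) {
        this.aboveHighWaterMark = aboveHighWaterMark;
    }

    void register(FlushRequestListener listener) {
        listeners.add(listener);
    }

    void notifyFlushRequest(String regionName, boolean emergencyFlush) {
        FlushType type = FlushType.NORMAL;
        if (emergencyFlush) {
            type = aboveHighWaterMark ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
        }
        for (FlushRequestListener listener : listeners) {
            listener.flushRequested(type, regionName);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        FlushNotifySketch n = new FlushNotifySketch(false);
        n.register((type, region) -> seen.add(type + ":" + region));
        n.notifyFlushRequest("r1", false);
        n.notifyFlushRequest("r2", true);
        System.out.println(seen); // [NORMAL:r1, ABOVE_LOWER_MARK:r2]
    }
}
```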
FlushResult is a static nested class of HRegion that contains a Result enum. It defines the following flush results:
1. FLUSHED_NO_COMPACTION_NEEDED: the flush succeeded and no compaction is needed;
2. FLUSHED_COMPACTION_NEEDED: the flush succeeded and a compaction is also needed;
3. CANNOT_FLUSH_MEMSTORE_EMPTY: the flush could not run because the MemStore is empty;
4. CANNOT_FLUSH: the flush could not run.
A flush counts as successful if the result is FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED. A compaction is needed only if the result is FLUSHED_COMPACTION_NEEDED.
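The two checks described above can be expressed as a small enum-based model. This minimal model assumes that only the Result value matters for these checks; the real class may carry more state. FlushResultSketch is a hypothetical name.

```java
// Hypothetical model of HRegion.FlushResult's two predicates; not HBase code.
class FlushResultSketch {
    enum Result {
        FLUSHED_NO_COMPACTION_NEEDED,
        FLUSHED_COMPACTION_NEEDED,
        CANNOT_FLUSH_MEMSTORE_EMPTY,
        CANNOT_FLUSH
    }

    final Result result;

    FlushResultSketch(Result result) {
        this.result = result;
    }

    // Success means data was actually flushed, with or without a follow-up compaction
    boolean isFlushSucceeded() {
        return result == Result.FLUSHED_NO_COMPACTION_NEEDED
            || result == Result.FLUSHED_COMPACTION_NEEDED;
    }

    // Only one result value asks for a compaction
    boolean isCompactionNeeded() {
        return result == Result.FLUSHED_COMPACTION_NEEDED;
    }

    public static void main(String[] args) {
        for (Result r : Result.values()) {
            FlushResultSketch fr = new FlushResultSketch(r);
            System.out.println(r + " succeeded=" + fr.isFlushSucceeded()
                + " compact=" + fr.isCompactionNeeded());
        }
    }
}
```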
Summary
That is all for today. With this, the analysis of the MemStore flush flow on HRegionServer is complete.