Background
As HDFS clusters grow, it is inevitable that some "slow nodes" appear in the cluster, mainly manifested as slow network data transmission and slow disk reads and writes. These slow nodes are usually hard to find: only when a business job reads or writes data on them, and its running time stretches out as a result, do we notice that cluster reads and writes are slow and then track down the specific slow nodes.
Therefore, slow nodes have always been a focus of HDFS cluster operation and maintenance. Since Hadoop 2.9, the community has supported viewing slow nodes through the NameNode JMX.
The metrics are formatted as follows; note that at most the top 5 nodes/disks are displayed:
"SlowPeersReport":[{"SlowNode":"node4","ReportingNodes":["node1"]},{"SlowNode":"node2","ReportingNodes":["node1","node3"]},{"SlowNode":"node1","ReportingNodes":["node2"]}] "SlowDisksReport":[{"SlowDiskID":"dn3:disk1","Latencies":{"WRITE":1000.1}},{"SlowDiskID":"dn2:disk2","Latencies":{"WRITE":1000.1}},{"SlowDiskID":"dn1:disk2","Latencies":{"READ":1000.3}},{"SlowDiskID":"dn1:disk1","Latencies":{"METADATA":1000.1,"READ":1000.8}}]
Slow network monitoring
Principle
The principle behind monitoring slow network transmission is to record how long packet transfers take between DNs in the cluster, identify the outliers, and report them to the NN as slow nodes. Under normal conditions the transfer latency between nodes is roughly the same, with no large differences. If transfers from DN A to DN B take abnormally long, A reports to the NN that B is a slow node.
To compute the average time a DN spends transmitting data downstream, the DN maintains a Map<String, LinkedBlockingDeque<SumAndCount>>. The key is the IP of the downstream DN, and the value is a queue of SumAndCount objects that record the number of packets sent to that downstream DN and the time spent sending them.
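The following standalone sketch illustrates this bookkeeping in simplified form; it is not the actual DataNodePeerMetrics implementation, and the SumAndCount fields and method names here are illustrative only:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

public class PeerLatencyTracker {
  // Holds a total elapsed time and the number of packets it covers.
  static class SumAndCount {
    final long sumMs;
    final long count;
    SumAndCount(long sumMs, long count) { this.sumMs = sumMs; this.count = count; }
  }

  // Key: downstream DN address, value: queue of (sum, count) samples.
  private final Map<String, LinkedBlockingDeque<SumAndCount>> samples =
      new ConcurrentHashMap<>();

  // Called once per packet sent downstream.
  void addSendPacketDownstream(String peer, long elapsedMs) {
    samples.computeIfAbsent(peer, k -> new LinkedBlockingDeque<>())
           .add(new SumAndCount(elapsedMs, 1));
  }

  // Drains the queue for a peer and returns its mean latency in ms.
  double drainMeanLatency(String peer) {
    LinkedBlockingDeque<SumAndCount> q =
        samples.getOrDefault(peer, new LinkedBlockingDeque<>());
    long sum = 0, count = 0;
    SumAndCount s;
    while ((s = q.poll()) != null) {
      sum += s.sumMs;
      count += s.count;
    }
    return count == 0 ? 0.0 : (double) sum / count;
  }
}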
When sending a heartbeat, the DN decides whether to generate a SlowPeerReport and send it to the NN as part of the heartbeat. The generation interval is controlled by the dfs.datanode.outliers.report.interval parameter, which defaults to 30 minutes. First, all packet samples are taken out of the queues and the average transmission time per downstream DN is calculated. Then, based on these average latencies, the slow-node reporting threshold upperLimitLatency is computed. When a node's average latency exceeds upperLimitLatency, it is considered a network slow node; the reporting DN then generates the corresponding SlowPeerReport and sends it to the NN through the heartbeat.
Calculation logic of slow node threshold upperLimitLatency
First compute the median of all downstream DN transmission times, then compute the median absolute deviation (MAD):
// MAD_MULTIPLIER = 1.4826
mad = median(|list[i] - median(list)|) * MAD_MULTIPLIER
The final upperLimitLatency is:
// lowThresholdMs = 5ms
upperLimitLatency = max(lowThresholdMs, median * 3, median + mad * 3)
The code details are as follows:
org.apache.hadoop.hdfs.server.datanode.metrics.OutlierDetector.java
public Map<String, Double> getOutliers(Map<String, Double> stats) {
  // minNumResources = 10; with fewer than 10 nodes the calculation is skipped
  if (stats.size() < minNumResources) {
    LOG.debug("Skipping statistical outlier detection as we don't have " +
        "latency data for enough resources. Have {}, need at least {}",
        stats.size(), minNumResources);
    return ImmutableMap.of();
  }

  final List<Double> sorted = new ArrayList<>(stats.values());
  Collections.sort(sorted);
  // Calculate the median
  final Double median = computeMedian(sorted);
  // Calculate the median absolute deviation (MAD)
  final Double mad = computeMad(sorted);
  // Calculate the outlier threshold upperLimitLatency
  Double upperLimitLatency = Math.max(
      lowThresholdMs, median * MEDIAN_MULTIPLIER);
  upperLimitLatency = Math.max(
      upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));

  final Map<String, Double> slowResources = new HashMap<>();
  // Find the nodes above the outlier threshold
  for (Map.Entry<String, Double> entry : stats.entrySet()) {
    if (entry.getValue() > upperLimitLatency) {
      slowResources.put(entry.getKey(), entry.getValue());
    }
  }
  return slowResources;
}

public static Double computeMad(List<Double> sortedValues) {
  ...
  // Calculate the median
  Double median = computeMedian(sortedValues);
  List<Double> deviations = new ArrayList<>(sortedValues);
  // Calculate the absolute deviations
  for (int i = 0; i < sortedValues.size(); ++i) {
    deviations.set(i, Math.abs(sortedValues.get(i) - median));
  }
  Collections.sort(deviations);
  // MAD_MULTIPLIER = 1.4826
  return computeMedian(deviations) * MAD_MULTIPLIER;
}

public static Double computeMedian(List<Double> sortedValues) {
  ...
  Double median = sortedValues.get(sortedValues.size() / 2);
  if (sortedValues.size() % 2 == 0) {
    median += sortedValues.get((sortedValues.size() / 2) - 1);
    median /= 2;
  }
  return median;
}
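To make the threshold math concrete, here is a standalone sketch (not Hadoop code) that applies the same formulas to ten hypothetical downstream latencies; with nine nodes around 5 ms and one at 40 ms, the threshold comes out to 15.3 ms and only the 40 ms node is flagged:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ThresholdExample {
  public static void main(String[] args) {
    // Hypothetical average latencies (ms) toward 10 downstream DNs.
    List<Double> latencies = new ArrayList<>(Arrays.asList(
        5.0, 5.2, 5.1, 4.9, 5.0, 5.3, 4.8, 5.1, 5.2, 40.0));
    Collections.sort(latencies);

    double median = median(latencies);
    List<Double> deviations = new ArrayList<>();
    for (double v : latencies) {
      deviations.add(Math.abs(v - median));
    }
    Collections.sort(deviations);
    double mad = median(deviations) * 1.4826;   // MAD_MULTIPLIER

    // lowThresholdMs = 5, MEDIAN_MULTIPLIER = 3, DEVIATION_MULTIPLIER = 3
    double upperLimit = Math.max(5.0, Math.max(median * 3, median + 3 * mad));

    // median = 5.1, mad ≈ 0.148, upperLimit = max(5, 15.3, 5.54) = 15.3;
    // only the 40 ms node exceeds the threshold and is reported as a slow peer.
    System.out.printf("median=%.2f mad=%.3f upperLimit=%.2f%n",
        median, mad, upperLimit);
  }

  static double median(List<Double> sorted) {
    int n = sorted.size();
    double m = sorted.get(n / 2);
    if (n % 2 == 0) {
      m = (m + sorted.get(n / 2 - 1)) / 2;
    }
    return m;
  }
}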
Monitoring process code analysis
org.apache.hadoop.hdfs.server.datanode.DataNode.java
First, a DataNodePeerMetrics object is created in DataNode's startDataNode method:
void startDataNode(List<StorageLocation> dataDirectories,
                   SecureResources resources) throws IOException {
  ...
  // peerStatsEnabled is determined by the dfs.datanode.peer.stats.enabled parameter
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
  ...
}
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.java
The BlockReceiver class records the packet transmission time and then writes it into the DataNodePeerMetrics object:
private int receivePacket() throws IOException {
  ...
  // First, write the packet to the mirror:
  if (mirrorOut != null && !mirrorError) {
    try {
      // Record the start time
      long begin = Time.monotonicNow();
      DataNodeFaultInjector.get().stopSendingPacketDownstream(mirrorAddr);
      packetReceiver.mirrorPacketTo(mirrorOut);
      mirrorOut.flush();
      long now = Time.monotonicNow();
      this.lastSentTime.set(now);
      // Calculate the packet transmission time
      long duration = now - begin;
      DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
          mirrorAddr, duration);
      // Write the elapsed time into DataNodePeerMetrics
      trackSendPacketToLastNodeInPipeline(duration);
      if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
        LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
            + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
            + "downstream DNs=" + Arrays.toString(downstreamDNs)
            + ", blockId=" + replicaInfo.getBlockId());
      }
    } catch (IOException e) {
      handleMirrorOutError(e);
    }
  }
  ...
}

private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
  // Get the DataNodePeerMetrics object
  final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
  // Whether peerMetrics is null is determined by the
  // dfs.datanode.peer.stats.enabled parameter
  if (peerMetrics != null && isPenultimateNode) {
    peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
  }
}
org.apache.hadoop.hdfs.server.datanode.BPServiceActor.java
When the BPServiceActor class sends a heartbeat, it takes the slow node data out of the DataNodePeerMetrics object, forms SlowPeerReports, and sends them to the NN:
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
    throws IOException {
  ...
  // Check whether the reporting interval has been reached (default 30min)
  final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
  // Generate the slow node report
  final SlowPeerReports slowPeers =
      outliersReportDue && dn.getPeerMetrics() != null ?
          SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
          SlowPeerReports.EMPTY_REPORT;
  final SlowDiskReports slowDisks =
      outliersReportDue && dn.getDiskMetrics() != null ?
          SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
          SlowDiskReports.EMPTY_REPORT;

  HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
      reports,
      dn.getFSDataset().getCacheCapacity(),
      dn.getFSDataset().getCacheUsed(),
      dn.getXmitsInProgress(),
      dn.getXceiverCount(),
      numFailedVolumes,
      volumeFailureSummary,
      requestBlockReportLease,
      // The slow node report is sent to the NN along with the heartbeat
      slowPeers,
      slowDisks);
  ...
}
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.java
When the report reaches the NameNode, it is processed in DatanodeManager's handleHeartbeat method:
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount,
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  ...
  // Whether slowPeerTracker is null is determined by the
  // dfs.datanode.peer.stats.enabled parameter
  if (slowPeerTracker != null) {
    final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
    if (!slowPeersMap.isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
            slowPeersMap);
      }
      for (String slowNodeId : slowPeersMap.keySet()) {
        // Aggregate the slow node information into the slowPeerTracker object
        slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
      }
    }
  }
  ...
}
org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker.java
SlowPeerTracker's getJsonReports is eventually called by the NN to generate the slow node JSON data:
private Collection<ReportForJson> getJsonReports(int numNodes) {
  ...
  // Create a priority queue ordered by the number of nodes reporting each slow node
  final PriorityQueue<ReportForJson> topNReports =
      new PriorityQueue<>(allReports.size(),
          new Comparator<ReportForJson>() {
            @Override
            public int compare(ReportForJson o1, ReportForJson o2) {
              return Ints.compare(o1.reportingNodes.size(),
                  o2.reportingNodes.size());
            }
          });
  // Record the current time
  final long now = timer.monotonicNow();

  for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
      allReports.entrySet()) {
    // Filter out expired slow node reports
    SortedSet<String> validReports = filterNodeReports(
        entry.getValue(), now);
    if (!validReports.isEmpty()) {
      // numNodes is fixed at 5; keep only the top 5 nodes
      if (topNReports.size() < numNodes) {
        topNReports.add(new ReportForJson(entry.getKey(), validReports));
      } else if (topNReports.peek().getReportingNodes().size() <
          validReports.size()) {
        topNReports.poll();
        topNReports.add(new ReportForJson(entry.getKey(), validReports));
      }
    }
  }
  return topNReports;
}
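For illustration, the aggregation structure behind allReports can be sketched as follows (slow node → reporting node → timestamp of the latest report); the class and method names below are illustrative, not the actual SlowPeerTracker API:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SlowPeerAggregation {
  // slowNode -> (reportingNode -> timestamp of the latest report)
  private final ConcurrentMap<String, ConcurrentMap<String, Long>> allReports =
      new ConcurrentHashMap<>();

  // Record that reportingNode observed slowNode as slow at time nowMs.
  void addReport(String slowNode, String reportingNode, long nowMs) {
    allReports.computeIfAbsent(slowNode, k -> new ConcurrentHashMap<>())
              .put(reportingNode, nowMs);
  }

  // Reports older than validityMs are ignored when building the JSON view.
  int countValidReports(String slowNode, long nowMs, long validityMs) {
    ConcurrentMap<String, Long> reports = allReports.get(slowNode);
    if (reports == null) {
      return 0;
    }
    int valid = 0;
    for (long ts : reports.values()) {
      if (nowMs - ts < validityMs) {
        valid++;
      }
    }
    return valid;
  }
}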
Related community patches
https://issues.apache.org/jira/browse/HDFS-10917 (Collect peer performance statistics on DataNode)
https://issues.apache.org/jira/browse/HDFS-11194 (Maintain aggregated peer performance metrics on NameNode)
Related parameters
<property>
  <name>dfs.datanode.peer.stats.enabled</name>
  <value>false</value>
  <description>A switch to turn on/off tracking DataNode peer statistics.</description>
</property>
<property>
  <name>dfs.datanode.peer.metrics.min.outlier.detection.samples</name>
  <value>1000</value>
  <description>Minimum number of packet send samples which are required to
    qualify for outlier detection. If the number of samples is below this then
    outlier detection is skipped.</description>
</property>
<property>
  <name>dfs.datanode.outliers.report.interval</name>
  <value>30m</value>
  <description>This setting controls how frequently DataNodes will report their
    peer latencies to the NameNode via heartbeats.</description>
</property>
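As a usage sketch, enabling slow peer tracking only requires flipping the switch above in hdfs-site.xml; the same key is also read on the NameNode side to create the SlowPeerTracker (see handleHeartbeat above), so it should be set consistently across the cluster:

<!-- hdfs-site.xml: minimal change to enable slow peer reporting -->
<property>
  <name>dfs.datanode.peer.stats.enabled</name>
  <value>true</value>
</property>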
Slow disk monitoring
Principle
The principle of slow disk monitoring is to record the time spent on read and write operations on each of a node's disks, identify the outliers, and report them to the NN as slow disks.
When the DataNode starts, if the dfs.datanode.fileio.profiling.sampling.percentage parameter is greater than 0, a DataNodeDiskMetrics object is initialized. After initialization, a background thread runs every dfs.datanode.outliers.report.interval (30 minutes by default): it takes the average latency of metadata, read, and write operations for each disk from DataNodeVolumeMetrics and then computes the slow disk reporting threshold upperLimitLatency (the calculation logic is the same as for slow network nodes). When the average latency of any one of a disk's operations exceeds upperLimitLatency, the disk is considered slow, a SlowDiskReports object is generated, and it is reported to the NN through the heartbeat.
Monitoring process code analysis
org.apache.hadoop.hdfs.server.datanode.DataNode.java
First, a DataNodeDiskMetrics object is created in DataNode's startDataNode method:
void startDataNode(List<StorageLocation> dataDirectories,
                   SecureResources resources) throws IOException {
  ...
  // diskStatsEnabled is determined by the
  // dfs.datanode.fileio.profiling.sampling.percentage parameter
  if (dnConf.diskStatsEnabled) {
    // Create the DataNodeDiskMetrics object
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
  ...
}
org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics.java
In the DataNodeDiskMetrics class, a disk check thread is started to find disks whose metadata, readIo, or writeIo operations are slow:
public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
  this.dn = dn;
  // The check interval is set by the dfs.datanode.outliers.report.interval parameter
  this.detectionInterval = diskOutlierDetectionIntervalMs;
  slowDiskDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_DISKS,
      SLOW_DISK_LOW_THRESHOLD_MS);
  shouldRun = true;
  // Start the disk outlier detection thread
  startDiskOutlierDetectionThread();
}

private void startDiskOutlierDetectionThread() {
  slowDiskDetectionDaemon = new Daemon(new Runnable() {
    @Override
    public void run() {
      while (shouldRun) {
        if (dn.getFSDataset() != null) {
          // Initialize the Maps that hold per-disk operation latency data
          Map<String, Double> metadataOpStats = Maps.newHashMap();
          Map<String, Double> readIoStats = Maps.newHashMap();
          Map<String, Double> writeIoStats = Maps.newHashMap();
          FsDatasetSpi.FsVolumeReferences fsVolumeReferences = null;
          try {
            // Get all disks of the DataNode
            fsVolumeReferences = dn.getFSDataset().getFsVolumeReferences();
            Iterator<FsVolumeSpi> volumeIterator = fsVolumeReferences
                .iterator();
            // Traverse each disk
            while (volumeIterator.hasNext()) {
              FsVolumeSpi volume = volumeIterator.next();
              // Get the DataNodeVolumeMetrics object
              DataNodeVolumeMetrics metrics = volume.getMetrics();
              // Get the disk path
              String volumeName = volume.getBaseURI().getPath();
              // Save the average latency of disk operations into the Maps
              metadataOpStats.put(volumeName,
                  metrics.getMetadataOperationMean());
              readIoStats.put(volumeName, metrics.getReadIoMean());
              writeIoStats.put(volumeName, metrics.getWriteIoMean());
            }
          } finally {
            if (fsVolumeReferences != null) {
              try {
                fsVolumeReferences.close();
              } catch (IOException e) {
                LOG.error("Error in releasing FS Volume references", e);
              }
            }
          }
          if (metadataOpStats.isEmpty() && readIoStats.isEmpty() &&
              writeIoStats.isEmpty()) {
            LOG.debug("No disk stats available for detecting outliers.");
            continue;
          }
          // Check for slow disks
          detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
              writeIoStats);
        }

        try {
          Thread.sleep(detectionInterval);
        } catch (InterruptedException e) {
          LOG.error("Disk Outlier Detection thread interrupted", e);
          Thread.currentThread().interrupt();
        }
      }
    }
  });
  slowDiskDetectionDaemon.start();
}

private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
    Map<String, Double> readIoStats, Map<String, Double> writeIoStats) {
  Map<String, Map<DiskOp, Double>> diskStats = Maps.newHashMap();

  // Get disks with slow metadata operations
  Map<String, Double> metadataOpOutliers = slowDiskDetector
      .getOutliers(metadataOpStats);
  for (Map.Entry<String, Double> entry : metadataOpOutliers.entrySet()) {
    addDiskStat(diskStats, entry.getKey(), DiskOp.METADATA, entry.getValue());
  }

  // Get disks with slow ReadIo operations
  Map<String, Double> readIoOutliers = slowDiskDetector
      .getOutliers(readIoStats);
  for (Map.Entry<String, Double> entry : readIoOutliers.entrySet()) {
    addDiskStat(diskStats, entry.getKey(), DiskOp.READ, entry.getValue());
  }

  // Get disks with slow WriteIo operations
  Map<String, Double> writeIoOutliers = slowDiskDetector
      .getOutliers(writeIoStats);
  for (Map.Entry<String, Double> entry : writeIoOutliers.entrySet()) {
    addDiskStat(diskStats, entry.getKey(), DiskOp.WRITE, entry.getValue());
  }

  if (overrideStatus) {
    // Publish the slow disk data to diskOutliersStats
    diskOutliersStats = diskStats;
    LOG.debug("Updated disk outliers.");
  }
}
org.apache.hadoop.hdfs.server.datanode.BPServiceActor.java
Similarly, when the BPServiceActor class sends a heartbeat, it takes the slow disk data out of the DataNodeDiskMetrics object, forms SlowDiskReports, and sends them to the NN:
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
    throws IOException {
  ...
  // Check whether the reporting interval has been reached (default 30min)
  final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
  final SlowPeerReports slowPeers =
      outliersReportDue && dn.getPeerMetrics() != null ?
          SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
          SlowPeerReports.EMPTY_REPORT;
  // Generate the slow disk report
  final SlowDiskReports slowDisks =
      outliersReportDue && dn.getDiskMetrics() != null ?
          SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
          SlowDiskReports.EMPTY_REPORT;

  HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
      reports,
      dn.getFSDataset().getCacheCapacity(),
      dn.getFSDataset().getCacheUsed(),
      dn.getXmitsInProgress(),
      dn.getXceiverCount(),
      numFailedVolumes,
      volumeFailureSummary,
      requestBlockReportLease,
      slowPeers,
      // The slow disk report is sent to the NN along with the heartbeat
      slowDisks);
  ...
}
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.java
When the report reaches the NameNode, it is processed in DatanodeManager's handleHeartbeat method:
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount,
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  ...
  // Whether slowDiskTracker is null is determined by the
  // dfs.datanode.fileio.profiling.sampling.percentage parameter
  if (slowDiskTracker != null) {
    if (!slowDisks.getSlowDisks().isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
            slowDisks.getSlowDisks());
      }
      // The slow disk information is stored in the slowDiskTracker object
      slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
    }
    slowDiskTracker.checkAndUpdateReportIfNecessary();
  }
  ...
}
org.apache.hadoop.hdfs.server.blockmanagement.SlowDiskTracker.java
The logic for generating the slow disk JSON data in SlowDiskTracker is basically the same as for slow nodes above:
private ArrayList<DiskLatency> getSlowDisks(
    Map<String, DiskLatency> reports, int numDisks, long now) {
  ...
  // Create a priority queue for sorting
  final PriorityQueue<DiskLatency> topNReports = new PriorityQueue<>(
      reports.size(),
      new Comparator<DiskLatency>() {
        @Override
        public int compare(DiskLatency o1, DiskLatency o2) {
          return Doubles.compare(
              o1.getMaxLatency(), o2.getMaxLatency());
        }
      });

  ArrayList<DiskLatency> oldSlowDiskIDs = Lists.newArrayList();

  for (Map.Entry<String, DiskLatency> entry : reports.entrySet()) {
    DiskLatency diskLatency = entry.getValue();
    // Filter out expired slow disk reports
    if (now - diskLatency.timestamp < reportValidityMs) {
      // numDisks is fixed at 5; keep only the top 5 disks
      if (topNReports.size() < numDisks) {
        topNReports.add(diskLatency);
      } else if (topNReports.peek().getMaxLatency() <
          diskLatency.getMaxLatency()) {
        topNReports.poll();
        topNReports.add(diskLatency);
      }
    } else {
      oldSlowDiskIDs.add(diskLatency);
    }
  }

  oldSlowDisksCheck = oldSlowDiskIDs;
  return Lists.newArrayList(topNReports);
}
Related community patches
https://issues.apache.org/jira/browse/HDFS-10959 (Adding per disk IO statistics and metrics in DataNode)
https://issues.apache.org/jira/browse/HDFS-11545 (Propagate DataNode's slow disks info to the NameNode via Heartbeat)
https://issues.apache.org/jira/browse/HDFS-11551 (Handle SlowDiskReport from DataNode at the NameNode)
Related parameters
<property>
  <name>dfs.datanode.fileio.profiling.sampling.percentage</name>
  <value>0</value>
  <description>This setting controls the percentage of file I/O events which
    will be profiled for DataNode disk statistics. The default value of 0
    disables disk statistics. Set to an integer value between 1 and 100 to
    enable disk statistics.</description>
</property>
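As a usage sketch, slow disk detection is enabled by setting a non-zero sampling percentage in hdfs-site.xml on the DataNodes; the value 10 below is an arbitrary illustrative choice that profiles roughly one in ten file I/O events:

<!-- hdfs-site.xml: sample 10% of file I/O events to enable slow disk detection -->
<property>
  <name>dfs.datanode.fileio.profiling.sampling.percentage</name>
  <value>10</value>
</property>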
Reference material
https://blog.csdn.net/pengzhouzhou/article/details/109664302