HDFS slow node monitoring principle and source code analysis

Background

As HDFS clusters grow, it is inevitable that some "slow nodes" appear in the cluster, mainly manifested as slow network data transfer and slow disk reads and writes. These slow nodes are usually hard to discover: only when a business job happens to read or write data on them, and the job's running time is stretched out as a result, do we notice that the cluster is reading and writing slowly and then track down the specific slow nodes.

Slow nodes have therefore always been a focus of HDFS cluster operation and maintenance. Since Hadoop 2.9, the community has supported viewing slow nodes via the NameNode JMX.

The format of the metrics is as follows. Note that at most the top 5 nodes/disks are displayed:

"SlowPeersReport":[{"SlowNode":"node4","ReportingNodes":["node1"]},{"SlowNode":"node2","ReportingNodes":["node1","node3"]},{"SlowNode":"node1","ReportingNodes":["node2"]}]

"SlowDisksReport":[{"SlowDiskID":"dn3:disk1","Latencies":{"WRITE":1000.1}},{"SlowDiskID":"dn2:disk2","Latencies":{"WRITE":1000.1}},{"SlowDiskID":"dn1:disk2","Latencies":{"READ":1000.3}},{"SlowDiskID":"dn1:disk1","Latencies":{"METADATA":1000.1,"READ":1000.8}}]

Slow network monitoring

Principle

The principle of slow network monitoring is to record the time spent transmitting packets between DNs in the cluster, find the outliers, and report them to the NN as slow nodes. Under normal conditions the transmission rates between nodes are roughly the same and do not differ much. If transmission from DN A to DN B is abnormally slow, A reports to the NN that B is a slow node.

To calculate the average time a DN spends transmitting data downstream, the DN internally maintains a Map<String, LinkedBlockingDeque<SumAndCount>>. The key of the Map is the IP of the downstream DN, and the value is a queue of SumAndCount objects that record the number of packets transmitted to that downstream DN and the time spent.
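
The following simplified sketch illustrates this bookkeeping. It is not the actual Hadoop class (the real logic lives in the DN's rolling-average metrics); the names PeerLatencyTracker, addSample and averageLatency are made up for illustration.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

public class PeerLatencyTracker {
  // One sample: total time spent and the number of packets it covers
  static class SumAndCount {
    final double sum;
    final long count;
    SumAndCount(double sum, long count) { this.sum = sum; this.count = count; }
  }

  // key: downstream DN address, value: queue of samples for that DN
  private final Map<String, LinkedBlockingDeque<SumAndCount>> samples =
      new ConcurrentHashMap<>();

  // Called each time a packet is forwarded downstream
  public void addSample(String downstreamDn, long elapsedMs) {
    samples.computeIfAbsent(downstreamDn, k -> new LinkedBlockingDeque<>())
        .add(new SumAndCount(elapsedMs, 1));
  }

  // Average latency for one downstream DN, the input to outlier detection
  public double averageLatency(String downstreamDn) {
    double sum = 0;
    long count = 0;
    for (SumAndCount s : samples.getOrDefault(downstreamDn, new LinkedBlockingDeque<>())) {
      sum += s.sum;
      count += s.count;
    }
    return count == 0 ? 0 : sum / count;
  }
}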

When sending a heartbeat, the DN decides whether to generate a SlowPeerReport and send it to the NN as part of the heartbeat. The generation cycle of the SlowPeerReport is controlled by the dfs.datanode.outliers.report.interval parameter and defaults to 30min. First, all packet samples are taken out of the queues and the average transmission time to each downstream DN is calculated. Then the slow node reporting threshold upperLimitLatency is calculated from these average latencies. When the average latency to a node is greater than upperLimitLatency, that node is considered a slow network node and is reported by the DN. Finally, the corresponding SlowPeerReport is generated and reported to the NN via the heartbeat.

Calculation logic of slow node threshold upperLimitLatency

First compute the median of all downstream DN transmission times, and then compute the median absolute deviation (mad):

// MAD_MULTIPLIER = 1.4826
mad = median(|list[i]-median(list)|) * MAD_MULTIPLIER

The final upperLimitLatency is:

// lowThresholdMs = 5ms
upperLimitLatency = max(lowThresholdMs, median * 3, median + mad * 3)
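
As a worked example with illustrative numbers, suppose the average latencies (ms) to ten downstream DNs are 5, 5, 6, 6, 6, 6, 7, 7, 8 and 50. Then:

median = (6 + 6) / 2 = 6
mad    = median(|list[i] - 6|) * 1.4826 = 1 * 1.4826 = 1.4826
upperLimitLatency = max(5, 6 * 3, 6 + 3 * 1.4826) = 18

Only the node with the 50ms average exceeds 18ms and is reported as a slow peer; the 8ms node is not.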

The code details are as follows:
org.apache.hadoop.hdfs.server.datanode.metrics.OutlierDetector.java

public Map<String, Double> getOutliers(Map<String, Double> stats) {
    // minNumResources = 10; with fewer than 10 nodes, outlier detection is skipped
    if (stats.size() < minNumResources) {
      LOG.debug("Skipping statistical outlier detection as we don't have " +
              "latency data for enough resources. Have {}, need at least {}",
          stats.size(), minNumResources);
      return ImmutableMap.of();
    }
    final List<Double> sorted = new ArrayList<>(stats.values());
    Collections.sort(sorted);
    // Calculate median
    final Double median = computeMedian(sorted);
    // Calculate the median absolute deviation mad
    final Double mad = computeMad(sorted);
    // Calculate exception threshold upperLimitLatency
    Double upperLimitLatency = Math.max(
        lowThresholdMs, median * MEDIAN_MULTIPLIER);
    upperLimitLatency = Math.max(
        upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));

    final Map<String, Double> slowResources = new HashMap<>();

    // Find out the nodes larger than the exception threshold
    for (Map.Entry<String, Double> entry : stats.entrySet()) {
      if (entry.getValue() > upperLimitLatency) {
        slowResources.put(entry.getKey(), entry.getValue());
      }
    }

    return slowResources;
}

public static Double computeMad(List<Double> sortedValues) {
    ...
    // Calculate the median
    Double median = computeMedian(sortedValues);
    List<Double> deviations = new ArrayList<>(sortedValues);

    // Calculate absolute deviation
    for (int i = 0; i < sortedValues.size(); ++i) {
      deviations.set(i, Math.abs(sortedValues.get(i) - median));
    }

    Collections.sort(deviations);
    // MAD_MULTIPLIER = 1.4826
    return computeMedian(deviations) * MAD_MULTIPLIER;
}

public static Double computeMedian(List<Double> sortedValues) {
    ...
    Double median = sortedValues.get(sortedValues.size() / 2);
    if (sortedValues.size() % 2 == 0) {
      median += sortedValues.get((sortedValues.size() / 2) - 1);
      median /= 2;
    }
    return median;
}
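
For reference, a minimal sketch of exercising getOutliers in isolation with the same illustrative numbers as above. The demo class is hypothetical and is placed in the same package as OutlierDetector so it can be constructed directly; the constructor arguments follow the form used in the disk monitoring code further below.

package org.apache.hadoop.hdfs.server.datanode.metrics;

import java.util.HashMap;
import java.util.Map;

public class OutlierDetectorDemo {
  public static void main(String[] args) {
    // minNumResources = 10, lowThresholdMs = 5 (the peer defaults described above)
    OutlierDetector detector = new OutlierDetector(10, 5);

    Map<String, Double> avgLatencies = new HashMap<>();
    double[] samples = {5, 5, 6, 6, 6, 6, 7, 7, 8, 50};
    for (int i = 0; i < samples.length; i++) {
      avgLatencies.put("dn" + (i + 1) + ":50010", samples[i]);
    }

    // Only the 50ms entry exceeds upperLimitLatency (18ms) and is returned
    System.out.println(detector.getOutliers(avgLatencies));
  }
}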

Monitoring process code analysis

org.apache.hadoop.hdfs.server.datanode.DataNode.java
First, a DataNodePeerMetrics object is created in the startDataNode method of DataNode

void startDataNode(List<StorageLocation> dataDirectories,
                     SecureResources resources
                     ) throws IOException {
   ...
   // peerStatsEnabled is controlled by the dfs.datanode.peer.stats.enabled parameter
   peerMetrics = dnConf.peerStatsEnabled ?
        DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
   ...
}

org.apache.hadoop.hdfs.server.datanode.BlockReceiver.java
The packet transmission time is recorded in the BlockReceiver class and then written to the DataNodePeerMetrics object

private int receivePacket() throws IOException {
    ...
    //First write the packet to the mirror:
    if (mirrorOut != null && !mirrorError) {
      try {
        // Record start time
        long begin = Time.monotonicNow();
        DataNodeFaultInjector.get().stopSendingPacketDownstream(mirrorAddr);
        packetReceiver.mirrorPacketTo(mirrorOut);
        mirrorOut.flush();
        long now = Time.monotonicNow();
        this.lastSentTime.set(now);
        // Calculating packet transmission time
        long duration = now - begin;
        DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
            mirrorAddr,
            duration);
        // Write time-consuming data to DataNodePeerMetrics
        trackSendPacketToLastNodeInPipeline(duration);
        if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
          LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
              + "downstream DNs=" + Arrays.toString(downstreamDNs)
              + ", blockId=" + replicaInfo.getBlockId());
        }
      } catch (IOException e) {
        handleMirrorOutError(e);
      }
    }
    ...
}

private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
    // Gets the DataNodePeerMetrics object
    final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
    // peerMetrics is non-null only when dfs.datanode.peer.stats.enabled is true
    if (peerMetrics != null && isPenultimateNode) {
      peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
    }
  }

org.apache.hadoop.hdfs.server.datanode.BPServiceActor.java
When the BPServiceActor class sends a heartbeat, it takes the slow node data out of the DataNodePeerMetrics object, forms SlowPeerReports, and sends them to the NN

HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
      throws IOException {
    ...
    // Calculate whether the interval is reached (default 30min)
    final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
    // Generate slow node report
    final SlowPeerReports slowPeers =
        outliersReportDue && dn.getPeerMetrics() != null ?
            SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
            SlowPeerReports.EMPTY_REPORT;
    final SlowDiskReports slowDisks =
        outliersReportDue && dn.getDiskMetrics() != null ?
            SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
            SlowDiskReports.EMPTY_REPORT;

    HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary,
        requestBlockReportLease,
        // The slow peer report is sent to the NN along with the heartbeat
        slowPeers,
        slowDisks);
    ...
  }

org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.java
When the report reaches the NameNode, it is processed in the handleHeartbeat method of DatanodeManager

public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] reports, final String blockPoolId,
      long cacheCapacity, long cacheUsed, int xceiverCount, 
      int maxTransfers, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary,
      @Nonnull SlowPeerReports slowPeers,
      @Nonnull SlowDiskReports slowDisks) throws IOException {
	  ...
	  // slowPeerTracker is non-null only when dfs.datanode.peer.stats.enabled is true
	  if (slowPeerTracker != null) {
      final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
      if (!slowPeersMap.isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
              slowPeersMap);
        }
        for (String slowNodeId : slowPeersMap.keySet()) {
          // Summarize the slow node information into the slowPeerTracker object
          slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
        }
      }
    }
    ...
}

org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker.java
The getJsonReports method of SlowPeerTracker is eventually called by the NN to generate the slow node JSON data

private Collection<ReportForJson> getJsonReports(int numNodes) {
    ...
	// Create a priority queue sorted by the number of reporting nodes for each slow node
    final PriorityQueue<ReportForJson> topNReports =
        new PriorityQueue<>(allReports.size(),
            new Comparator<ReportForJson>() {
          @Override
          public int compare(ReportForJson o1, ReportForJson o2) {
            return Ints.compare(o1.reportingNodes.size(),
                o2.reportingNodes.size());
          }
        });
    // Record current time
    final long now = timer.monotonicNow();

    for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
        allReports.entrySet()) {
      // Filter out expired slow node reports
      SortedSet<String> validReports = filterNodeReports(
          entry.getValue(), now);
      if (!validReports.isEmpty()) {
        // numNodes is fixed to 5, and the top 5 nodes are selected
        if (topNReports.size() < numNodes) {
          topNReports.add(new ReportForJson(entry.getKey(), validReports));
        } else if (topNReports.peek().getReportingNodes().size() <
            validReports.size()){
          topNReports.poll();
          topNReports.add(new ReportForJson(entry.getKey(), validReports));
        }
      }
    }
    return topNReports;
}

Related community patches

https://issues.apache.org/jira/browse/HDFS-10917(Collect peer performance statistics on DataNode.)
https://issues.apache.org/jira/browse/HDFS-11194(Maintain aggregated peer performance metrics on NameNode)

Related parameters

<property>
	<name>dfs.datanode.peer.stats.enabled</name>
	<value>false</value>
	<description>A switch to turn on/off tracking DataNode peer statistics.</description>
</property>
<property>
	<name>dfs.datanode.peer.metrics.min.outlier.detection.samples</name>
	<value>1000</value>
	<description>Minimum number of packet send samples which are required to qualify for outlier detection. If the number of samples is below this then outlier detection is skipped.</description>
</property>
<property>
	<name>dfs.datanode.outliers.report.interval</name>
	<value>30m</value>
	<description>This setting controls how frequently DataNodes will report their peer latencies to the NameNode via heartbeats.</description>
</property>

Slow disk monitoring

Principle

The principle of slow disk monitoring is to record the time spent on read and write operations for all disks of a node, find the outliers, and report them to the NN as slow disks.

When the DataNode starts, if the dfs.datanode.fileio.profiling.sampling.percentage parameter is greater than 0, a DataNodeDiskMetrics object is initialized. After initialization, the DataNodeDiskMetrics object starts a background thread that runs every dfs.datanode.outliers.report.interval (30min by default), takes the average latency of metadata, read and write operations on each disk from DataNodeVolumeMetrics, and then calculates the slow disk reporting threshold upperLimitLatency (the calculation logic is the same as for slow network nodes). When the average latency of one of a disk's operations is greater than upperLimitLatency, that disk is considered a slow disk, and a SlowDiskReports object is generated and reported to the NN via the heartbeat.
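
The per-disk latencies themselves are collected by sampling: only a percentage of file I/O events, controlled by dfs.datanode.fileio.profiling.sampling.percentage, are actually timed. A simplified sketch of that sampling decision is shown below; the class and method names are hypothetical, not the actual Hadoop ones.

import java.util.concurrent.ThreadLocalRandom;

public class IoSamplingSketch {
  private final int samplingPercentage; // 0 disables profiling, 1..100 enables it

  public IoSamplingSketch(int samplingPercentage) {
    this.samplingPercentage = Math.max(0, Math.min(100, samplingPercentage));
  }

  // Decide whether this particular I/O event should be timed
  public boolean shouldProfile() {
    return samplingPercentage > 0
        && ThreadLocalRandom.current().nextInt(100) < samplingPercentage;
  }
}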

Monitoring process code analysis

org.apache.hadoop.hdfs.server.datanode.DataNode.java
First, a DataNodeDiskMetrics object is created in the startDataNode method of DataNode

void startDataNode(List<StorageLocation> dataDirectories,
                     SecureResources resources
                     ) throws IOException {
   ...
   // diskStatsEnabled is controlled by the dfs.datanode.fileio.profiling.sampling.percentage parameter
   if (dnConf.diskStatsEnabled) {
      // Create DataNodeDiskMetrics object
      diskMetrics = new DataNodeDiskMetrics(this,
          dnConf.outliersReportIntervalMs);
   }
   ...
}

org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics.java
In the DataNodeDiskMetrics class, a disk check thread is started to find the disks with slow metadata, readIo and writeIo operations

public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
    this.dn = dn;
    // The check interval is set by the dfs.datanode.outliers.report.interval parameter
    this.detectionInterval = diskOutlierDetectionIntervalMs;
    slowDiskDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_DISKS,
        SLOW_DISK_LOW_THRESHOLD_MS);
    shouldRun = true;
    // Start the disk outlier detection thread
    startDiskOutlierDetectionThread();
}

private void startDiskOutlierDetectionThread() {
    slowDiskDetectionDaemon = new Daemon(new Runnable() {
      @Override
      public void run() {
        while (shouldRun) {
          if (dn.getFSDataset() != null) {
            // Initialize the Maps that store per-disk operation latencies
            Map<String, Double> metadataOpStats = Maps.newHashMap();
            Map<String, Double> readIoStats = Maps.newHashMap();
            Map<String, Double> writeIoStats = Maps.newHashMap();
            FsDatasetSpi.FsVolumeReferences fsVolumeReferences = null;
            try {
              // Get all disks of DataNode
              fsVolumeReferences = dn.getFSDataset().getFsVolumeReferences();
              Iterator<FsVolumeSpi> volumeIterator = fsVolumeReferences
                  .iterator();
              // Traverse each disk
              while (volumeIterator.hasNext()) {
                FsVolumeSpi volume = volumeIterator.next();
                // Gets the DataNodeVolumeMetrics object
                DataNodeVolumeMetrics metrics = volume.getMetrics();
                // Get disk path
                String volumeName = volume.getBaseURI().getPath();

                // Save the average latency of each disk operation into the Maps
                metadataOpStats.put(volumeName,
                    metrics.getMetadataOperationMean());
                readIoStats.put(volumeName, metrics.getReadIoMean());
                writeIoStats.put(volumeName, metrics.getWriteIoMean());
              }
            } finally {
              if (fsVolumeReferences != null) {
                try {
                  fsVolumeReferences.close();
                } catch (IOException e) {
                  LOG.error("Error in releasing FS Volume references", e);
                }
              }
            }
            if (metadataOpStats.isEmpty() && readIoStats.isEmpty()
                && writeIoStats.isEmpty()) {
              LOG.debug("No disk stats available for detecting outliers.");
              continue;
            }
            // Check for slow disks
            detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
                writeIoStats);
          }

          try {
            Thread.sleep(detectionInterval);
          } catch (InterruptedException e) {
            LOG.error("Disk Outlier Detection thread interrupted", e);
            Thread.currentThread().interrupt();
          }
        }
      }
    });
    slowDiskDetectionDaemon.start();
  }

private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
      Map<String, Double> readIoStats, Map<String, Double> writeIoStats) {
    Map<String, Map<DiskOp, Double>> diskStats = Maps.newHashMap();

    // Get disks with slow metadata operations
    Map<String, Double> metadataOpOutliers = slowDiskDetector
        .getOutliers(metadataOpStats);
    for (Map.Entry<String, Double> entry : metadataOpOutliers.entrySet()) {
      addDiskStat(diskStats, entry.getKey(), DiskOp.METADATA, entry.getValue());
    }

    // Get disks with slow readIo operations
    Map<String, Double> readIoOutliers = slowDiskDetector
        .getOutliers(readIoStats);
    for (Map.Entry<String, Double> entry : readIoOutliers.entrySet()) {
      addDiskStat(diskStats, entry.getKey(), DiskOp.READ, entry.getValue());
    }

    // Get disks with slow writeIo operations
    Map<String, Double> writeIoOutliers = slowDiskDetector
        .getOutliers(writeIoStats);
    for (Map.Entry<String, Double> entry : writeIoOutliers.entrySet()) {
      addDiskStat(diskStats, entry.getKey(), DiskOp.WRITE, entry.getValue());
    }
    if (overrideStatus) {
      // Assign the slow disk data to diskOutliersStats
      diskOutliersStats = diskStats;
      LOG.debug("Updated disk outliers.");
    }
}

org.apache.hadoop.hdfs.server.datanode.BPServiceActor.java
Similarly, when the BPServiceActor class sends a heartbeat, it takes the slow disk data out of the DataNodeDiskMetrics object, forms SlowDiskReports, and sends them to the NN

HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
      throws IOException {
    ...
    // Calculate whether the interval is reached (default 30min)
    final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
    final SlowPeerReports slowPeers =
        outliersReportDue && dn.getPeerMetrics() != null ?
            SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
            SlowPeerReports.EMPTY_REPORT;
    // Generate slow disk report
    final SlowDiskReports slowDisks =
        outliersReportDue && dn.getDiskMetrics() != null ?
            SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
            SlowDiskReports.EMPTY_REPORT;

    HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary,
        requestBlockReportLease,
        slowPeers,
        // The slow disk report is sent to the NN along with the heartbeat
        slowDisks);
    ...
  }

org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.java
When the report reaches the NameNode, it is processed in the handleHeartbeat method of DatanodeManager

public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] reports, final String blockPoolId,
      long cacheCapacity, long cacheUsed, int xceiverCount, 
      int maxTransfers, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary,
      @Nonnull SlowPeerReports slowPeers,
      @Nonnull SlowDiskReports slowDisks) throws IOException {
	  ...
	  // slowDiskTracker is non-null only when dfs.datanode.fileio.profiling.sampling.percentage is greater than 0
	  if (slowDiskTracker != null) {
      if (!slowDisks.getSlowDisks().isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
              slowDisks.getSlowDisks());
        }
        // The slow disk information is stored in the slowDiskTracker object
        slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
      }
      slowDiskTracker.checkAndUpdateReportIfNecessary();
    }
    ...
}

org.apache.hadoop.hdfs.server.blockmanagement.SlowDiskTracker.java
The logic for generating the slow disk JSON data in SlowDiskTracker is basically the same as for the slow nodes above

private ArrayList<DiskLatency> getSlowDisks(
      Map<String, DiskLatency> reports, int numDisks, long now) {
    ...
    // Create a priority queue sorted by max latency
    final PriorityQueue<DiskLatency> topNReports = new PriorityQueue<>(
        reports.size(),
        new Comparator<DiskLatency>() {
          @Override
          public int compare(DiskLatency o1, DiskLatency o2) {
            return Doubles.compare(
                o1.getMaxLatency(), o2.getMaxLatency());
          }
        });

    ArrayList<DiskLatency> oldSlowDiskIDs = Lists.newArrayList();

    for (Map.Entry<String, DiskLatency> entry : reports.entrySet()) {
      DiskLatency diskLatency = entry.getValue();
      // Filter expired slow disk reports
      if (now - diskLatency.timestamp < reportValidityMs) {
        // numDisks is fixed to 5 and generates top 5 disks
        if (topNReports.size() < numDisks) {
          topNReports.add(diskLatency);
        } else if (topNReports.peek().getMaxLatency() <
            diskLatency.getMaxLatency()) {
          topNReports.poll();
          topNReports.add(diskLatency);
        }
      } else {
        oldSlowDiskIDs.add(diskLatency);
      }
    }

    oldSlowDisksCheck = oldSlowDiskIDs;

    return Lists.newArrayList(topNReports);
  }

Related community patches

https://issues.apache.org/jira/browse/HDFS-10959(Adding per disk IO statistics and metrics in DataNode.)
https://issues.apache.org/jira/browse/HDFS-11545(Propagate DataNode's slow disks info to the NameNode via Heartbeat)
https://issues.apache.org/jira/browse/HDFS-11551(Handle SlowDiskReport from DataNode at the NameNode)

Related parameters

<property>
	<name>dfs.datanode.fileio.profiling.sampling.percentage</name>
	<value>0</value>
	<description>This setting controls the percentage of file I/O events which will be profiled for DataNode disk statistics. The default value of 0 disables disk statistics. Set to an integer value between 1 and 100 to enable disk statistics.</description>
</property>
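
For example, a cluster that wants both kinds of monitoring might set the following in hdfs-site.xml (the 25% sampling value is an arbitrary illustration; any value between 1 and 100 enables disk statistics):

<property>
	<name>dfs.datanode.peer.stats.enabled</name>
	<value>true</value>
</property>
<property>
	<name>dfs.datanode.fileio.profiling.sampling.percentage</name>
	<value>25</value>
</property>
<property>
	<name>dfs.datanode.outliers.report.interval</name>
	<value>30m</value>
</property>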

Reference material
https://blog.csdn.net/pengzhouzhou/article/details/109664302
