HDFS Hedged Read 的利弊分析

这篇文章重新审视了 HDFS Hedged Read 的实际收益与代价,重点说明它虽然能缓解慢节点问题,但会显著放大内存开销。正文先回顾 Hedged Read 在超时后向其它副本发起并行读取的工作方式,再结合线上因频繁 OOM 触发查询失败的案例,分析 HDFS 客户端在开启该功能后会额外申请一个甚至两个 ByteBuffer,导致读取同一批数据时内存占用从原本的 1 倍上升到 2 到 3 倍,最后解释了为什么 HDFS 默认关闭这一功能。

鄙人在之前一篇文章中,简单的介绍了将 Hedged Read 引入 StarRocks 的效果,见 StarRocks 中关于 Hadoop Hedged Read 性能测试 这篇文章。当时得出来的结论是在存在慢节点的情况下,hedged read 一定是正优化的。但是,后面我就被啪啪打脸了,你想一想,真有那么好的功能,HDFS 为什么不默认开启?真那么牛逼,这部分代码为什么7,8年不改进了?

Hedged read 简介

Hedged read 在 hdfs-site.xml 里面提供了两个参数,分别是:

  • dfs.client.hedged.read.threadpool.size:负责hedged read 线程池大小
  • dfs.client.hedged.read.threshold.millis 发起 hedged read 的阀值时间,当一个请求耗时超过这个阀值,就会新发起一个读请求。

总所周知,HDFS 会把一个文件切分成多个 block,每个 block 默认都有 3 副本,这 3 个副本会存储在不同的 DataNode 上面。HDFS 客户端每次读取一个 block 时,都会选择一个存储有该 block 的 DataNode 进行读取。

而 Hedged Read 的思想很简单,当第一次发起的请求耗时超过dfs.client.hedged.read.threshold.millis 这个阀值后,它会把这个 DataNode 加入黑名单,然后重新发起一个新请求,这个请求会发向新的 DataNode。最后看两个请求,谁返回的快,就用谁的。

优点-解决慢节点

这个机制听起来很美好,不把鸡蛋放在一个篮子里面,哪个 DataNode快就用哪个,有效的解决了 HDFS 慢节点的问题。不过它的实现也很粗糙,它只会试两个DataNode,如果两个DataNode都抽风了,那也没办法了。

缺点-费内存

那么这么牛的方法,为什么不默认开启呢?这就要从线上的一个事故说起了。线上的一位客户一听这么牛逼的功能,就马上开启了 hedged read 功能。当时他们自己简单的测试了下,效果确实杠杠的,HDFS 慢节点的问题得到了很大的改善。但是一道生产环境,发现 HDFS 客户端频繁的报 Out Of Memory 问题,导致查询失败。起初以为流量变大了,但是后面经过检查,发现流量其实并没有什么变化。后面客户关闭了 hedged read,就一切正常了。

为了搞明白问题,我就看了下 hedged read 的源码,核心代码如下:

/**
 * Like {@link #fetchBlockByteRange}except we start up a second, parallel,
 * 'hedged' read if the first read is taking longer than configured amount of
 * time. We then wait on which ever read returns first.
 */
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
    long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
    throws IOException {
  final DfsClientConf conf = dfsClient.getConf();
  ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
  CompletionService<ByteBuffer> hedgedService =
      new ExecutorCompletionService<>(dfsClient.getHedgedReadsThreadPool());
  ArrayList<DatanodeInfo> ignored = new ArrayList<>();
  ByteBuffer bb;
  int len = (int) (end - start + 1);
  int hedgedReadId = 0;
  while (true) {
    // see HDFS-6591, this metric is used to verify/catch unnecessary loops
    hedgedReadOpsLoopNumForTesting++;
    DNAddrPair chosenNode = null;
    // there is no request already executing.
    if (futures.isEmpty()) {
      // chooseDataNode is a commitment. If no node, we go to
      // the NN to reget block locations. Only go here on first read.
      chosenNode = chooseDataNode(block, ignored);
      // Latest block, if refreshed internally
      block = chosenNode.block;
      bb = ByteBuffer.allocate(len);
      Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
          chosenNode, block, start, end, bb,
          corruptedBlocks, hedgedReadId++);
      Future<ByteBuffer> firstRequest = hedgedService
          .submit(getFromDataNodeCallable);
      futures.add(firstRequest);
      Future<ByteBuffer> future = null;
      try {
        future = hedgedService.poll(
            conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
        if (future != null) {
          ByteBuffer result = future.get();
          result.flip();
          buf.put(result);
          return;
        }
        DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
            + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
        dfsClient.getHedgedReadMetrics().incHedgedReadOps();
        // continue; no need to refresh block locations
      } catch (ExecutionException e) {
        futures.remove(future);
      } catch (InterruptedException e) {
        throw new InterruptedIOException(
            "Interrupted while waiting for reading task");
      }
      // Ignore this node on next go around.
      // If poll timeout and the request still ongoing, don't consider it
      // again. If read data failed, don't consider it either.
      ignored.add(chosenNode.info);
    } else {
      // We are starting up a 'hedged' read. We have a read already
      // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
      // If no nodes to do hedged reads against, pass.
      boolean refetch = false;
      try {
        chosenNode = chooseDataNode(block, ignored, false);
        if (chosenNode != null) {
          // Latest block, if refreshed internally
          block = chosenNode.block;
          bb = ByteBuffer.allocate(len);
          Callable<ByteBuffer> getFromDataNodeCallable =
              getFromOneDataNode(chosenNode, block, start, end, bb,
                  corruptedBlocks, hedgedReadId++);
          Future<ByteBuffer> oneMoreRequest =
              hedgedService.submit(getFromDataNodeCallable);
          futures.add(oneMoreRequest);
        } else {
          refetch = true;
        }
      } catch (IOException ioe) {
        DFSClient.LOG.debug("Failed getting node for hedged read: {}",
            ioe.getMessage());
      }
      // if not succeeded. Submit callables for each datanode in a loop, wait
      // for a fixed interval and get the result from the fastest one.
      try {
        ByteBuffer result = getFirstToComplete(hedgedService, futures);
        // cancel the rest.
        cancelAll(futures);
        dfsClient.getHedgedReadMetrics().incHedgedReadWins();
        result.flip();
        buf.put(result);
        return;
      } catch (InterruptedException ie) {
        // Ignore and retry
      }
      // If refetch is true, then all nodes are in deadNodes or ignoredNodes.
      // We should loop through all futures and remove them, so we do not
      // have concurrent requests to the same node.
      // Once all futures are cleared, we can clear the ignoredNodes and retry.
      if (refetch && futures.isEmpty()) {
        block = refetchLocations(block, ignored);
      }
      // We got here if exception. Ignore this node on next go around IFF
      // we found a chosenNode to hedge read against.
      if (chosenNode != null && chosenNode.info != null) {
        ignored.add(chosenNode.info);
      }
    }
  }
}

可以看到第一次请求,在 28 行申请了一个内存 bb = ByteBuffer.allocate(len);,如果等待超过阀值,在 69 行再一次申请了内存 bb = ByteBuffer.allocate(len);

这里要注意一个问题,在使用 HDFS 客户端读取文件的时候,我们都会事先在外部申请一块内存,然后让 HDFS 客户端往这块内存填东西。而开启了 hedged read 后,HDFS 客户端保底就会为第一次请求申请一个 ByteBuffer,如果存在慢节点情况,再申请一个 ByteBuffer。

这里拉个清单,算下内存消耗,假设我们要读取一个 30MB 的文件:

  • 不开启 hedged read:自己申请的30MB
  • 开启 hedged read:自己申请的30MB + hedged read 第一次申请的 30MB = 60MB
  • 开启 hedged read 且存在慢节点情况:自己申请的30MB + hedged read 第一次申请的 30MB + hedged read 第二次申请的30MB = 90MB

总结

所以只要开启了 hedged read,内存消耗保底就会 x2,如果存在慢节点,就 x3了,那能不OOM才怪了。

所以除非内存富裕,能接受2倍~3倍的额外开销,不然还是别开了,这也能理解为什么 HDFS 默认关闭了。

原创文章,作者:Smith,如若转载,请注明出处:https://www.inlighting.org/archives/hdfs-hedged-read

打赏 微信扫一扫 微信扫一扫
SmithSmith
上一篇 2023年8月21日 下午1:17
下一篇 2023年11月12日 下午7:59

相关推荐

  • 浅谈 Apache ORC 之 Decimal 存储

    这篇文章围绕 Apache ORC 中 Decimal 类型的存储实现,逐步补齐理解官方文档所需的几个关键前置概念。正文先解释 precision 和 scale 的含义,以及 ORC 中 int128、补码和 zigzag 编码的背景,再通过示例梳理 zigzag 的编码与解码过程,最后回到 ORC Decimal 本身,说明 valueStream 如何保存整数表示、SECONDARY stream 如何保存 scale,以及 reader 在 scale 不匹配时如何对读出的数值进行乘除调整。

    2024年5月5日
    9791
  • 浅谈 HDFS 慢节点的解决方案

    这篇文章围绕 HDFS 慢节点导致 OLAP 查询忽快忽慢的问题,梳理了问题成因和几种常见缓解手段。正文先区分数据多读、IO 次数过多、跨多个 HDFS Block 读取以及纯粹运气不好连到慢 DataNode 等不同来源,再分别讨论 Hedged Read 在解决慢节点时的原理、线程池规划和额外内存成本,以及通过缩短 HDFS 客户端 IO timeout 来尽快放弃慢请求的思路,最后结合 Trino 的历史经验说明这类配置本质上只是补救措施,真正的关键仍然是减少不必要的 IO。

    2024年3月25日
    4.4K3
  • Hadoop 完全分布式(Fully Distributed)安装

    这篇文章记录了一个基于三台 CentOS 8 主机搭建 Hadoop 完全分布式集群的完整过程,内容从基础环境准备、主机通信、静态 IP、hadoop 用户和 SSH 免密登录开始,随后逐步配置 Hadoop 3.2.1 的 JAVA_HOME、HDFS、YARN 与 workers 节点,并将配置同步到各个从节点。最后文章通过格式化 NameNode、启动 HDFS 和 YARN、查看 jps 进程以及访问 Web 控制台的方式验证集群是否正常运行。

    2019年10月6日
    1.8K0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注


评论列表(2条)

  • […] 我在之前的文章里面《HDFS Hedged Read 的利弊分析》分析过,hedged read 因为实现上的挫,它保底要消耗多一倍的内存。期间如果超过阈值发起了 hedged read,那就是两倍。 […]

  • […] 我在之前的文章里面《HDFS Hedged Read 的利弊分析》分析过,hedged read 因为实现上的挫,它保底要消耗多一倍的内存。期间如果超过阈值发起了 hedged read,那就是两倍。 […]