The Pros and Cons of HDFS Hedged Read

In an earlier post I briefly covered what happens when you bring Hedged Read into StarRocks; see "Performance Testing of Hadoop Hedged Read in StarRocks". My conclusion at the time was that, whenever slow nodes are involved, hedged read is always a net win. Reality then slapped me in the face. Think about it: if the feature were really that good, why doesn't HDFS enable it by default? If it were really that great, why has this code gone untouched for seven or eight years?

A quick introduction to hedged read

Hedged read is controlled by two parameters in hdfs-site.xml:

  • dfs.client.hedged.read.threadpool.size: the size of the thread pool that serves hedged reads
  • dfs.client.hedged.read.threshold.millis: the time threshold for triggering a hedged read; once a request has been running longer than this, a second read request is issued (see the sketch right after this list).
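
As a reference, here is a minimal sketch of enabling the feature programmatically; the same keys can go straight into hdfs-site.xml. The pool size and threshold below are made-up example values, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class EnableHedgedRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A positive pool size turns hedged reads on; 0 leaves them off.
    conf.setInt("dfs.client.hedged.read.threadpool.size", 8);
    // Spawn a second read if the first has not returned within 500 ms.
    conf.setLong("dfs.client.hedged.read.threshold.millis", 500);
    // Any FileSystem built from this conf will now hedge its positional reads.
    FileSystem fs = FileSystem.get(conf);
    fs.close();
  }
}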

As everyone knows, HDFS splits a file into multiple blocks, and each block by default has 3 replicas stored on different DataNodes. Every time the HDFS client reads a block, it picks one DataNode that holds that block and reads from it.

The idea behind Hedged Read is simple: when the first request has been running longer than dfs.client.hedged.read.threshold.millis, the client puts that DataNode on an ignore list and issues a fresh request to a different DataNode. Whichever of the two requests returns first is the one whose result gets used.

Pro: it tames slow nodes

The mechanism sounds great: don't put all your eggs in one basket, use whichever DataNode is faster, and the HDFS slow-node problem is effectively dealt with. The implementation is crude, though: it only races two DataNodes, so if both of them act up, there is nothing more it can do.

Con: it eats memory

So why is such a great feature not enabled by default? That takes us back to a production incident. One of our customers, upon hearing about this wonderful feature, switched hedged read on right away. Their own quick tests looked excellent, and the HDFS slow-node problem improved noticeably. But as soon as it hit production, the HDFS client started throwing OutOfMemory errors left and right, failing queries. At first they blamed a traffic increase, but a closer look showed traffic hadn't actually changed. Once they turned hedged read off, everything went back to normal.

To get to the bottom of it, I went through the hedged read source code. The core of it is as follows:

/**
 * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
 * 'hedged' read if the first read is taking longer than the configured amount
 * of time. We then wait on whichever read returns first.
 */
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
    long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
    throws IOException {
  final DfsClientConf conf = dfsClient.getConf();
  ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
  CompletionService<ByteBuffer> hedgedService =
      new ExecutorCompletionService<>(dfsClient.getHedgedReadsThreadPool());
  ArrayList<DatanodeInfo> ignored = new ArrayList<>();
  ByteBuffer bb;
  int len = (int) (end - start + 1);
  int hedgedReadId = 0;
  while (true) {
    // see HDFS-6591, this metric is used to verify/catch unnecessary loops
    hedgedReadOpsLoopNumForTesting++;
    DNAddrPair chosenNode = null;
    // there is no request already executing.
    if (futures.isEmpty()) {
      // chooseDataNode is a commitment. If no node, we go to
      // the NN to reget block locations. Only go here on first read.
      chosenNode = chooseDataNode(block, ignored);
      // Latest block, if refreshed internally
      block = chosenNode.block;
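      // First read: allocate a fresh len-byte buffer on top of the
      // caller-supplied buf; see the memory analysis below.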
      bb = ByteBuffer.allocate(len);
      Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
          chosenNode, block, start, end, bb,
          corruptedBlocks, hedgedReadId++);
      Future<ByteBuffer> firstRequest = hedgedService
          .submit(getFromDataNodeCallable);
      futures.add(firstRequest);
      Future<ByteBuffer> future = null;
      try {
        future = hedgedService.poll(
            conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
        if (future != null) {
          ByteBuffer result = future.get();
          result.flip();
          buf.put(result);
          return;
        }
        DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
            + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
        dfsClient.getHedgedReadMetrics().incHedgedReadOps();
        // continue; no need to refresh block locations
      } catch (ExecutionException e) {
        futures.remove(future);
      } catch (InterruptedException e) {
        throw new InterruptedIOException(
            "Interrupted while waiting for reading task");
      }
      // Ignore this node on next go around.
      // If poll timeout and the request still ongoing, don't consider it
      // again. If read data failed, don't consider it either.
      ignored.add(chosenNode.info);
    } else {
      // We are starting up a 'hedged' read. We have a read already
      // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
      // If no nodes to do hedged reads against, pass.
      boolean refetch = false;
      try {
        chosenNode = chooseDataNode(block, ignored, false);
        if (chosenNode != null) {
          // Latest block, if refreshed internally
          block = chosenNode.block;
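          // Hedged read: allocate a second len-byte buffer for the same
          // range, on top of the first one and the caller's buf.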
          bb = ByteBuffer.allocate(len);
          Callable<ByteBuffer> getFromDataNodeCallable =
              getFromOneDataNode(chosenNode, block, start, end, bb,
                  corruptedBlocks, hedgedReadId++);
          Future<ByteBuffer> oneMoreRequest =
              hedgedService.submit(getFromDataNodeCallable);
          futures.add(oneMoreRequest);
        } else {
          refetch = true;
        }
      } catch (IOException ioe) {
        DFSClient.LOG.debug("Failed getting node for hedged read: {}",
            ioe.getMessage());
      }
      // if not succeeded. Submit callables for each datanode in a loop, wait
      // for a fixed interval and get the result from the fastest one.
      try {
        ByteBuffer result = getFirstToComplete(hedgedService, futures);
        // cancel the rest.
        cancelAll(futures);
        dfsClient.getHedgedReadMetrics().incHedgedReadWins();
        result.flip();
        buf.put(result);
        return;
      } catch (InterruptedException ie) {
        // Ignore and retry
      }
      // If refetch is true, then all nodes are in deadNodes or ignoredNodes.
      // We should loop through all futures and remove them, so we do not
      // have concurrent requests to the same node.
      // Once all futures are cleared, we can clear the ignoredNodes and retry.
      if (refetch && futures.isEmpty()) {
        block = refetchLocations(block, ignored);
      }
      // We got here if exception. Ignore this node on next go around IFF
      // we found a chosenNode to hedge read against.
      if (chosenNode != null && chosenNode.info != null) {
        ignored.add(chosenNode.info);
      }
    }
  }
}

As the code shows, the first request allocates a buffer with bb = ByteBuffer.allocate(len) (in the futures.isEmpty() branch); if the wait then exceeds the threshold, the hedged request allocates another len-byte buffer with a second bb = ByteBuffer.allocate(len) (in the else branch). Whichever read wins, its buffer is then copied into the caller's buf via result.flip(); buf.put(result), so these internal allocations are pure extra overhead on top of the memory we supplied.

One thing to note here: when reading a file through the HDFS client, we always allocate a chunk of memory up front and have the client fill it in. With hedged read enabled, the client allocates at least one extra ByteBuffer for the first request, and a second one whenever a slow node triggers a hedge.
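
To make that concrete, here is a minimal sketch of the usual pread pattern, assuming fs.defaultFS points at an HDFS cluster and using a hypothetical path; buffer is the memory we allocate ourselves, and with hedged reads on, the client stacks one or two more len-sized ByteBuffers on top of it:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PreadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/tmp/example"))) { // hypothetical path
      byte[] buffer = new byte[4 * 1024 * 1024]; // the buffer we allocate ourselves
      // A positional read: with hedged reads enabled this is the call that
      // can end up in hedgedFetchBlockByteRange shown above.
      int n = in.read(0L, buffer, 0, buffer.length);
      System.out.println("read " + n + " bytes");
    }
  }
}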

Let's run the numbers, assuming we want to read a 30 MB file:

  • Without hedged read: the 30 MB we allocate ourselves
  • With hedged read: our own 30 MB + 30 MB allocated for the first request = 60 MB
  • With hedged read and a slow node: our own 30 MB + 30 MB for the first request + 30 MB for the second request = 90 MB (the sketch after this list restates the math)
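
The same accounting as a tiny runnable sketch; 30 MB is just the example size from the list, and with N such reads in flight at once the peak scales to roughly N times these figures:

public class HedgedReadMemoryMath {
  public static void main(String[] args) {
    long len = 30L << 20;              // example read size: 30 MB
    long withoutHedging = len;         // only the buffer we allocate ourselves
    long withHedging = 2 * len;        // + the client's first internal ByteBuffer
    long withSlowNode = 3 * len;       // + the second, hedged ByteBuffer
    System.out.printf("%d / %d / %d MB%n",
        withoutHedging >> 20, withHedging >> 20, withSlowNode >> 20);
  }
}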

Summary

So the moment hedged read is enabled, the memory cost of a read is at least doubled, and with slow nodes around it triples. No wonder it OOMs.

Unless memory is plentiful, then, and you can live with 2x to 3x extra overhead, you're better off leaving it disabled. That also makes it easy to see why HDFS ships with it turned off by default.
