Hudi 压缩(Compaction)实现分析

简介: Hudi 压缩(Compaction)实现分析

1. 介绍

压缩( compaction)用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。用户可通过 hudi-cli提供的命令行显示触发 compaction或者在使用 HoodieDeltaStreamer将上游(Kafka/DFS)数据写入 hudi数据集时进行相应配置,然后由系统自动进行 compaction操作。

2. 分析

对于 compaction操作,Hudi主要将其分为生成 HoodieCompactionPlan和执行 HoodieCompactionPlan两阶段。

2.1 生成HoodieCompactionPlan

生成 HoodieCompactionPlan的主要入口在 HoodieWriteClient#scheduleCompaction。其核心代码如下

public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
    // 创建新的commitTime,单调递增
    String instantTime = HoodieActiveTimeline.createNewCommitTime();
    // 调度compaction
    boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata);
    return notEmpty ? Option.of(instantTime) : Option.empty();
  }

首先获取新的 commitTime(单调递增),然后调用 scheduleCompactionAtInstant生成 HoodieCompactionPlan,其核心代码如下

public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata)
      throws IOException {
    HoodieTableMetaClient metaClient = createMetaClient(true);
    // 先进行一些检查,1. 如果有inflight状态的写入,那么最早的instant的时间一定大于正在进行压缩的时间;2. commit、deltacommit、compaction类型的instant的时间一定小于正在进行压缩的时间
  metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
      Preconditions.checkArgument(
          HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
          "Earliest write inflight instant time must be later " + "than compaction time. Earliest :" + earliestInflight
              + ", Compaction scheduled at " + instantTime);
    });
    List<HoodieInstant> conflictingInstants = metaClient
        .getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant -> HoodieTimeline
            .compareTimestamps(instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
        .collect(Collectors.toList());
    Preconditions.checkArgument(conflictingInstants.isEmpty(),
        "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
            + conflictingInstants);
    HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    // 开始生成compactionPlan
    HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
    if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) {
      extraMetadata.ifPresent(workload::setExtraMetadata);
      HoodieInstant compactionInstant =
          new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
      // 序列化后保存至元数据(.aux)目录下
      metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
          AvroUtils.serializeCompactionPlan(workload));
      return true;
    }
    return false;
  }

该方法首先会进行校验,包括如果存在 inflight状态的 instant,那么最早的 instant的时间一定要大于当前压缩的时间(可知 compaction时不允许还有处于 inflight状态的非 compaction类型的 instant),以及对于 commitdeltacommitcompaction类型的 instant的时间一定要小于当前压缩的时间( compaction时必须保证所有 completedinflightrequestedcompaction的时间必须小于当前压缩时间)。

调度生成 CompactionPlanscheduleCompaction核心代码如下

public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
    // 找出最后完成的compaction的instant
    Option<HoodieInstant> lastCompaction =
        getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
    String deltaCommitsSinceTs = "0";
    if (lastCompaction.isPresent()) { // 上一次compaction存在
      deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
    }
    // 统计从开始到现在总共有多少个deltacommit的instant
    int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline()
        .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
      // 生成空的Plan
      return new HoodieCompactionPlan();
    }
    HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
    try {
      // 生成CompactionPlan
      return compactor.generateCompactionPlan(jsc, this, config, instantTime,
          ((SyncableFileSystemView) getRTFileSystemView()).getPendingCompactionOperations()
              .map(instantTimeCompactionopPair -> instantTimeCompactionopPair.getValue().getFileGroupId())
              .collect(Collectors.toSet()));
    } catch (IOException e) {
      throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
    }
  }

可以看到首先会根据从上次进行 compact之后是否又满足再次 compact的条件(即 deltacommit次数是否已经达到要求),然后再调用 generateCompactionPlan方法生成计划,其核心代码如下

public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieTable hoodieTable,
      HoodieWriteConfig config, String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactions)
      throws IOException {
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    // 找出所有的分区路径
    List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
        config.shouldAssumeDatePartitioning());
    // 根据策略过滤分区路径
    partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
    if (partitionPaths.isEmpty()) {
      // 无分区路径,则返回null
      return null;
    }
    RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
    List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size())
        .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
            .getLatestFileSlices(partitionPath)
            .filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId())).map(s -> {
              // 获取增量日志文件
              List<HoodieLogFile> logFiles =
                  s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
              // 获取数据文件
              Option<HoodieDataFile> dataFile = s.getDataFile();
              // 生成CompactionOperation
              return new CompactionOperation(dataFile, partitionPath, logFiles,
                  config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
            }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator())
        .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
    // 根据策略、以及pending的CompactionPlan生成新的HoodieCompactionPlan
    HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
        CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
    Preconditions.checkArgument(
        compactionPlan.getOperations().stream().noneMatch(
            op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
        "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
            + "Please fix your strategy implementation." + "FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions
            + ", Selected workload :" + compactionPlan);
    if (compactionPlan.getOperations().isEmpty()) {
      log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
    }
    return compactionPlan;
  }

可以看到,首先会获取所有的分区,对于每个分区,获取最新的所有不属于正在进行 compaction操作中的 FileSlice,对于 FileSlice,然后再获取对应的数据文件、日志文件、并计算指标信息后生成 CompactionOperation,并过滤出增量日志不为空的 CompactionOperation,然后根据 CompactionOperation构造 HoodieCompactionOperation,最后会根据 HoodieCompactionOperation生成 HoodieCompactionPlan(会对这次的 HoodieCompactionOperation和pending的 HoodieCompactionPlan中的operations进行排序,过滤选出 HoodieCompactionOperation),需确保同个文件不会存在于多个 HoodieCompactionPlan中。

在生成完 HoodieCompactionPlan后,会将其序列化后保存在 .hoodie/.aux元数据目录下,此时状态为 requested,此时便完成了 HoodieCompactionPlan的生成和写入。

2.2 执行HoodieCompactionPlan

在生成完 HoodieCompactionPlan并保存在文件中后,执行 compaction时,最终会调用 HoodieWriteClient#compact方法,其核心代码如下

private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
    HoodieTableMetaClient metaClient = createMetaClient(true);
    HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    // 获取compaction类型的timeline(包括requested、inflight状态)
    HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
    // 构造inflight状态的instant
    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
    if (pendingCompactionTimeline.containsInstant(inflightInstant)) { // timeline中包含该instant
      // 回滚inflight的compaction
      rollbackInflightCompaction(inflightInstant, table);
      metaClient = createMetaClient(true);
      table = HoodieTable.getHoodieTable(metaClient, config, jsc);
      // 再次获取compaction类型的timeline(包括requested、inflight状态)
      pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
    }
    // 构造requested状态的instant
    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
    if (pendingCompactionTimeline.containsInstant(instant)) { // 包含
      return runCompaction(instant, metaClient.getActiveTimeline(), autoCommit);
    } else {
      throw new IllegalStateException(
          "No Compaction request available at " + compactionInstantTime + " to run compaction");
    }
  }

方法首先会进行检查,如果包含了 inflight状态的 instant,则需要回滚(以这次 compaction为准),然后再调用 runCompaction方法执行 compaction,其核心代码如下

private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline,
      boolean autoCommit) throws IOException {
    HoodieTableMetaClient metaClient = createMetaClient(true);
    // 从之前的.hoodie/.aux目录下反序列化出Plan
    HoodieCompactionPlan compactionPlan =
        CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp());
    // 将状态从requested转化为inflight,会重命名之前的requested文件
    activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
    compactionTimer = metrics.getCompactionCtx();
    // Create a Hoodie table which encapsulated the commits and files visible
    HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    // 开始真正执行compact操作
    JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
    // Force compaction action
    statuses.persist(config.getWriteStatusStorageLevel());
    // 提交compaction
    commitCompaction(statuses, table, compactionInstant.getTimestamp(), autoCommit,
        Option.ofNullable(compactionPlan.getExtraMetadata()));
    return statuses;
  }

可以看到,首先会从之前序列化的文件中反序列出 HoodieCompactionPlan,然后变更状态后开始调用 compact方法执行compact操作,该方法最终会调用 HoodieRealtimeTableCompactor#compact,其核心代码如下

public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieCompactionPlan compactionPlan,
      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
    // 检验
    if (compactionPlan == null || (compactionPlan.getOperations() == null)
        || (compactionPlan.getOperations().isEmpty())) {
      return jsc.emptyRDD();
    }
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    // 生成一个CopyOnWriteTable,开始的所有操作都是基于MergeOnReadTable
    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
    // 将HoodieCompactionOperation转化为CompactionOperation
    List<CompactionOperation> operations = compactionPlan.getOperations().stream()
        .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
    return jsc.parallelize(operations, operations.size())
        .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator);
  }

可以看到,其核心逻辑在于重新生成了一个 HoodieCopyOnWriteTable,然后将 HoodieCompactionOperation转化为 CompactionOperation,最后继续调用 compact进行压缩操作,其核心代码如下

private List<WriteStatus> compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
      HoodieWriteConfig config, CompactionOperation operation, String commitTime) throws IOException {
    FileSystem fs = metaClient.getFs();
    // 获取schema
    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
    // 获取类型为(commit、rollback、deltacommit)中最大的instant的时间
    String maxInstantTime = metaClient
        .getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
        .filterCompletedInstants().lastInstant().get().getTimestamp();
    // 获取日志文件
    List<String> logFiles = operation.getDeltaFileNames().stream().map(
        p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
        .collect(toList());
    // 用户读取日志文件中记录的扫描迭代器
    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
        readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
        config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
        config.getSpillableMapBasePath());
    if (!scanner.iterator().hasNext()) {
      return Lists.<WriteStatus>newArrayList();
    }
    // 获取数据文件
    Option<HoodieDataFile> oldDataFileOpt =
        operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
    // Compacting is very similar to applying updates to existing file
    Iterator<List<WriteStatus>> result;
    if (oldDataFileOpt.isPresent()) { // 数据文件存在
      // 则使用COW类型处理更新
      result = hoodieCopyOnWriteTable.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(),
          oldDataFileOpt.get());
    } else { // 数据文件不存在
      // 则使用COW类型处理插入
      result = hoodieCopyOnWriteTable.handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(),
          scanner.iterator());
    }
    // 获取指标信息
    Iterable<List<WriteStatus>> resultIterable = () -> result;
    return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
      s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
      s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
      s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
      s.getStat().setPartitionPath(operation.getPartitionPath());
      s.getStat()
          .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
      s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
      s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
      s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
      RuntimeStats runtimeStats = new RuntimeStats();
      runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
      s.getStat().setRuntimeStats(runtimeStats);
    }).collect(toList());
  }

可以看到核心流程是构造一个log增量日志文件的记录迭代器(后续单独分析),然后判断该 operation下的数据文件是否为空,若为空,则将所有记录写入新的parquet数据文件,若不为空,则将增量日志文件记录更新至parquet数据文件(与旧的数据文件记录合并后写入parquet文件)。在写入数据文件后会将写入的指标信息写入文件中, 并且将 compaction的状态标记为 completed,同时会将其变更为 timeline上的 commit(文件格式为 commitTime.commit)。

3. 总结

compaction时只存在于 MergeOnRead存储类型时的操作,其首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件( FileSlice),然后生成 HoodieCompactionPlan(每个FileSlice对应一个HoodieCompactionOperation)并将其序列化至文件中,然后在执行 compaction操作时会将其从文件中反序列化,然后从 HoodieCompactionPlan中获取 HoodieCompactionOperation并进行压缩,即会构建一个用于迭代log增量日志文件的迭代器,然后与旧的parquet数据文件进行合并或写入parquet数据文件,完成后会将统计信息写入文件,而 completedcompaction操作在 timeline上表现为 commit

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
5天前
|
分布式计算 测试技术 Apache
探索Apache Hudi核心概念 (3) - Compaction
探索Apache Hudi核心概念 (3) - Compaction
49 5
|
5天前
|
存储 测试技术 分布式数据库
提升 Apache Hudi Upsert 性能的三个建议
提升 Apache Hudi Upsert 性能的三个建议
39 1
|
5天前
|
数据处理 分布式数据库 Apache
一文聊透Apache Hudi的索引设计与应用
一文聊透Apache Hudi的索引设计与应用
114 3
|
算法 Apache C++
干货!Apache Hudi如何智能处理小文件问题
Apache Hudi是一个流行的开源的数据湖框架,Hudi提供的一个非常重要的特性是自动管理文件大小,而不用用户干预。大量的小文件将会导致很差的查询分析性能,因为查询引擎执行查询时需要进行太多次文件的打开/读取/关闭。在流式场景中不断摄取数据,如果不进行处理,会产生很多小文件。
437 0
干货!Apache Hudi如何智能处理小文件问题
|
5天前
|
存储 Apache
一文彻底理解Apache Hudi的清理服务
一文彻底理解Apache Hudi的清理服务
36 0
|
5天前
|
分布式计算 Apache 调度
Apache Hudi 异步Compaction部署方式汇总
Apache Hudi 异步Compaction部署方式汇总
44 0
|
5天前
|
存储 分布式数据库 Apache
Apache Hudi索引实现分析(三)之HBaseIndex
Apache Hudi索引实现分析(三)之HBaseIndex
26 0
|
5天前
|
缓存 Apache 索引
Apache Hudi索引实现分析(一)之HoodieBloomIndex
Apache Hudi索引实现分析(一)之HoodieBloomIndex
26 0
|
5天前
|
Apache 索引
Apache Hudi索引实现分析(二)之HoodieGlobalBloomIndex
Apache Hudi索引实现分析(二)之HoodieGlobalBloomIndex
30 0
|
5天前
|
SQL 分布式计算 HIVE
Spark读取变更Hudi数据集Schema实现分析
Spark读取变更Hudi数据集Schema实现分析
51 0
http://www.vxiaotou.com