Apache Paimon PK 表的 data distribution

本人初次接触 Paimon,以下内容仅是自己的理解,如有错误,欢迎评论。

下面所有讨论的内容仅限 PK 表。

Paimon 通过 bucket 提升数据的读写并行度,每一个 bucket 是一棵独立的 LSM 树,也是最小的读写单元。

当表存在分区列时,那么会有两层索引。第一层是分区,第二层是 bucket。Bucket 挂载在每一个分区下面。

两层索引目录结构如下:

├── dt=2012
│   ├── bucket-0
│   │   └── ...
│   └── bucket-1
│       └── ...
├── dt=2013
│   ├── bucket-0
│   │   └── ...
│   └── bucket-1
│       └── ...
├── manifest
│   └── ...
├── schema
│   └── ...
└── snapshot
    └── ...

Paimon 支持三种 bucket 模式:

  • Fixed bucket:bucket > N
  • Dynamic bucket:bucket = -1
  • Postpone bucket:bucket = -2

Paimon 理论上期望同一主键都会落在一个分区的一个 bucket 上面,但是实际情况不是这样。

先讨论最简单的情况,即不存在跨分区更新同一主键的情况。

不会跨分区更新同一主键

建表的时候,把分区列包含在主键列中,那么同一个主键一定只会落在同一个分区上,即不存在跨分区的情况。

Fixed bucket

建表的时候,指定 bucket > 0 即为 fixed bucket。算法就是 HASH(primary_key) % bucket

CREATE TABLE test (
  id bigint,
  dt string,
  name string,
  primary key(id, dt) not enforced
) PARTITIONED BY(dt)
WITH (
  'bucket' = '5'
);

insert into test values(1, '2012', 'smith');

写入数据的时候,先找到 dt=2012 分区,执行 HASH(id, dt) % 5 找到对应的 bucket。

优点:

  • 实现简单,HASH 取模很高效。

缺点:

  • 要一开始规划好 bucket 的数量,如果 bucket 数量过多,可能产生过多小文件。如果 bucket 数量少,并行度上不去。
  • 每一个分区的 bucket 数量都是固定的,当分区级别发生数据倾斜的时候,bucket 数量不好设置。比如 dt=2012 的数据有 100G,但是 dt=2013 的数据只有 1G,此时 fixed bucket 无法处理该情况,无法兼顾两个分区。
  • Bucket 数量一旦设定好,就无法调整,除非对表进行 rescale 操作(就是 INSERT OVERWRITE)。

有一种奇技淫巧,可以实现对不同分区,实现不同的 bucket 数量:先 ALTER TABLE 修改表的属性,然后 INSERT OVERWRITE 某一个分区。

-- 修改 dt=2022-01-01 的 bucket 数量为 4
ALTER TABLE my_table SET ('bucket' = '4');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
SELECT * FROM ...;

-- 修改 dt=2022-01-02 的 bucket 数量为 8
ALTER TABLE my_table SET ('bucket' = '8');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02')
SELECT * FROM ...;

Dynamic bucket

bucket=-1时开启 dynamic bucket,无需用户自己设置 bucket 数量。Paimon 会按照如下规则自行扩容 bucket:

  • Option1: 'dynamic-bucket.target-row-num': controls the target row number for one bucket.
  • Option2: 'dynamic-bucket.initial-buckets': controls the number of initialized bucket.
  • Option3: 'dynamic-bucket.max-buckets': controls the number of max buckets.

开启 dynamic bucket 后,Paimon 需要维护 index,决定HASH(primary_key) 映射到哪一个 bucket。

每一个 bucket 有一个 index 文件,index 文件会存储该 bucket 下所有主键的 HASH_VALUE:

HASH_VALUE | HASH_VALUE | HASH_VALUE | HASH_VALUE | …

每一个 HASH_VALUE 对应一个 primary key,长度固定 4bytes。

Snapshot 里面的 indexManifest字段标记总 index 文件名称,该文件会存储所有分区,每一个 bucket 的 index 文件。

索引文件查找规则:Snapshot -> IndexManifest -> IndexManifestEntry -> index-xxxxx。

IndexManifest 内容:_PARTITION 表示哪一个 partition,_BUCKET 表示该 partition 下面的哪一个 bucket。

{
  "_VERSION" : 1,
  "_KIND" : 0,
  "_PARTITION" : "2012",
  "_BUCKET" : 0,
  "_INDEX_TYPE" : "HASH",
  "_FILE_NAME" : "index-e448a3ac-1e40-47f5-b081-2cbfd792530c-1",
  "_FILE_SIZE" : 8,
  "_ROW_COUNT" : 2,
  "_DELETIONS_VECTORS_RANGES" : null,
  "_EXTERNAL_PATH" : null
}
{
  "_VERSION" : 1,
  "_KIND" : 0,
  "_PARTITION" : "2013", 
  "_BUCKET" : 0,
  "_INDEX_TYPE" : "HASH",
  "_FILE_NAME" : "index-e448a3ac-1e40-47f5-b081-2cbfd792530c-0",
  "_FILE_SIZE" : 4,
  "_ROW_COUNT" : 1,
  "_DELETIONS_VECTORS_RANGES" : null,
  "_EXTERNAL_PATH" : null
}

Bucket 填充按照数据到来的顺序填充:先看 index ,检查该 primary key 是不是已经落在某个 bucket 中。如果这是个新 primary key,则填充在空闲的 bucket 上,如果现有 bucket 都满了,则开辟新的 bucket。

Dynamic bucket 同一分区只能有一个 write job,因为同一分区的 key -> bucket 路由状态、bucket 扩容状态必须由单一 writer 串行维护;否则多个 writer 会各自做路由决策,导致同 key 被分到不同 bucket,最终出现重复数据。

优点:

  • 不需要写死 bucket 数量,随着数据量规模变大,它能自己开辟新 bucket。

缺点:

  • 在往一个分区写数据的时候,需要先加载该分区所有 bucket 的 index。1 亿个主键大约消耗 1GB 内存。
  • 当存在跨分区更新时,性能会明显变差,后文会展开说明。

Postpone bucket

设置 bucket=-2开启 postpone bucket,写入的数据会先放在 bucket-postpone文件夹下面,此时数据是不会被 reader 读取到。只有当用户手动执行 compact 后,数据才会被划分到指定的 bucket 下面。

Compact 前:数据都会以 avro 格式临时存储在 bucket-postpone文件夹下:

├── dt=2012
│   └── bucket-postpone
│       └── data--u-cd383d25-7047-473d-8a55-77b1c9db98ff-s-0-w-061ef2ef-93a0-4b7b-b978-ea2bab71befc-0.avro
├── dt=2013
│   └── bucket-postpone
│       └── data--u-cd383d25-7047-473d-8a55-77b1c9db98ff-s-0-w-88dfc7d7-b040-44d2-87e9-13e7c37bb71b-0.avro
├── manifest
│   ├── manifest-3f088188-b88d-4eea-a614-06c648c5a1ae-0
│   ├── manifest-list-b3a9879c-6907-4eff-ab5f-ad8090c09389-0
│   └── manifest-list-b3a9879c-6907-4eff-ab5f-ad8090c09389-1
├── schema
│   └── schema-0
└── snapshot
    ├── LATEST
    └── snapshot-1

用户执行 Compact 的时候,按照 postpone.default-bucket-num划分 bucket(默认值为 1)。

bucket-postpone在执行 compact 命令的时候,底层直接复用的是 FixedBucket 逻辑。

Compact 后:

├── dt=2012
│   ├── bucket-0
│   │   └── data-1b180ea9-d358-4dcc-b218-c7abc96df86e-0.parquet
│   └── bucket-postpone
│       └── data--u-cd383d25-7047-473d-8a55-77b1c9db98ff-s-0-w-061ef2ef-93a0-4b7b-b978-ea2bab71befc-0.avro
├── dt=2013
│   ├── bucket-0
│   │   └── data-501cc1b2-beb3-4a42-8ee2-f0eac2c7958e-0.parquet
│   └── bucket-postpone
│       └── data--u-cd383d25-7047-473d-8a55-77b1c9db98ff-s-0-w-88dfc7d7-b040-44d2-87e9-13e7c37bb71b-0.avro
├── manifest
│   ├── manifest-3f088188-b88d-4eea-a614-06c648c5a1ae-0
│   ├── manifest-905baa52-a049-417f-b6d5-a0e1144ba8e1-0
│   ├── manifest-list-b3a9879c-6907-4eff-ab5f-ad8090c09389-0
│   ├── manifest-list-b3a9879c-6907-4eff-ab5f-ad8090c09389-1
│   ├── manifest-list-e51a6085-27a1-4e91-9a76-b9cdb1da0142-0
│   └── manifest-list-e51a6085-27a1-4e91-9a76-b9cdb1da0142-1
├── schema
│   └── schema-0
└── snapshot
    ├── EARLIEST
    ├── LATEST
    ├── snapshot-1
    └── snapshot-2

注意点:

一个分区只有第一次 compact 的时候,bucket 划分会遵循 postpone.default-bucket-num 的值。后续该分区所有的 compact 都会复用已经确定的 bucket 数量(即使你修改了 postpone.default-bucket-num也直接无视)。

决策 bucket 数量的伪代码逻辑:

  // 当前的 bucket 数量决策逻辑
  for (BinaryRow partition : partitions) {
      int bucketNum = defaultBucketNum;   // ① 先取 postpone.default-bucket-num

      Iterator<ManifestEntry> it = table.newSnapshotReader()
          .withPartitionFilter(partition)
          .onlyReadRealBuckets()
          .readFileIterator();
      if (it.hasNext()) {
          bucketNum = it.next().totalBuckets();  // ② 如果已有真实 bucket,用已有的数量
      }

      // ③ 用 bucketNum 构造 FixedBucketSink 写入
  }

如果你实在想修改 bucket 数量,则需要通过 rescale 命令重新分配 bucket 数量。

优点:

  • 把划分 bucket 的操作延后到 compact 阶段,这时候可以根据 bucket-postpone 文件夹的数据规模,手动设定 bucket 数量进行 compact。
  • 可以对每一个分区独立的进行 compact,这样可以每个分区 bucket 数量不一样。(其实这个 fixed bucket 也能实现)

缺点:

  • 要 compact 之后,才可以被 reader 可见,数据实时度不够。

主键跨分区更新

当主键存在跨分区更新这种情况的时候,bucket 划分就变得复杂了。

考虑如下例子:

CREATE TABLE test (
  id bigint,
  dt string,
  name string,
  primary key(id) not enforced
)
PARTITIONED BY (dt)
WITH (
  'bucket' = '-1'
);

test 表的分区列是 dt,但是主键列是 id。

考虑一个场景:

insert into test values (1, '2012', 'smith');
insert into test values (1, '2013', 'danny');

此时 dt=2012dt=2013 这两个分区下都有 id=1 的数据,Paimon 需要保证数据读取的正确性,理论上,danny 应该覆盖 smith 对应的旧值。

接下来看看 Paimon 是怎么做的。

Fixed bucket & Postpone bucket

先上结论,Fixed bucket 和 Postpone bucket 在 cross partition 的情况下,会导致数据重复。需要用户自行在数据输入端确保主键的唯一性。

Paimon 官网也说明了:

You can also use Cross Partitions Upsert with bucket (N > 0) or bucket (-2), in these modes, there is no global index to ensure that your data undergoes reasonable deduplication, so relying on your input to have a complete changelog can ensure the uniqueness of the data.

验证一下:

create table test(
  id bigint, 
  dt string, 
  name string, 
  primary key(id) not enforced
) partitioned by (dt) with ('bucket'='1');

insert into test values(1, '2012', 'smith'), (2, '2012', 'danny');
insert into test values(1, '2013', 'blossom');

理论上 dt=2012 的 smith 会被 dt=2013 的 blossom 替代。但是现在如果你执行 SELECT * 扫描任务,你会看到数据 id=1 的主键重复。

id                          dt                          name
1                           2013                        blossom
1                           2012                        smith
2                           2012                        danny

Dynamic bucket

在跨分区更新的情况下,dynamic bucket 不会再创建 IndexManifest 索引文件。

create table test(
  id bigint, 
  dt string, 
  name string, 
  primary key(id) not enforced
) partitioned by (dt) with ('bucket'='-1');

insert into test values(1, '2012', 'smith'), (2, '2012', 'danny');

目录结构上没有 index:

├── dt=2012
│   └── bucket-0
│       └── data-87175f86-40a1-44c4-9c42-0efc88fd07b5-0.parquet
├── manifest
│   ├── manifest-33694416-45b0-4cde-bece-09ed653a868c-0
│   ├── manifest-list-c637c63e-a4f6-4c3e-ae51-b12a1914ac94-0
│   └── manifest-list-c637c63e-a4f6-4c3e-ae51-b12a1914ac94-1
├── schema
│   └── schema-0
└── snapshot
    ├── EARLIEST
    ├── LATEST
    └── snapshot-1

此时再插入一行数据:

insert into test values(1, '2013', 'blossom');
├── dt=2012
│   └── bucket-0
│       ├── data-7afdb6f3-cb48-4e3a-912f-555af68e69dd-0.parquet
│       └── data-87175f86-40a1-44c4-9c42-0efc88fd07b5-0.parquet
├── dt=2013
│   └── bucket-0
│       └── data-4833fa82-d805-479a-8807-25e69e8647ec-0.parquet
├── manifest
│   ├── manifest-33694416-45b0-4cde-bece-09ed653a868c-0
│   ├── manifest-71bf8258-1715-4540-8688-f2fbe852b1d3-0
│   ├── manifest-list-9041c45f-9087-4ef3-b7e1-f9fbd02672b5-0
│   ├── manifest-list-9041c45f-9087-4ef3-b7e1-f9fbd02672b5-1
│   ├── manifest-list-c637c63e-a4f6-4c3e-ae51-b12a1914ac94-0
│   └── manifest-list-c637c63e-a4f6-4c3e-ae51-b12a1914ac94-1
├── schema
│   └── schema-0
└── snapshot
    ├── EARLIEST
    ├── LATEST
    ├── snapshot-1
    └── snapshot-2

这时候 dt=2013 里面多出了我们新插入的数据。dt=2012 里面也多了一个 parquet,它负责将旧的 smith 记录标记为删除(Deduplicate 模式)。

dt=2012 第一次 insert 生成的 parquet 文件内容:

{"_KEY_id": 1, "_SEQUENCE_NUMBER": 0, "_VALUE_KIND": 0, "id": 1, "dt": "2012", "name": "smith"}
{"_KEY_id": 2, "_SEQUENCE_NUMBER": 1, "_VALUE_KIND": 0, "id": 2, "dt": "2012", "name": "danny"}

dt=2012 标记删除的 parquet 文件内容:

{"_KEY_id": 1, "_SEQUENCE_NUMBER": 2, "_VALUE_KIND": 3, "id": 1, "dt": "2012", "name": "blossom"}

此时你查询 Paimon 表,结果都是正确的。

那么问题来了,Paimon 是如何维护 index,来确定一个 primary key 在哪一个 bucket 里面?

就是在一个 writer 任务启动的时候,它会使用 RocksDB 维护一个全局索引,先扫描所有的历史数据,构建出一个全局的 PK-> (partition, bucket) 的映射,再开始写入数据。

所以其缺点也很明显,当数据量较大时,writer 任务初始化会明显变慢。

官方也提了一嘴:

Performance: For tables with a large amount of data, there will be a significant loss in performance. Moreover, initialization takes a long time.

总结

  • 上面的讨论都只是涉及 writer,对于 reader 来说,只需要按照元信息读取数据即可,不需要关心 bucket 的策略。
  • 在设计表结构的时候,最好把分区列也加入主键列,这样可以获得最佳性能,而且不需要担心数据会出现重复。

原创文章,作者:Smith,如若转载,请注明出处:https://www.inlighting.org/archives/apache-paimon-pk-table-data-distribution

打赏 微信扫一扫 微信扫一扫
SmithSmith
上一篇 2026年1月11日 下午2:18
下一篇 2024年3月25日 下午8:25

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注