本人初次接触 Paimon,以下内容仅是自己的理解,如有错误,欢迎评论。
下面所有讨论的内容仅限 PK 表。
Paimon 通过 bucket 提升数据的读写并行度,每一个 bucket 是一棵独立的 LSM 树,也是最小的读写单元。
当表存在分区列时,那么会有两层索引。第一层是分区,第二层是 bucket。Bucket 挂载在每一个分区下面。
两层索引目录结构如下:
├── dt=2012
│ ├── bucket-0
│ │ └── ...
│ └── bucket-1
│ └── ...
├── dt=2013
│ ├── bucket-0
│ │ └── ...
│ └── bucket-1
│ └── ...
├── manifest
│ └── ...
├── schema
│ └── ...
└── snapshot
└── ...
Paimon 支持三种 bucket 模式:
- Fixed bucket:bucket > N
- Dynamic bucket:bucket = -1
- Postpone bucket:bucket = -2
Paimon 理论上期望同一主键都会落在一个分区的一个 bucket 上面,但是实际情况不是这样。
先讨论最简单的情况,即不存在跨分区更新同一主键的情况。
不会跨分区更新同一主键
建表的时候,把分区列包含在主键列中,那么同一个主键一定只会落在同一个分区上,即不存在跨分区的情况。
Fixed bucket
建表的时候,指定 bucket > 0 即为 fixed bucket。算法就是 HASH(primary_key) % bucket 。
CREATE TABLE test (
id bigint,
dt string,
name string,
primary key(id, dt) not enforced
) PARTITIONED BY(dt)
WITH (
'bucket' = '5'
);
insert into test values(1, '2012', 'smith');
写入数据的时候,先找到 dt=2012 分区,执行 HASH(id, dt) % 5 找到对应的 bucket。
优点:
- 实现简单,HASH 取模很高效。
缺点:
- 要一开始规划好 bucket 的数量,如果 bucket 数量过多,可能产生过多小文件。如果 bucket 数量少,并行度上不去。
- 每一个分区的 bucket 数量都是固定的,当分区级别发生数据倾斜的时候,bucket 数量不好设置。比如
dt=2012的数据有 100G,但是dt=2013的数据只有 1G,此时 fixed bucket 无法处理该情况,无法兼顾两个分区。 - Bucket 数量一旦设定好,就无法调整,除非对表进行 rescale 操作(就是 INSERT OVERWRITE)。
有一种奇技淫巧,可以实现对不同分区,实现不同的 bucket 数量:先 ALTER TABLE 修改表的属性,然后 INSERT OVERWRITE 某一个分区。
-- 修改 dt=2022-01-01 的 bucket 数量为 4
ALTER TABLE my_table SET ('bucket' = '4');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
SELECT * FROM ...;
-- 修改 dt=2022-01-02 的 bucket 数量为 8
ALTER TABLE my_table SET ('bucket' = '8');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02')
SELECT * FROM ...;
Dynamic bucket
当 bucket=-1时开启 dynamic bucket,无需用户自己设置 bucket 数量。Paimon 会按照如下规则自行扩容 bucket:
- Option1:
'dynamic-bucket.target-row-num': controls the target row number for one bucket. - Option2:
'dynamic-bucket.initial-buckets': controls the number of initialized bucket. - Option3:
'dynamic-bucket.max-buckets': controls the number of max buckets.
开启 dynamic bucket 后,Paimon 需要维护 index,决定HASH(primary_key) 映射到哪一个 bucket。
每一个 bucket 有一个 index 文件,index 文件会存储该 bucket 下所有主键的 HASH_VALUE:
HASH_VALUE | HASH_VALUE | HASH_VALUE | HASH_VALUE | …
每一个 HASH_VALUE 对应一个 primary key,长度固定 4bytes。
Snapshot 里面的 indexManifest字段标记总 index 文件名称,该文件会存储所有分区,每一个 bucket 的 index 文件。
索引文件查找规则:Snapshot -> IndexManifest -> IndexManifestEntry -> index-xxxxx。
IndexManifest 内容:_PARTITION 表示哪一个 partition,_BUCKET 表示该 partition 下面的哪一个 bucket。
{
"_VERSION" : 1,
"_KIND" : 0,
"_PARTITION" : "2012",
"_BUCKET" : 0,
"_INDEX_TYPE" : "HASH",
"_FILE_NAME" : "index-e448a3ac-1e40-47f5-b081-2cbfd792530c-1",
"_FILE_SIZE" : 8,
"_ROW_COUNT" : 2,
"_DELETIONS_VECTORS_RANGES" : null,
"_EXTERNAL_PATH" : null
}
{
"_VERSION" : 1,
"_KIND" : 0,
"_PARTITION" : "2013",
"_BUCKET" : 0,
"_INDEX_TYPE" : "HASH",
"_FILE_NAME" : "index-e448a3ac-1e40-47f5-b081-2cbfd792530c-0",
"_FILE_SIZE" : 4,
"_ROW_COUNT" : 1,
"_DELETIONS_VECTORS_RANGES" : null,
"_EXTERNAL_PATH" : null
}
Bucket 填充按照数据到来的顺序填充:先看 index ,检查该 primary key 是不是已经落在某个 bucket 中。如果这是个新 primary key,则填充在空闲的 bucket 上,如果现有 bucket 都满了,则开辟新的 bucket。
Dynamic bucket 同一分区只能有一个 write job,因为同一分区的 key -> bucket 路由状态、bucket 扩容状态必须由单一 writer 串行维护;否则多个 writer 会各自做路由决策,导致同 key 被分到不同 bucket,最终出现重复数据。
优点:
- 不需要写死 bucket 数量,随着数据量规模变大,它能自己开辟新 bucket。
缺点:
- 在往一个分区写数据的时候,需要先加载该分区所有 bucket 的 index。1 亿个主键大约消耗 1GB 内存。
- 当存在跨分区更新时,性能会明显变差,后文会展开说明。
Postpone bucket
设置 bucket=-2开启 postpone bucket,写入的数据会先放在 bucket-postpone文件夹下面,此时数据是不会被 reader 读取到。只有当用户手动执行 compact 后,数据才会被划分到指定的 bucket 下面。
Compact 前:数据都会以 avro 格式临时存储在 bucket-postpone文件夹下:
├── dt=2012
│ └── bucket-postpone
│ └── data--u-cd383d25-7047-473d-8a55-77b1c9db98ff-s-0-w-061ef2ef-93a0-4b7b-b978-ea2bab71befc-0.avro
├── dt=2013
│ └── bucket-postpone
│ └── data--u-cd383d25-7047-473d-8a55-77b1c9db98ff-s-0-w-88dfc7d7-b040-44d2-87e9-13e7c37bb71b-0.avro
├── manifest
│ ├── manifest-3f088188-b88d-4eea-a614-06c648c5a1ae-0
│ ├── manifest-list-b3a9879c-6907-4eff-ab5f-ad8090c09389-0
│ └── manifest-list-b3a9879c-6907-4eff-ab5f-ad8090c09389-1
├── schema
│ └── schema-0
└── snapshot
├── LATEST
└── snapshot-1
用户执行 Compact 的时候,按照 postpone.default-bucket-num划分 bucket(默认值为 1)。
bucket-postpone在执行 compact 命令的时候,底层直接复用的是 FixedBucket 逻辑。
Compact 后:
├── dt=2012
│ ├── bucket-0
│ │ └── data-1b180ea9-d358-4dcc-b218-c7abc96df86e-0.parquet
│ └── bucket-postpone
│ └── data--u-cd383d25-7047-473d-8a55-77b1c9db98ff-s-0-w-061ef2ef-93a0-4b7b-b978-ea2bab71befc-0.avro
├── dt=2013
│ ├── bucket-0
│ │ └── data-501cc1b2-beb3-4a42-8ee2-f0eac2c7958e-0.parquet
│ └── bucket-postpone
│ └── data--u-cd383d25-7047-473d-8a55-77b1c9db98ff-s-0-w-88dfc7d7-b040-44d2-87e9-13e7c37bb71b-0.avro
├── manifest
│ ├── manifest-3f088188-b88d-4eea-a614-06c648c5a1ae-0
│ ├── manifest-905baa52-a049-417f-b6d5-a0e1144ba8e1-0
│ ├── manifest-list-b3a9879c-6907-4eff-ab5f-ad8090c09389-0
│ ├── manifest-list-b3a9879c-6907-4eff-ab5f-ad8090c09389-1
│ ├── manifest-list-e51a6085-27a1-4e91-9a76-b9cdb1da0142-0
│ └── manifest-list-e51a6085-27a1-4e91-9a76-b9cdb1da0142-1
├── schema
│ └── schema-0
└── snapshot
├── EARLIEST
├── LATEST
├── snapshot-1
└── snapshot-2
注意点:
一个分区只有第一次 compact 的时候,bucket 划分会遵循 postpone.default-bucket-num 的值。后续该分区所有的 compact 都会复用已经确定的 bucket 数量(即使你修改了 postpone.default-bucket-num也直接无视)。
决策 bucket 数量的伪代码逻辑:
// 当前的 bucket 数量决策逻辑
for (BinaryRow partition : partitions) {
int bucketNum = defaultBucketNum; // ① 先取 postpone.default-bucket-num
Iterator<ManifestEntry> it = table.newSnapshotReader()
.withPartitionFilter(partition)
.onlyReadRealBuckets()
.readFileIterator();
if (it.hasNext()) {
bucketNum = it.next().totalBuckets(); // ② 如果已有真实 bucket,用已有的数量
}
// ③ 用 bucketNum 构造 FixedBucketSink 写入
}
如果你实在想修改 bucket 数量,则需要通过 rescale 命令重新分配 bucket 数量。
优点:
- 把划分 bucket 的操作延后到 compact 阶段,这时候可以根据
bucket-postpone文件夹的数据规模,手动设定 bucket 数量进行 compact。 - 可以对每一个分区独立的进行 compact,这样可以每个分区 bucket 数量不一样。(其实这个 fixed bucket 也能实现)
缺点:
- 要 compact 之后,才可以被 reader 可见,数据实时度不够。
主键跨分区更新
当主键存在跨分区更新这种情况的时候,bucket 划分就变得复杂了。
考虑如下例子:
CREATE TABLE test (
id bigint,
dt string,
name string,
primary key(id) not enforced
)
PARTITIONED BY (dt)
WITH (
'bucket' = '-1'
);
test 表的分区列是 dt,但是主键列是 id。
考虑一个场景:
insert into test values (1, '2012', 'smith');
insert into test values (1, '2013', 'danny');
此时 dt=2012 和 dt=2013 这两个分区下都有 id=1 的数据,Paimon 需要保证数据读取的正确性,理论上,danny 应该覆盖 smith 对应的旧值。
接下来看看 Paimon 是怎么做的。
Fixed bucket & Postpone bucket
先上结论,Fixed bucket 和 Postpone bucket 在 cross partition 的情况下,会导致数据重复。需要用户自行在数据输入端确保主键的唯一性。
Paimon 官网也说明了:
You can also use Cross Partitions Upsert with bucket (N > 0) or bucket (-2), in these modes, there is no global index to ensure that your data undergoes reasonable deduplication, so relying on your input to have a complete changelog can ensure the uniqueness of the data.
验证一下:
create table test(
id bigint,
dt string,
name string,
primary key(id) not enforced
) partitioned by (dt) with ('bucket'='1');
insert into test values(1, '2012', 'smith'), (2, '2012', 'danny');
insert into test values(1, '2013', 'blossom');
理论上 dt=2012 的 smith 会被 dt=2013 的 blossom 替代。但是现在如果你执行 SELECT * 扫描任务,你会看到数据 id=1 的主键重复。
id dt name
1 2013 blossom
1 2012 smith
2 2012 danny
Dynamic bucket
在跨分区更新的情况下,dynamic bucket 不会再创建 IndexManifest 索引文件。
create table test(
id bigint,
dt string,
name string,
primary key(id) not enforced
) partitioned by (dt) with ('bucket'='-1');
insert into test values(1, '2012', 'smith'), (2, '2012', 'danny');
目录结构上没有 index:
├── dt=2012
│ └── bucket-0
│ └── data-87175f86-40a1-44c4-9c42-0efc88fd07b5-0.parquet
├── manifest
│ ├── manifest-33694416-45b0-4cde-bece-09ed653a868c-0
│ ├── manifest-list-c637c63e-a4f6-4c3e-ae51-b12a1914ac94-0
│ └── manifest-list-c637c63e-a4f6-4c3e-ae51-b12a1914ac94-1
├── schema
│ └── schema-0
└── snapshot
├── EARLIEST
├── LATEST
└── snapshot-1
此时再插入一行数据:
insert into test values(1, '2013', 'blossom');
├── dt=2012
│ └── bucket-0
│ ├── data-7afdb6f3-cb48-4e3a-912f-555af68e69dd-0.parquet
│ └── data-87175f86-40a1-44c4-9c42-0efc88fd07b5-0.parquet
├── dt=2013
│ └── bucket-0
│ └── data-4833fa82-d805-479a-8807-25e69e8647ec-0.parquet
├── manifest
│ ├── manifest-33694416-45b0-4cde-bece-09ed653a868c-0
│ ├── manifest-71bf8258-1715-4540-8688-f2fbe852b1d3-0
│ ├── manifest-list-9041c45f-9087-4ef3-b7e1-f9fbd02672b5-0
│ ├── manifest-list-9041c45f-9087-4ef3-b7e1-f9fbd02672b5-1
│ ├── manifest-list-c637c63e-a4f6-4c3e-ae51-b12a1914ac94-0
│ └── manifest-list-c637c63e-a4f6-4c3e-ae51-b12a1914ac94-1
├── schema
│ └── schema-0
└── snapshot
├── EARLIEST
├── LATEST
├── snapshot-1
└── snapshot-2
这时候 dt=2013 里面多出了我们新插入的数据。dt=2012 里面也多了一个 parquet,它负责将旧的 smith 记录标记为删除(Deduplicate 模式)。
dt=2012 第一次 insert 生成的 parquet 文件内容:
{"_KEY_id": 1, "_SEQUENCE_NUMBER": 0, "_VALUE_KIND": 0, "id": 1, "dt": "2012", "name": "smith"}
{"_KEY_id": 2, "_SEQUENCE_NUMBER": 1, "_VALUE_KIND": 0, "id": 2, "dt": "2012", "name": "danny"}
dt=2012 标记删除的 parquet 文件内容:
{"_KEY_id": 1, "_SEQUENCE_NUMBER": 2, "_VALUE_KIND": 3, "id": 1, "dt": "2012", "name": "blossom"}
此时你查询 Paimon 表,结果都是正确的。
那么问题来了,Paimon 是如何维护 index,来确定一个 primary key 在哪一个 bucket 里面?
就是在一个 writer 任务启动的时候,它会使用 RocksDB 维护一个全局索引,先扫描所有的历史数据,构建出一个全局的 PK-> (partition, bucket) 的映射,再开始写入数据。
所以其缺点也很明显,当数据量较大时,writer 任务初始化会明显变慢。
官方也提了一嘴:
Performance: For tables with a large amount of data, there will be a significant loss in performance. Moreover, initialization takes a long time.
总结
- 上面的讨论都只是涉及 writer,对于 reader 来说,只需要按照元信息读取数据即可,不需要关心 bucket 的策略。
- 在设计表结构的时候,最好把分区列也加入主键列,这样可以获得最佳性能,而且不需要担心数据会出现重复。
原创文章,作者:Smith,如若转载,请注明出处:https://www.inlighting.org/archives/apache-paimon-pk-table-data-distribution
微信扫一扫