Bulk Loading(批量装载)¶
Explanation · 原理进阶
一句话理解
把一大块已有的历史数据(TB–PB 级)一次性装进湖表。包括首次冷启动、Hive 表迁移、外部系统导入、过期数据修复。流式入湖的"兄弟场景",套路完全不同。
TL;DR
- 冷启动湖仓的第一件大事,方法对错直接影响 3 个月的性能
- 核心是避免小文件 + 合理分区 + 写入并行度匹配
- Hive → Iceberg 有 register-only 和 rewrite 两条路
- 大规模装载常常要分批 + compaction 穿插,避免一轮"大写入 + 大 compact" 爆开
- 和流式入湖不要混用一个作业,各跑各的更稳
四种典型 Bulk Load 场景¶
场景 A:冷启动¶
新建湖仓第一次把历史数据灌进来。
- 选择当前快照作为截断点
- 之后切流式入湖承接增量
场景 B:Hive → Iceberg 迁移¶
两条路:
- Register-only(零重写) —— 不动数据文件,把 Hive 表的 Parquet 作为 Iceberg 数据文件注册
- 最快,几分钟
- 不利用 Iceberg 的新能力(hidden partitioning、列 ID)
- 长期要重写成 native Iceberg 布局
- Rewrite(全重写) —— 按 Iceberg native spec 重写一遍
- 慢,按 TB 级看分钟到小时
- 直接获得 Iceberg 全部能力
推荐:先 register-only 解锁读写,半年内逐步 rewrite。
场景 C:外部系统导入¶
第三方 Parquet / CSV / JSON 堆栈灌湖。
- 先建 staging 表(Iceberg 或 Paimon append-only)
- 验证 schema / 基数 / NULL 率
- 再 INSERT 到目标事实表
场景 D:回填 / 修复¶
历史数据出错,批量重写某段分区。
- 用 Iceberg
REPLACE PARTITION或MERGE INTO - 不要直接删再写 —— 中间不可用窗口
- 利用 Snapshot 事后可 rollback
工程要点¶
1. 文件大小¶
目标 128MB – 1GB。小文件一旦造成后面几乎救不回来:
# 装载后立即跑
CALL system.rewrite_data_files(
table => 'db.t',
options => map(
'target-file-size-bytes', '536870912' -- 512MB
)
);
2. 写入并行度¶
- 分区数 = 文件数(大致)
- 并行写 100 个分区 = 100 个文件的同时写
- 和 target file size 呼应:期望每个并行 writer 产出 ~target size
3. 分区设计¶
冷启动就是你决定分区策略的唯一便宜机会。Iceberg Hidden Partitioning 是首选:
CREATE TABLE orders (
order_id BIGINT,
ts TIMESTAMP,
user_id BIGINT,
amount DECIMAL
) USING iceberg
PARTITIONED BY (days(ts), bucket(16, user_id));
这两个维度(时间 + hash 散列)适合绝大多数事实表。
4. 分批 vs 一次性¶
PB 级别装载一次性扔给 Spark:
- Executor 内存炸
- Shuffle 爆
- 失败重试代价昂贵
分批:按时间分区或按主键范围分批,每批 10–100GB。批之间穿插 compaction 让表保持健康。
5. 事务边界¶
- Spark
saveAsTable/write.insertInto—— 一次 commit - 不建议一次性 commit PB 级(失败 roll back 很贵)
- 小分批 commit,每批成功独立进账
Spark 装载示例¶
# Hive → Iceberg native rewrite
spark.sql("""
CREATE TABLE warehouse.db.orders
USING iceberg
PARTITIONED BY (days(ts), bucket(16, user_id))
TBLPROPERTIES (
'write.parquet.compression-codec' = 'zstd',
'write.target-file-size-bytes' = '536870912'
)
AS SELECT * FROM hive_db.orders_legacy
""")
# 后续每日增量
spark.sql("""
INSERT INTO warehouse.db.orders
SELECT * FROM hive_db.orders_legacy
WHERE dt = '2026-04-15'
""")
Paimon 装载¶
流式湖表也有批装载场景:
-- Paimon primary key 表的初始化
INSERT INTO paimon_orders
SELECT * FROM mysql_snapshot_dump;
-- 随后切流式 CDC 承接增量
Paimon 的 dynamic-bucket 模式在初始化时特别有用——写入时自动决定分桶,不要预估分桶数。
Paimon bucket vs Iceberg hidden partitioning 对照¶
初始化装载时两家的分区/分桶策略理念不同:
| 维度 | Paimon | Iceberg |
|---|---|---|
| 分组粒度 | Partition + Bucket 双层(粗+细) | 纯 Partition(hidden transforms) |
| 细粒度策略 | Bucket 按主键 hash · 决定并行写和合并粒度 | 分区 transform(bucket(N, col) / days(ts) 等)· 决定剪枝粒度 |
| 初始化灵活性 | dynamic-bucket 模式写入时自动定桶数 · 免预估 |
分区 spec 改动需 PARTITION SPEC evolution(协议层支持演化 · 数据不动) |
| 失败回退 | 错了改 bucket 数要 rescale | 错了改 spec 不影响历史数据(见 Iceberg Partition Evolution) |
| 初装建议 | 从 16 bucket 起 · 或用 dynamic | 先小粒度分区试跑 · 再按查询路径 evolve |
结论:初次装载时两家都不用过度预估——Paimon 有 dynamic-bucket 兜底;Iceberg 有 partition evolution 事后可改。详见 湖仓表格式 · Partition Evolution。
装载后正确性验证 · 必跑 Checklist¶
观察:Bulk load 一旦错 · 后面整个湖里都带毒——增量 CDC 在错误全量的基础上继续增删改 · 错误会累积扩散。装载完成不是"完成" · 必须做正确性验证才能切 CDC。
7 条强制 checklist¶
-- 1. 行数核对 · source 和湖表主键数 / 总行数
SELECT COUNT(*) FROM mysql.orders; -- source
SELECT COUNT(*) FROM iceberg.app.orders; -- target
-- 差异 > 预期误差(如 < 0.01% 允许 · 更多要查)
-- 2. 主键唯一性 · 湖表不应有重复 PK
SELECT order_id, COUNT(*) FROM iceberg.app.orders
GROUP BY order_id HAVING COUNT(*) > 1;
-- 有结果 = bulk 过程中有重复写 · 排查
-- 3. Null-rate 漂移 · 关键字段 null 比例
SELECT
100.0 * COUNT(*) FILTER (WHERE user_id IS NULL) / COUNT(*) AS null_pct
FROM iceberg.app.orders;
-- 对照源库 · 差异大说明类型映射出错或数据截断
-- 4. 金额 / 状态类分布 · 业务字段 sum / 枚举分布
SELECT status, COUNT(*), SUM(amount) FROM iceberg.app.orders
GROUP BY status;
-- 和源库对照 · 不一致说明类型转换丢精度或数据过滤有误
-- 5. 分区完整性 · 每个预期分区都有数据
SELECT partition_date, COUNT(*) FROM iceberg.app.orders
GROUP BY partition_date ORDER BY partition_date;
-- 有缺口意味着某批次装载失败被跳过
-- 6. 边界时间点记录 · source DB 的 binlog offset 是否准确保留
-- (CDC cutover 用 · 见下节)
额外 2 条手动:
- 抽样对比 · 随机拉 1000 行逐字段对比源库(catch 隐藏数据损坏)
- 业务端冒烟 · 业务方跑一次关键 query · 确认数字对("财务月报对不对")
验证失败的正确应对¶
- 绝对不要"先切 CDC 再修"——错误数据会被 CDC 的 update/delete 放大
- 重新 bulk 的代价低于 带毒湖表的后续清理
- 用 Iceberg branch / Paimon tag 冻结当前装载状态 · 在 branch 上修复再合并 main(见 Iceberg Branching)
常见陷阱¶
- 装载时关闭 compaction → 小文件海啸
- 装载后不立即
rewrite_data_files:Zone Maps / 排序缺失,后续查询慢 - 分区选反(按低基数列分区) → 几十万分区,Catalog 压垮
- 装载作业和业务流共用 Spark 集群 → 资源抢占,全都慢
- Hive register-only 后立刻删 Hive 表 → 旧目录被 Iceberg manifest 引用,数据丢失
和流式入湖的配合¶
一旦历史装载完,切到流式:
不要让两者同时写同一张表(除非明确设计过)。典型做法:
- 装载作业写到临时 staging 表
- 流式入湖从装载截断点开始(给 offset 明确值)
- 两者通过主键保证幂等
相关¶
- Kafka 到湖 —— 流式侧
- 流式入湖
- Compaction
- Apache Iceberg
延伸阅读¶
- Iceberg Migration Guide
- Migrating to Iceberg at Scale —— Netflix / Tabular 博客
- Paimon 批初始化文档