pg_incremental
增量处理流式事件
仓库
CrunchyData/pg_incremental
https://github.com/CrunchyData/pg_incremental
源码
pg_incremental-1.4.1.tar.gz
pg_incremental-1.4.1.tar.gz
概览
| 扩展包名 | 版本 | 分类 | 许可证 | 语言 |
|---|---|---|---|---|
pg_incremental | 1.4.1 | FEAT | PostgreSQL | C |
| ID | 扩展名 | Bin | Lib | Load | Create | Trust | Reloc | 模式 |
|---|---|---|---|---|---|---|---|---|
| 2850 | pg_incremental | 否 | 是 | 否 | 是 | 否 | 否 | pg_catalog |
| 相关扩展 | pg_cron age hll rum pg_graphql pg_jsonschema jsquery pg_hint_plan |
|---|
版本
| 类型 | 仓库 | 版本 | PG 大版本 | 包名 | 依赖 |
|---|---|---|---|---|---|
| EXT | PIGSTY | 1.4.1 | 1817161514 | pg_incremental | pg_cron |
| RPM | PIGSTY | 1.4.1 | 1817161514 | pg_incremental_$v | pg_cron_$v |
| DEB | PIGSTY | 1.4.1 | 1817161514 | postgresql-$v-pg-incremental | postgresql-$v-cron |
构建
您可以使用 pig build 命令构建 pg_incremental 扩展的 RPM / DEB 包:
pig build pkg pg_incremental # 构建 RPM / DEB 包
安装
您可以直接安装 pg_incremental 扩展包的预置二进制包,首先确保 PGDG 和 PIGSTY 仓库已经添加并启用:
pig repo add pgsql -u # 添加仓库并更新缓存
使用 pig 或者是 apt/yum/dnf 安装扩展:
pig install pg_incremental; # 当前活跃 PG 版本安装
pig ext install -y pg_incremental -v 18 # PG 18
pig ext install -y pg_incremental -v 17 # PG 17
pig ext install -y pg_incremental -v 16 # PG 16
dnf install -y pg_incremental_18 # PG 18
dnf install -y pg_incremental_17 # PG 17
dnf install -y pg_incremental_16 # PG 16
apt install -y postgresql-18-pg-incremental # PG 18
apt install -y postgresql-17-pg-incremental # PG 17
apt install -y postgresql-16-pg-incremental # PG 16
创建扩展:
CREATE EXTENSION pg_incremental CASCADE; -- 依赖: pg_cron
用法
pg_incremental 扩展在 PostgreSQL 中提供快速、可靠的增量批处理流水线。它定义参数化查询,为新数据周期性执行,确保恰好一次投递。
CREATE EXTENSION pg_incremental CASCADE; -- 依赖 pg_cron
流水线类型
共有三种类型的流水线:
- 序列流水线 – 处理来自表的序列值范围
- 时间间隔流水线 – 在时间间隔过后处理时间范围
- 文件列表流水线 – 处理文件列表函数返回的新文件
序列流水线
创建一个使用序列增量聚合新行的流水线:
SELECT incremental.create_sequence_pipeline('event-aggregation', 'events', $$
INSERT INTO events_agg
SELECT date_trunc('day', event_time), count(*)
FROM events
WHERE event_id BETWEEN $1 AND $2
GROUP BY 1
ON CONFLICT (day) DO UPDATE SET event_count = events_agg.event_count + excluded.event_count
$$);
$1 和 $2 被设置为可以安全处理的最小和最大序列值。
带批次大小限制:
SELECT incremental.create_sequence_pipeline(
'event-aggregation', 'events',
$$ ... $$,
schedule := '* * * * *',
max_batch_size := 10000
);
时间间隔流水线
按固定时间间隔处理数据:
SELECT incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$
INSERT INTO events_agg
SELECT event_time::date, count(distinct event_id)
FROM events
WHERE event_time >= $1 AND event_time < $2
GROUP BY 1
$$);
$1 和 $2 被设置为时间范围的起始和结束(不包含)。
按间隔执行(例如每日导出):
SELECT incremental.create_time_interval_pipeline('event-export',
time_interval := '1 day',
batched := false,
start_time := '2024-11-01',
command := $$ SELECT export_events($1, $2) $$
);
文件列表流水线
在新文件出现时处理它们:
SELECT incremental.create_file_list_pipeline('event-import', 's3://mybucket/events/*.csv', $$
SELECT import_events($1)
$$);
管理函数
| 函数 | 描述 |
|---|---|
incremental.execute_pipeline(name) | 手动执行流水线(仅在有新数据时) |
incremental.reset_pipeline(name) | 重置流水线,从头开始重新处理 |
incremental.drop_pipeline(name) | 删除流水线 |
incremental.skip_file(pipeline, path) | 在文件列表流水线中跳过有问题的文件 |
监控
SELECT * FROM incremental.sequence_pipelines;
SELECT * FROM incremental.time_interval_pipelines;
SELECT * FROM incremental.processed_files;
通过 pg_cron 检查作业结果:
SELECT jobname, start_time, status, return_message
FROM cron.job_run_details JOIN cron.job USING (jobid)
WHERE jobname LIKE 'pipeline:%' ORDER BY 1 DESC LIMIT 5;