pg_incremental

增量处理流式事件

概览

扩展包名版本分类许可证语言
pg_incremental1.4.1FEATPostgreSQLC
ID扩展名BinLibLoadCreateTrustReloc模式
2850pg_incrementalpg_catalog
相关扩展pg_cron age hll rum pg_graphql pg_jsonschema jsquery pg_hint_plan

版本

类型仓库版本PG 大版本包名依赖
EXTPIGSTY1.4.11817161514pg_incrementalpg_cron
RPMPIGSTY1.4.11817161514pg_incremental_$vpg_cron_$v
DEBPIGSTY1.4.11817161514postgresql-$v-pg-incrementalpostgresql-$v-cron
OS / PGPG18PG17PG16PG15PG14
el8.x86_64PIGSTY MISSPIGSTY MISS
el8.aarch64PIGSTY MISSPIGSTY MISS
el9.x86_64PIGSTY MISSPIGSTY MISS
el9.aarch64PIGSTY MISSPIGSTY MISS
el10.x86_64PIGSTY MISSPIGSTY MISS
el10.aarch64PIGSTY MISSPIGSTY MISS
d12.x86_64
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY MISSPIGSTY MISS
d12.aarch64
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY MISSPIGSTY MISS
d13.x86_64
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY MISSPIGSTY MISS
d13.aarch64
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY MISSPIGSTY MISS
u22.x86_64
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY MISSPIGSTY MISS
u22.aarch64
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY MISSPIGSTY MISS
u24.x86_64
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY MISSPIGSTY MISS
u24.aarch64
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY 1.4.1
PIGSTY MISSPIGSTY MISS

构建

您可以使用 pig build 命令构建 pg_incremental 扩展的 RPM / DEB 包:

pig build pkg pg_incremental         # 构建 RPM / DEB 包

安装

您可以直接安装 pg_incremental 扩展包的预置二进制包,首先确保 PGDGPIGSTY 仓库已经添加并启用:

pig repo add pgsql -u          # 添加仓库并更新缓存

使用 pig 或者是 apt/yum/dnf 安装扩展:

pig install pg_incremental;          # 当前活跃 PG 版本安装
pig ext install -y pg_incremental -v 18  # PG 18
pig ext install -y pg_incremental -v 17  # PG 17
pig ext install -y pg_incremental -v 16  # PG 16
dnf install -y pg_incremental_18       # PG 18
dnf install -y pg_incremental_17       # PG 17
dnf install -y pg_incremental_16       # PG 16
apt install -y postgresql-18-pg-incremental   # PG 18
apt install -y postgresql-17-pg-incremental   # PG 17
apt install -y postgresql-16-pg-incremental   # PG 16

创建扩展

CREATE EXTENSION pg_incremental CASCADE;  -- 依赖: pg_cron

用法

pg_incremental: PostgreSQL 中的增量数据处理

pg_incremental 扩展在 PostgreSQL 中提供快速、可靠的增量批处理流水线。它定义参数化查询,为新数据周期性执行,确保恰好一次投递。

CREATE EXTENSION pg_incremental CASCADE;  -- 依赖 pg_cron

流水线类型

共有三种类型的流水线:

  • 序列流水线 – 处理来自表的序列值范围
  • 时间间隔流水线 – 在时间间隔过后处理时间范围
  • 文件列表流水线 – 处理文件列表函数返回的新文件

序列流水线

创建一个使用序列增量聚合新行的流水线:

SELECT incremental.create_sequence_pipeline('event-aggregation', 'events', $$
  INSERT INTO events_agg
  SELECT date_trunc('day', event_time), count(*)
  FROM events
  WHERE event_id BETWEEN $1 AND $2
  GROUP BY 1
  ON CONFLICT (day) DO UPDATE SET event_count = events_agg.event_count + excluded.event_count
$$);

$1$2 被设置为可以安全处理的最小和最大序列值。

带批次大小限制:

SELECT incremental.create_sequence_pipeline(
  'event-aggregation', 'events',
  $$ ... $$,
  schedule := '* * * * *',
  max_batch_size := 10000
);

时间间隔流水线

按固定时间间隔处理数据:

SELECT incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$
  INSERT INTO events_agg
  SELECT event_time::date, count(distinct event_id)
  FROM events
  WHERE event_time >= $1 AND event_time < $2
  GROUP BY 1
$$);

$1$2 被设置为时间范围的起始和结束(不包含)。

按间隔执行(例如每日导出):

SELECT incremental.create_time_interval_pipeline('event-export',
  time_interval := '1 day',
  batched := false,
  start_time := '2024-11-01',
  command := $$ SELECT export_events($1, $2) $$
);

文件列表流水线

在新文件出现时处理它们:

SELECT incremental.create_file_list_pipeline('event-import', 's3://mybucket/events/*.csv', $$
  SELECT import_events($1)
$$);

管理函数

函数描述
incremental.execute_pipeline(name)手动执行流水线(仅在有新数据时)
incremental.reset_pipeline(name)重置流水线,从头开始重新处理
incremental.drop_pipeline(name)删除流水线
incremental.skip_file(pipeline, path)在文件列表流水线中跳过有问题的文件

监控

SELECT * FROM incremental.sequence_pipelines;
SELECT * FROM incremental.time_interval_pipelines;
SELECT * FROM incremental.processed_files;

通过 pg_cron 检查作业结果:

SELECT jobname, start_time, status, return_message
FROM cron.job_run_details JOIN cron.job USING (jobid)
WHERE jobname LIKE 'pipeline:%' ORDER BY 1 DESC LIMIT 5;

最后修改 2026-03-14: update extension metadata (953cbd0)