pgmqtt

PostgreSQL 的 CDC 到 MQTT 代理扩展

概览

扩展包名版本分类许可证语言
pgmqtt0.1.0ETLElastic License 2.0Rust
ID扩展名BinLibLoadCreateTrustReloc模式
9620pgmqtt-

manually upgraded PGRX from 0.16.1 to 0.17.0 by Vonng; requires wal_level = logical for CDC.

版本

类型仓库版本PG 大版本包名依赖
EXTPIGSTY0.1.01817161514pgmqtt-
RPMPIGSTY0.1.01817161514pgmqtt_$v-
DEBPIGSTY0.1.01817161514postgresql-$v-pgmqtt-
OS / PGPG18PG17PG16PG15PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
d13.x86_64
d13.aarch64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u22.x86_64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u22.aarch64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u24.x86_64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u24.aarch64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0

构建

您可以使用 pig build 命令构建 pgmqtt 扩展的 RPM / DEB 包:

pig build pkg pgmqtt         # 构建 RPM / DEB 包

安装

您可以直接安装 pgmqtt 扩展包的预置二进制包,首先确保 PGDGPIGSTY 仓库已经添加并启用:

pig repo add pgsql -u          # 添加仓库并更新缓存

使用 pig 或者是 apt/yum/dnf 安装扩展:

pig install pgmqtt;          # 当前活跃 PG 版本安装
pig ext install -y pgmqtt -v 18  # PG 18
pig ext install -y pgmqtt -v 17  # PG 17
pig ext install -y pgmqtt -v 16  # PG 16
pig ext install -y pgmqtt -v 15  # PG 15
pig ext install -y pgmqtt -v 14  # PG 14
dnf install -y pgmqtt_18       # PG 18
dnf install -y pgmqtt_17       # PG 17
dnf install -y pgmqtt_16       # PG 16
dnf install -y pgmqtt_15       # PG 15
dnf install -y pgmqtt_14       # PG 14
apt install -y postgresql-18-pgmqtt   # PG 18
apt install -y postgresql-17-pgmqtt   # PG 17
apt install -y postgresql-16-pgmqtt   # PG 16
apt install -y postgresql-15-pgmqtt   # PG 15
apt install -y postgresql-14-pgmqtt   # PG 14

创建扩展

CREATE EXTENSION pgmqtt;

用法

pgmqtt 是一个基于 pgrx 的 PostgreSQL 扩展,可以把 CDC 变更接入 MQTT broker。数据库变更可以通过 SQL 定义的映射发布到 MQTT topic,MQTT 发布也可以写回 PostgreSQL 表。

README 的快速上手要求将 wal_level 设为 logical,这样才能正确捕获 CDC 事件。

出站映射

SELECT pgmqtt_add_outbound_mapping(
    'public',
    'my_table',
    'topics/{{ op | lower }}',
    '{{ columns | tojson }}'
);

使用这条映射后,INSERTUPDATEDELETE 对该表的变更会作为 MQTT 消息发布出去。订阅 topics/inserttopics/updatetopics/delete 的客户端会收到 JSON 负载。

入站映射

SELECT pgmqtt_add_inbound_mapping(
    'sensor/{site_id}/temperature',
    'sensor_readings',
    '{"site_id": "{site_id}", "value": "$.temperature"}'::jsonb
);

当客户端向 sensor/site-1/temperature 发布负载 {"temperature": 22.5} 时,README 说明会向 sensor_readings 插入一行,并写入提取出的字段值。

客户端示例

mosquitto_sub -h localhost -t 'topics/#'
mosquitto_pub -h localhost -t 'sensor/site-1/temperature' -m '{"temperature": 22.5}'

范围

上游 README 已覆盖 broker 模型、出站/入站映射示例以及基础 MQTT 客户端用法;它没有单独的项目主页文档,因此这个 stub 仅保留 README 范围。


最后修改 2026-04-14: update extension catalog (fa7cf58)