pgmqtt

PostgreSQL 的 CDC 到 MQTT 代理扩展

概览

扩展包名版本分类许可证语言
pgmqtt0.1.0ETLELv2Rust
ID扩展名BinLibLoadCreateTrustReloc模式
9620pgmqtt-

manually upgraded PGRX from 0.16.1 to 0.17.0 by Vonng; requires wal_level = logical for CDC.

版本

类型仓库版本PG 大版本包名依赖
EXTPIGSTY0.1.01817161514pgmqtt-
RPMPIGSTY0.1.01817161514pgmqtt_$v-
DEBPIGSTY0.1.01817161514postgresql-$v-pgmqtt-
OS / PGPG18PG17PG16PG15PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
d13.x86_64
d13.aarch64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u22.x86_64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u22.aarch64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u24.x86_64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u24.aarch64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u26.x86_64PIGSTY MISSPIGSTY MISSPIGSTY MISSPIGSTY MISSPIGSTY MISS
u26.aarch64

构建

您可以使用 pig build 命令构建 pgmqtt 扩展的 RPM / DEB 包:

pig build pkg pgmqtt         # 构建 RPM / DEB 包

安装

您可以直接安装 pgmqtt 扩展包的预置二进制包,首先确保 PGDGPIGSTY 仓库已经添加并启用:

pig repo add pgsql -u          # 添加仓库并更新缓存

使用 pig 或者是 apt/yum/dnf 安装扩展:

pig install pgmqtt;          # 当前活跃 PG 版本安装
pig ext install -y pgmqtt -v 18  # PG 18
pig ext install -y pgmqtt -v 17  # PG 17
pig ext install -y pgmqtt -v 16  # PG 16
pig ext install -y pgmqtt -v 15  # PG 15
pig ext install -y pgmqtt -v 14  # PG 14
dnf install -y pgmqtt_18       # PG 18
dnf install -y pgmqtt_17       # PG 17
dnf install -y pgmqtt_16       # PG 16
dnf install -y pgmqtt_15       # PG 15
dnf install -y pgmqtt_14       # PG 14
apt install -y postgresql-18-pgmqtt   # PG 18
apt install -y postgresql-17-pgmqtt   # PG 17
apt install -y postgresql-16-pgmqtt   # PG 16
apt install -y postgresql-15-pgmqtt   # PG 15
apt install -y postgresql-14-pgmqtt   # PG 14

创建扩展

CREATE EXTENSION pgmqtt;

用法

来源:README, interfaces, configuration, limitations, Cargo.toml

pgmqtt 是一个 pgrx 扩展,会在 PostgreSQL 中嵌入 MQTT broker,并使用 change data capture 将表变更转为 MQTT 消息。它也支持 inbound topic mappings,让 MQTT publish 可向 PostgreSQL 表插入行。

CREATE EXTENSION pgmqtt;

Outbound Mapping

将表变更发布到 topic:

SELECT pgmqtt_add_outbound_mapping(
  'public',
  'my_table',
  'topics/{{ op | lower }}',
  '{{ columns | tojson }}',
  1
);

有了该 mapping 后,INSERTUPDATEDELETE 会把 JSON payload 发布到 topics/insert 等 topic。文档化函数签名还接受可选的 qos integer DEFAULT 0template_type text DEFAULT 'jinja2'

Inbound Mapping

从 MQTT publish 插入行:

SELECT pgmqtt_add_inbound_mapping(
  'sensor/{site_id}/temperature',
  'sensor_readings',
  '{"site_id": "{site_id}", "value": "$.temperature"}'::jsonb
);

sensor/site-1/temperature 发布 {"temperature": 22.5} 会向 sensor_readings 插入一行。

Inbound mappings 也可通过传入 opconflict_columnstarget_schemamapping_nametemplate_type 执行 upsertdelete 操作。Topic patterns 使用 {variable} 捕获;JSON payload fields 使用 $.temperature$payload$topic 等表达式。

查看和删除 Mappings

SELECT * FROM pgmqtt_list_outbound_mappings();
SELECT pgmqtt_remove_outbound_mapping('public', 'my_table');

SELECT * FROM pgmqtt_list_inbound_mappings();
SELECT pgmqtt_remove_inbound_mapping('temp_readings');

SELECT * FROM pgmqtt_status();

pgmqtt_status() 报告 active connections、subscriptions、retained messages、pending session messages、CDC mappings、CDC slot state、inbound mappings、pending inbound writes 和 dead letters。

MQTT Client 示例

mosquitto_sub -h localhost -t 'topics/#'
mosquitto_pub -h localhost -t 'sensor/site-1/temperature' -m '{"temperature": 22.5}'

配置

文档化 GUC 位于 pgmqtt namespace 下:

ALTER SYSTEM SET pgmqtt.cdc_every_n_ticks = 16;
SELECT pg_reload_conf();

Listener GUC 包括 pgmqtt.mqtt_enabledpgmqtt.mqtt_port1883)、pgmqtt.ws_enabledpgmqtt.ws_port9001)、pgmqtt.mqtts_enabledpgmqtt.mqtts_port8883)、pgmqtt.wss_enabledpgmqtt.wss_port9002)。TLS 和认证设置包括 pgmqtt.tls_cert_filepgmqtt.tls_key_filepgmqtt.license_keypgmqtt.jwt_public_keypgmqtt.jwt_requiredpgmqtt.jwt_required_ws

性能和可观测性 GUC 包括 pgmqtt.tick_interval_mspgmqtt.max_client_buffer_bytespgmqtt.cdc_every_n_tickspgmqtt.debug_logpgmqtt.metrics_snapshot_intervalpgmqtt.metrics_retention_dayspgmqtt.metrics_connections_cache_intervalpgmqtt.metrics_hook_functionpgmqtt.metrics_notify_channel。Listener 和 TLS 设置在 MQTT background worker 启动时读取,因此需要重启 worker,而不是只执行 pg_reload_conf()

注意事项

  • README 要求 wal_level = logical;没有 logical decoding 时 CDC 侧无法工作。
  • 本项目 CSV 跟踪版本 0.1.0、PostgreSQL 14-18,以及包侧 pgrx 0.17.0 rebuild note。上游 Cargo.toml 仍展示较旧的 build defaults,因此这里以 CSV 作为 package/platform 权威来源。
  • 文档记录支持 MQTT 5.0 和 MQTT 3.1.1 clients。QoS 0 和 QoS 1 受支持;QoS 2 未实现,请求 QoS 2 的 subscriptions 会降级到 QoS 1。
  • CDC 捕获 INSERTUPDATEDELETE;不捕获 DDL changes 和 TRUNCATEDELETE 可能需要 REPLICA IDENTITY FULL

最后修改 2026-05-18: routine extension update (ac43610)