pgmqtt

PostgreSQL 的 CDC 到 MQTT 代理扩展

概览

扩展包名版本分类许可证语言
pgmqtt0.1.0ETLElastic License 2.0Rust
ID扩展名BinLibLoadCreateTrustReloc模式
9620pgmqtt-

manually upgraded PGRX from 0.16.1 to 0.17.0 by Vonng; requires wal_level = logical for CDC.

版本

类型仓库版本PG 大版本包名依赖
EXTPIGSTY0.1.01817161514pgmqtt-
RPMPIGSTY0.1.01817161514pgmqtt_$v-
DEBPIGSTY0.1.01817161514postgresql-$v-pgmqtt-
OS / PGPG18PG17PG16PG15PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
d13.x86_64
d13.aarch64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u22.x86_64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u22.aarch64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u24.x86_64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u24.aarch64
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
PIGSTY 0.1.0
u26.x86_64PIGSTY MISSPIGSTY MISSPIGSTY MISSPIGSTY MISSPIGSTY MISS
u26.aarch64PIGSTY MISSPIGSTY MISSPIGSTY MISSPIGSTY MISSPIGSTY MISS

构建

您可以使用 pig build 命令构建 pgmqtt 扩展的 RPM / DEB 包:

pig build pkg pgmqtt         # 构建 RPM / DEB 包

安装

您可以直接安装 pgmqtt 扩展包的预置二进制包,首先确保 PGDGPIGSTY 仓库已经添加并启用:

pig repo add pgsql -u          # 添加仓库并更新缓存

使用 pig 或者是 apt/yum/dnf 安装扩展:

pig install pgmqtt;          # 当前活跃 PG 版本安装
pig ext install -y pgmqtt -v 18  # PG 18
pig ext install -y pgmqtt -v 17  # PG 17
pig ext install -y pgmqtt -v 16  # PG 16
pig ext install -y pgmqtt -v 15  # PG 15
pig ext install -y pgmqtt -v 14  # PG 14
dnf install -y pgmqtt_18       # PG 18
dnf install -y pgmqtt_17       # PG 17
dnf install -y pgmqtt_16       # PG 16
dnf install -y pgmqtt_15       # PG 15
dnf install -y pgmqtt_14       # PG 14
apt install -y postgresql-18-pgmqtt   # PG 18
apt install -y postgresql-17-pgmqtt   # PG 17
apt install -y postgresql-16-pgmqtt   # PG 16
apt install -y postgresql-15-pgmqtt   # PG 15
apt install -y postgresql-14-pgmqtt   # PG 14

创建扩展

CREATE EXTENSION pgmqtt;

用法

来源: official README, official repo

pgmqtt 是一个 pgrx 扩展,它把 MQTT broker 嵌入 PostgreSQL,并使用 change data capture 将表变更转换为 MQTT 消息。它也支持入站 topic 映射,使 MQTT publish 可以向 PostgreSQL 表插入行。

CREATE EXTENSION pgmqtt;

出站映射

把表变更发布到 topic:

SELECT pgmqtt_add_outbound_mapping(
  'public',
  'my_table',
  'topics/{{ op | lower }}',
  '{{ columns | tojson }}'
);

使用这条映射后,INSERTUPDATEDELETE 会把 JSON 负载发布到 topics/insert 之类的 topic。

入站映射

把 MQTT publish 写入表:

SELECT pgmqtt_add_inbound_mapping(
  'sensor/{site_id}/temperature',
  'sensor_readings',
  '{"site_id": "{site_id}", "value": "$.temperature"}'::jsonb
);

sensor/site-1/temperature 发布 {"temperature": 22.5} 会向 sensor_readings 插入一行。

MQTT 客户端示例

mosquitto_sub -h localhost -t 'topics/#'
mosquitto_pub -h localhost -t 'sensor/site-1/temperature' -m '{"temperature": 22.5}'

注意事项

  • README 要求 wal_level = logical;没有 logical decoding,CDC 部分就无法工作。
  • 上游文档目前只有 README 级别,因此已记录的 SQL 接口主要限于入站和出站映射工作流。