pgmqtt
概览
| 扩展包名 | 版本 | 分类 | 许可证 | 语言 |
|---|---|---|---|---|
pgmqtt | 0.1.0 | ETL | ELv2 | Rust |
| ID | 扩展名 | Bin | Lib | Load | Create | Trust | Reloc | 模式 |
|---|---|---|---|---|---|---|---|---|
| 9620 | pgmqtt | 否 | 是 | 否 | 是 | 否 | 否 | - |
manually upgraded PGRX from 0.16.1 to 0.17.0 by Vonng; requires wal_level = logical for CDC.
版本
| 类型 | 仓库 | 版本 | PG 大版本 | 包名 | 依赖 |
|---|---|---|---|---|---|
| EXT | PIGSTY | 0.1.0 | 1817161514 | pgmqtt | - |
| RPM | PIGSTY | 0.1.0 | 1817161514 | pgmqtt_$v | - |
| DEB | PIGSTY | 0.1.0 | 1817161514 | postgresql-$v-pgmqtt | - |
构建
您可以使用 pig build 命令构建 pgmqtt 扩展的 RPM / DEB 包:
pig build pkg pgmqtt # 构建 RPM / DEB 包
安装
您可以直接安装 pgmqtt 扩展包的预置二进制包,首先确保 PGDG 和 PIGSTY 仓库已经添加并启用:
pig repo add pgsql -u # 添加仓库并更新缓存
使用 pig 或者是 apt/yum/dnf 安装扩展:
pig install pgmqtt; # 当前活跃 PG 版本安装
pig ext install -y pgmqtt -v 18 # PG 18
pig ext install -y pgmqtt -v 17 # PG 17
pig ext install -y pgmqtt -v 16 # PG 16
pig ext install -y pgmqtt -v 15 # PG 15
pig ext install -y pgmqtt -v 14 # PG 14
dnf install -y pgmqtt_18 # PG 18
dnf install -y pgmqtt_17 # PG 17
dnf install -y pgmqtt_16 # PG 16
dnf install -y pgmqtt_15 # PG 15
dnf install -y pgmqtt_14 # PG 14
apt install -y postgresql-18-pgmqtt # PG 18
apt install -y postgresql-17-pgmqtt # PG 17
apt install -y postgresql-16-pgmqtt # PG 16
apt install -y postgresql-15-pgmqtt # PG 15
apt install -y postgresql-14-pgmqtt # PG 14
创建扩展:
CREATE EXTENSION pgmqtt;
用法
来源:README, interfaces, configuration, limitations, Cargo.toml
pgmqtt 是一个 pgrx 扩展,会在 PostgreSQL 中嵌入 MQTT broker,并使用 change data capture 将表变更转为 MQTT 消息。它也支持 inbound topic mappings,让 MQTT publish 可向 PostgreSQL 表插入行。
CREATE EXTENSION pgmqtt;
Outbound Mapping
将表变更发布到 topic:
SELECT pgmqtt_add_outbound_mapping(
'public',
'my_table',
'topics/{{ op | lower }}',
'{{ columns | tojson }}',
1
);
有了该 mapping 后,INSERT、UPDATE 和 DELETE 会把 JSON payload 发布到 topics/insert 等 topic。文档化函数签名还接受可选的 qos integer DEFAULT 0 和 template_type text DEFAULT 'jinja2'。
Inbound Mapping
从 MQTT publish 插入行:
SELECT pgmqtt_add_inbound_mapping(
'sensor/{site_id}/temperature',
'sensor_readings',
'{"site_id": "{site_id}", "value": "$.temperature"}'::jsonb
);
向 sensor/site-1/temperature 发布 {"temperature": 22.5} 会向 sensor_readings 插入一行。
Inbound mappings 也可通过传入 op、conflict_columns、target_schema、mapping_name 和 template_type 执行 upsert 与 delete 操作。Topic patterns 使用 {variable} 捕获;JSON payload fields 使用 $.temperature、$payload、$topic 等表达式。
查看和删除 Mappings
SELECT * FROM pgmqtt_list_outbound_mappings();
SELECT pgmqtt_remove_outbound_mapping('public', 'my_table');
SELECT * FROM pgmqtt_list_inbound_mappings();
SELECT pgmqtt_remove_inbound_mapping('temp_readings');
SELECT * FROM pgmqtt_status();
pgmqtt_status() 报告 active connections、subscriptions、retained messages、pending session messages、CDC mappings、CDC slot state、inbound mappings、pending inbound writes 和 dead letters。
MQTT Client 示例
mosquitto_sub -h localhost -t 'topics/#'
mosquitto_pub -h localhost -t 'sensor/site-1/temperature' -m '{"temperature": 22.5}'
配置
文档化 GUC 位于 pgmqtt namespace 下:
ALTER SYSTEM SET pgmqtt.cdc_every_n_ticks = 16;
SELECT pg_reload_conf();
Listener GUC 包括 pgmqtt.mqtt_enabled、pgmqtt.mqtt_port(1883)、pgmqtt.ws_enabled、pgmqtt.ws_port(9001)、pgmqtt.mqtts_enabled、pgmqtt.mqtts_port(8883)、pgmqtt.wss_enabled、pgmqtt.wss_port(9002)。TLS 和认证设置包括 pgmqtt.tls_cert_file、pgmqtt.tls_key_file、pgmqtt.license_key、pgmqtt.jwt_public_key、pgmqtt.jwt_required、pgmqtt.jwt_required_ws。
性能和可观测性 GUC 包括 pgmqtt.tick_interval_ms、pgmqtt.max_client_buffer_bytes、pgmqtt.cdc_every_n_ticks、pgmqtt.debug_log、pgmqtt.metrics_snapshot_interval、pgmqtt.metrics_retention_days、pgmqtt.metrics_connections_cache_interval、pgmqtt.metrics_hook_function、pgmqtt.metrics_notify_channel。Listener 和 TLS 设置在 MQTT background worker 启动时读取,因此需要重启 worker,而不是只执行 pg_reload_conf()。
注意事项
- README 要求
wal_level = logical;没有 logical decoding 时 CDC 侧无法工作。 - 本项目 CSV 跟踪版本
0.1.0、PostgreSQL 14-18,以及包侧pgrx0.17.0rebuild note。上游Cargo.toml仍展示较旧的 build defaults,因此这里以 CSV 作为 package/platform 权威来源。 - 文档记录支持 MQTT 5.0 和 MQTT 3.1.1 clients。QoS 0 和 QoS 1 受支持;QoS 2 未实现,请求 QoS 2 的 subscriptions 会降级到 QoS 1。
- CDC 捕获
INSERT、UPDATE和DELETE;不捕获 DDL changes 和TRUNCATE。DELETE可能需要REPLICA IDENTITY FULL。