pgmb
一个简单的PostgreSQL消息代理系统
概览
| 扩展包名 | 版本 | 分类 | 许可证 | 语言 |
|---|---|---|---|---|
pgmb | 1.0.0 | FEAT | PostgreSQL | SQL |
| ID | 扩展名 | Bin | Lib | Load | Create | Trust | Reloc | 模式 |
|---|---|---|---|---|---|---|---|---|
| 2870 | pgmb | 否 | 否 | 否 | 是 | 否 | 否 | pgmb |
| 相关扩展 | pg_cron http pgmq pgq pg_task pg_cron pg_background pg_later pg_net kafka_fdw |
|---|
版本
| 类型 | 仓库 | 版本 | PG 大版本 | 包名 | 依赖 |
|---|---|---|---|---|---|
| EXT | PIGSTY | 1.0.0 | 1817161514 | pgmb | pg_cron, http |
| RPM | PIGSTY | 1.0.0 | 1817161514 | pgmb_$v | pg_cron_$v, pg_http_$v |
| DEB | PIGSTY | 1.0.0 | 1817161514 | postgresql-$v-pgmb | postgresql-$v-cron, postgresql-$v-http |
构建
您可以使用 pig build 命令构建 pgmb 扩展的 RPM / DEB 包:
pig build pkg pgmb # 构建 RPM / DEB 包
安装
您可以直接安装 pgmb 扩展包的预置二进制包,首先确保 PGDG 和 PIGSTY 仓库已经添加并启用:
pig repo add pgsql -u # 添加仓库并更新缓存
使用 pig 或者是 apt/yum/dnf 安装扩展:
pig install pgmb; # 当前活跃 PG 版本安装
pig ext install -y pgmb -v 18 # PG 18
pig ext install -y pgmb -v 17 # PG 17
pig ext install -y pgmb -v 16 # PG 16
pig ext install -y pgmb -v 15 # PG 15
pig ext install -y pgmb -v 14 # PG 14
dnf install -y pgmb_18 # PG 18
dnf install -y pgmb_17 # PG 17
dnf install -y pgmb_16 # PG 16
dnf install -y pgmb_15 # PG 15
dnf install -y pgmb_14 # PG 14
apt install -y postgresql-18-pgmb # PG 18
apt install -y postgresql-17-pgmb # PG 17
apt install -y postgresql-16-pgmb # PG 16
apt install -y postgresql-15-pgmb # PG 15
apt install -y postgresql-14-pgmb # PG 14
创建扩展:
CREATE EXTENSION pgmb CASCADE; -- 依赖: pg_cron, http
用法
pgmb 扩展提供数据库内置的消息代理,支持基于 HTTP 的工作进程分发、自动重试、死信队列和基于模式的路由。
CREATE EXTENSION pgmb; -- 需要 pg_cron 和 http 扩展
注册工作进程
SELECT pgmb.worker(
'order_processor', -- 工作进程名称
'http://localhost:8080/process', -- 端点 URL
100 -- 每秒请求数限制
);
-- 返回: 工作进程 UUID
创建队列
SELECT pgmb.create(
'order_queue', -- 队列名称
'order.*', -- 绑定键模式(支持 * 通配符)
5, -- 最大重试次数
'550e8400-e29b-41d4-a716-446655440000' -- 工作进程 UUID
);
-- 返回: 队列 UUID
发送消息
-- 简单消息
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123, "amount": 45.67}'::jsonb
);
-- 带自定义头部
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123}'::jsonb,
'{"source": "web", "priority": "high"}'::jsonb
);
-- 延迟消息(按时间戳或秒数)
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123}'::jsonb,
'{}'::jsonb,
now() + interval '10 minutes'
);
API 参考
| 函数 | 描述 |
|---|---|
pgmb.worker(name, endpoint, rps) | 注册 HTTP 工作进程端点 |
pgmb.create(name, binding_key, max_retries, worker_id) | 创建带路由模式的队列 |
pgmb.send(id, routing_key, body) | 发送消息 |
pgmb.send(id, routing_key, body, headers) | 发送带头部的消息 |
pgmb.send(id, routing_key, body, headers, delay) | 发送延迟消息 |
工作原理
- 消息通过
pgmb.send()插入到pgmb.messages表 - 触发器根据路由键模式将消息路由到匹配的队列
pg_cron每秒通过 HTTP POST 将消息分发到工作进程端点- 失败的消息会被重试;超过最大重试次数后移入死信队列
监控
SELECT * FROM pgmb.workers;
SELECT * FROM pgmb.queues;
SELECT COUNT(*) FROM pgmb.order_queue WHERE acknoledge = false;
SELECT * FROM pgmb.order_dead_letter_queue;