pgmb

一个简单的PostgreSQL消息代理系统

概览

扩展包名版本分类许可证语言
pgmb1.0.0FEATPostgreSQLSQL
ID扩展名BinLibLoadCreateTrustReloc模式
2870pgmbpgmb
相关扩展pg_cron http pgmq pgq pg_task pg_cron pg_background pg_later pg_net kafka_fdw

版本

类型仓库版本PG 大版本包名依赖
EXTPIGSTY1.0.01817161514pgmbpg_cron, http
RPMPIGSTY1.0.01817161514pgmb_$vpg_cron_$v, pg_http_$v
DEBPIGSTY1.0.01817161514postgresql-$v-pgmbpostgresql-$v-cron, postgresql-$v-http
OS / PGPG18PG17PG16PG15PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
d13.x86_64
d13.aarch64
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
u22.x86_64
u22.aarch64
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
u24.x86_64
u24.aarch64
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0

构建

您可以使用 pig build 命令构建 pgmb 扩展的 RPM / DEB 包:

pig build pkg pgmb         # 构建 RPM / DEB 包

安装

您可以直接安装 pgmb 扩展包的预置二进制包,首先确保 PGDGPIGSTY 仓库已经添加并启用:

pig repo add pgsql -u          # 添加仓库并更新缓存

使用 pig 或者是 apt/yum/dnf 安装扩展:

pig install pgmb;          # 当前活跃 PG 版本安装
pig ext install -y pgmb -v 18  # PG 18
pig ext install -y pgmb -v 17  # PG 17
pig ext install -y pgmb -v 16  # PG 16
pig ext install -y pgmb -v 15  # PG 15
pig ext install -y pgmb -v 14  # PG 14
dnf install -y pgmb_18       # PG 18
dnf install -y pgmb_17       # PG 17
dnf install -y pgmb_16       # PG 16
dnf install -y pgmb_15       # PG 15
dnf install -y pgmb_14       # PG 14
apt install -y postgresql-18-pgmb   # PG 18
apt install -y postgresql-17-pgmb   # PG 17
apt install -y postgresql-16-pgmb   # PG 16
apt install -y postgresql-15-pgmb   # PG 15
apt install -y postgresql-14-pgmb   # PG 14

创建扩展

CREATE EXTENSION pgmb CASCADE;  -- 依赖: pg_cron, http

用法

pgmb: 构建在 PostgreSQL 内部的轻量级消息代理系统

pgmb 扩展提供数据库内置的消息代理,支持基于 HTTP 的工作进程分发、自动重试、死信队列和基于模式的路由。

CREATE EXTENSION pgmb;  -- 需要 pg_cron 和 http 扩展

注册工作进程

SELECT pgmb.worker(
    'order_processor',                     -- 工作进程名称
    'http://localhost:8080/process',       -- 端点 URL
    100                                    -- 每秒请求数限制
);
-- 返回: 工作进程 UUID

创建队列

SELECT pgmb.create(
    'order_queue',                         -- 队列名称
    'order.*',                             -- 绑定键模式(支持 * 通配符)
    5,                                     -- 最大重试次数
    '550e8400-e29b-41d4-a716-446655440000' -- 工作进程 UUID
);
-- 返回: 队列 UUID

发送消息

-- 简单消息
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123, "amount": 45.67}'::jsonb
);

-- 带自定义头部
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123}'::jsonb,
    '{"source": "web", "priority": "high"}'::jsonb
);

-- 延迟消息(按时间戳或秒数)
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123}'::jsonb,
    '{}'::jsonb,
    now() + interval '10 minutes'
);

API 参考

函数描述
pgmb.worker(name, endpoint, rps)注册 HTTP 工作进程端点
pgmb.create(name, binding_key, max_retries, worker_id)创建带路由模式的队列
pgmb.send(id, routing_key, body)发送消息
pgmb.send(id, routing_key, body, headers)发送带头部的消息
pgmb.send(id, routing_key, body, headers, delay)发送延迟消息

工作原理

  1. 消息通过 pgmb.send() 插入到 pgmb.messages
  2. 触发器根据路由键模式将消息路由到匹配的队列
  3. pg_cron 每秒通过 HTTP POST 将消息分发到工作进程端点
  4. 失败的消息会被重试;超过最大重试次数后移入死信队列

监控

SELECT * FROM pgmb.workers;
SELECT * FROM pgmb.queues;
SELECT COUNT(*) FROM pgmb.order_queue WHERE acknoledge = false;
SELECT * FROM pgmb.order_dead_letter_queue;

最后修改 2026-03-14: update extension metadata (953cbd0)