pgmq

基于Postgres实现类似AWS SQS/RSMQ的消息队列

概览

扩展包名版本分类许可证语言
pgmq1.11.0FEATPostgreSQLSQL
ID扩展名BinLibLoadCreateTrustReloc模式
2880pgmqpgmq
相关扩展kafka_fdw pg_cron pg_task pg_net pg_background pgagent pg_jobmon
下游依赖pg_later vectorize

版本

类型仓库版本PG 大版本包名依赖
EXTPIGSTY1.11.01817161514pgmq-
RPMPIGSTY1.11.01817161514pgmq_$v-
DEBPIGSTY1.11.01817161514postgresql-$v-pgmq-
OS / PGPG18PG17PG16PG15PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
d13.x86_64
d13.aarch64
u22.x86_64
u22.aarch64
u24.x86_64
u24.aarch64

构建

您可以使用 pig build 命令构建 pgmq 扩展的 RPM / DEB 包:

pig build pkg pgmq         # 构建 RPM / DEB 包

安装

您可以直接安装 pgmq 扩展包的预置二进制包,首先确保 PGDGPIGSTY 仓库已经添加并启用:

pig repo add pgsql -u          # 添加仓库并更新缓存

使用 pig 或者是 apt/yum/dnf 安装扩展:

pig install pgmq;          # 当前活跃 PG 版本安装
pig ext install -y pgmq -v 18  # PG 18
pig ext install -y pgmq -v 17  # PG 17
pig ext install -y pgmq -v 16  # PG 16
pig ext install -y pgmq -v 15  # PG 15
pig ext install -y pgmq -v 14  # PG 14
dnf install -y pgmq_18       # PG 18
dnf install -y pgmq_17       # PG 17
dnf install -y pgmq_16       # PG 16
dnf install -y pgmq_15       # PG 15
dnf install -y pgmq_14       # PG 14
apt install -y postgresql-18-pgmq   # PG 18
apt install -y postgresql-17-pgmq   # PG 17
apt install -y postgresql-16-pgmq   # PG 16
apt install -y postgresql-15-pgmq   # PG 15
apt install -y postgresql-14-pgmq   # PG 14

创建扩展

CREATE EXTENSION pgmq;

用法

pgmq: PostgreSQL 轻量级消息队列

PGMQ 是基于 PostgreSQL 构建的轻量级消息队列,提供可见性超时内的"恰好一次"投递保证、FIFO 队列、基于主题的路由和消息归档。

CREATE EXTENSION pgmq;

创建队列

SELECT pgmq.create('my_queue');

发送消息

-- 发送单条消息(返回 msg_id)
SELECT * FROM pgmq.send(
  queue_name => 'my_queue',
  msg        => '{"foo": "bar"}'
);

-- 延迟发送(5 秒内不可见)
SELECT * FROM pgmq.send(
  queue_name => 'my_queue',
  msg        => '{"foo": "bar"}',
  delay      => 5
);

-- 批量发送消息
SELECT pgmq.send_batch(
  queue_name => 'my_queue',
  msgs       => ARRAY['{"a":1}','{"b":2}','{"c":3}']::jsonb[]
);

读取消息

读取消息并在可见性超时期间(以秒为单位)使其不可见:

SELECT * FROM pgmq.read(
  queue_name => 'my_queue',
  vt         => 30,    -- 可见性超时(秒)
  qty        => 2      -- 读取消息数量
);

弹出消息

读取并立即删除一条消息:

SELECT * FROM pgmq.pop('my_queue');

删除消息

SELECT pgmq.delete('my_queue', 6);

归档消息

将消息从队列移动到归档表以便长期保留:

SELECT pgmq.archive(queue_name => 'my_queue', msg_id => 2);

-- 批量归档
SELECT pgmq.archive(queue_name => 'my_queue', msg_ids => ARRAY[3, 4, 5]);

查看已归档的消息:

SELECT * FROM pgmq.a_my_queue;

删除队列

SELECT pgmq.drop_queue('my_queue');

可见性超时

消息在被读取后,在可见性超时(vt)期间变为不可见。如果在此时间内未被删除或归档,它们将重新变为可见以供其他消费者处理。请将 vt 设置为大于预期处理时间。

主要函数

函数描述
pgmq.create(queue_name)创建新队列
pgmq.send(queue_name, msg, [delay])发送消息
pgmq.send_batch(queue_name, msgs)批量发送消息
pgmq.read(queue_name, vt, qty)读取消息并设置可见性超时
pgmq.pop(queue_name)原子性地读取并删除消息
pgmq.delete(queue_name, msg_id)删除消息
pgmq.archive(queue_name, msg_id/msg_ids)归档消息
pgmq.drop_queue(queue_name)删除队列

最后修改 2026-03-14: update extension metadata (953cbd0)