wal2json
用逻辑解码捕获 JSON 格式的 CDC 变更
概览
| 扩展包名 | 版本 | 分类 | 许可证 | 语言 |
|---|---|---|---|---|
wal2json | 2.6 | ETL | BSD 3-Clause | C |
| ID | 扩展名 | Bin | Lib | Load | Create | Trust | Reloc | 模式 |
|---|---|---|---|---|---|---|---|---|
| 9630 | wal2json | 否 | 是 | 否 | 否 | 否 | 否 | - |
| 相关扩展 | pglogical wal2mongo decoderbufs decoder_raw kafka_fdw pglogical_origin pglogical_ticker pg_failover_slots |
|---|
版本
| 类型 | 仓库 | 版本 | PG 大版本 | 包名 | 依赖 |
|---|---|---|---|---|---|
| EXT | PGDG | 2.6 | 1817161514 | wal2json | - |
| RPM | PGDG | 2.6 | 1817161514 | wal2json_$v | - |
| DEB | PGDG | 2.6 | 1817161514 | postgresql-$v-wal2json | - |
安装
您可以直接安装 wal2json 扩展包的预置二进制包,首先确保 PGDG 仓库已经添加并启用:
pig repo add pgdg -u # 添加 PGDG 仓库并更新缓存
使用 pig 或者是 apt/yum/dnf 安装扩展:
pig install wal2json; # 当前活跃 PG 版本安装
pig ext install -y wal2json -v 18 # PG 18
pig ext install -y wal2json -v 17 # PG 17
pig ext install -y wal2json -v 16 # PG 16
pig ext install -y wal2json -v 15 # PG 15
pig ext install -y wal2json -v 14 # PG 14
dnf install -y wal2json_18 # PG 18
dnf install -y wal2json_17 # PG 17
dnf install -y wal2json_16 # PG 16
dnf install -y wal2json_15 # PG 15
dnf install -y wal2json_14 # PG 14
apt install -y postgresql-18-wal2json # PG 18
apt install -y postgresql-17-wal2json # PG 17
apt install -y postgresql-16-wal2json # PG 16
apt install -y postgresql-15-wal2json # PG 15
apt install -y postgresql-14-wal2json # PG 14
此扩展不需要执行
CREATE EXTENSION语句
用法
一个逻辑解码输出插件,从 PostgreSQL WAL 生成 JSON 格式的变更数据捕获。
配置
在 postgresql.conf 中:
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
使用流协议(pg_recvlogical)
# 创建复制槽
pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
# 开始消费变更
pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -f -
# 完成后删除槽
pg_recvlogical -d postgres --slot test_slot --drop-slot
使用 SQL 函数
-- 创建逻辑复制槽
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
-- 查看变更(不消费)
SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL);
-- 获取并消费变更
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL,
'pretty-print', '1');
-- 删除槽
SELECT pg_drop_replication_slot('test_slot');
输出格式 v1(每事务 JSON)
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "my_table",
"columnnames": ["a", "b"],
"columntypes": ["integer", "text"],
"columnvalues": [1, "hello"]
},
{
"kind": "delete",
"schema": "public",
"table": "my_table",
"oldkeys": {
"keynames": ["a"],
"keytypes": ["integer"],
"keyvalues": [1]
}
}
]
}
输出格式 v2(每元组 JSON)
启用方式:'format-version', '2'
关键参数
include-xids- 添加事务 ID(默认:false)include-timestamp- 添加时间戳(默认:false)include-schemas- 添加模式名(默认:true)include-types- 添加列类型(默认:true)include-pk- 添加主键信息(默认:false)include-lsn- 添加 WAL LSN(默认:false)include-not-null- 添加 NOT NULL 信息(默认:false)include-default- 添加默认表达式(默认:false)pretty-print- 格式化 JSON 输出(默认:false)filter-tables- 逗号分隔的要包含的表列表add-tables- 与 filter-tables 相同filter-msg-prefixes- 按前缀过滤逻辑消息format-version- 1(每事务)或 2(每元组)actions- 按操作类型过滤:insert、update、delete、truncate