kafka_fdw

Kafka外部数据源包装器

概览

扩展包名版本分类许可证语言
kafka_fdw0.0.3FDWPostgreSQLC
ID扩展名BinLibLoadCreateTrustReloc模式
8730kafka_fdw-
相关扩展pgmq mongo_fdw redis_fdw wrappers multicorn redis hdfs_fdw wal2json

版本

类型仓库版本PG 大版本包名依赖
EXTPIGSTY0.0.31817161514kafka_fdw-
RPMPIGSTY0.0.31817161514kafka_fdw_$v-
DEBPIGSTY0.0.31817161514postgresql-$v-kafka-fdw-
OS / PGPG18PG17PG16PG15PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
d13.x86_64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
d13.aarch64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
u22.x86_64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
u22.aarch64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
u24.x86_64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
u24.aarch64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3

构建

您可以使用 pig build 命令构建 kafka_fdw 扩展的 RPM / DEB 包:

pig build pkg kafka_fdw         # 构建 RPM / DEB 包

安装

您可以直接安装 kafka_fdw 扩展包的预置二进制包,首先确保 PGDGPIGSTY 仓库已经添加并启用:

pig repo add pgsql -u          # 添加仓库并更新缓存

使用 pig 或者是 apt/yum/dnf 安装扩展:

pig install kafka_fdw;          # 当前活跃 PG 版本安装
pig ext install -y kafka_fdw -v 18  # PG 18
pig ext install -y kafka_fdw -v 17  # PG 17
pig ext install -y kafka_fdw -v 16  # PG 16
pig ext install -y kafka_fdw -v 15  # PG 15
pig ext install -y kafka_fdw -v 14  # PG 14
dnf install -y kafka_fdw_18       # PG 18
dnf install -y kafka_fdw_17       # PG 17
dnf install -y kafka_fdw_16       # PG 16
dnf install -y kafka_fdw_15       # PG 15
dnf install -y kafka_fdw_14       # PG 14
apt install -y postgresql-18-kafka-fdw   # PG 18
apt install -y postgresql-17-kafka-fdw   # PG 17
apt install -y postgresql-16-kafka-fdw   # PG 16
apt install -y postgresql-15-kafka-fdw   # PG 15
apt install -y postgresql-14-kafka-fdw   # PG 14

创建扩展

CREATE EXTENSION kafka_fdw;

用法

kafka_fdw: CSV 格式消息的 Kafka 外部数据包装器

创建服务器

CREATE EXTENSION kafka_fdw;

CREATE SERVER kafka_server FOREIGN DATA WRAPPER kafka_fdw
  OPTIONS (brokers 'localhost:9092');

服务器选项: brokers(必填,逗号分隔的 Kafka broker 端点)。

创建用户映射

CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;

创建外部表(CSV 格式)

CREATE FOREIGN TABLE kafka_csv (
  part int OPTIONS (partition 'true'),
  offs bigint OPTIONS (offset 'true'),
  some_int int,
  some_text text,
  some_date date,
  some_time timestamp
)
SERVER kafka_server
OPTIONS (format 'csv', topic 'my_topic', batch_size '30', buffer_delay '100');

需要两个元数据列:一个带 partition 'true',一个带 offset 'true'。其余列与消息格式匹配。

表选项: formatcsvjson)、topic(Kafka 主题名)、batch_sizebuffer_delay(毫秒)、strict(强制严格模式验证)、ignore_junk(将格式错误的列设为 NULL)。

创建外部表(JSON 格式)

CREATE FOREIGN TABLE kafka_json (
  part int OPTIONS (partition 'true'),
  offs bigint OPTIONS (offset 'true'),
  some_int int OPTIONS (json 'int_val'),
  some_text text OPTIONS (json 'text_val')
)
SERVER kafka_server
OPTIONS (format 'json', topic 'my_json_topic', batch_size '30', buffer_delay '100');

使用 json 列选项将列名映射到 JSON 键。

消费消息

-- 从特定分区和偏移量读取
SELECT * FROM kafka_csv WHERE part = 0 AND offs > 1000 LIMIT 60;

-- 从多个分区读取
SELECT * FROM kafka_csv
WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs > 300);

注意:offset 在 SQL 中是保留关键字;在某些上下文中引用偏移量列时使用双引号。

生产消息

-- 指定分区插入
INSERT INTO kafka_csv (part, some_int, some_text)
  VALUES (0, 42, 'hello from partition 0');

-- 自动分区选择插入
INSERT INTO kafka_csv (some_int, some_text)
  VALUES (42, 'auto-partitioned message');

最后修改 2026-03-14: update extension metadata (953cbd0)