kafka_fdw
Kafka外部数据源包装器
概览
| 扩展包名 | 版本 | 分类 | 许可证 | 语言 |
|---|---|---|---|---|
kafka_fdw | 0.0.3 | FDW | PostgreSQL | C |
| ID | 扩展名 | Bin | Lib | Load | Create | Trust | Reloc | 模式 |
|---|---|---|---|---|---|---|---|---|
| 8730 | kafka_fdw | 否 | 是 | 否 | 是 | 否 | 是 | - |
| 相关扩展 | pgmq mongo_fdw redis_fdw wrappers multicorn redis hdfs_fdw wal2json |
|---|
版本
| 类型 | 仓库 | 版本 | PG 大版本 | 包名 | 依赖 |
|---|---|---|---|---|---|
| EXT | PIGSTY | 0.0.3 | 1817161514 | kafka_fdw | - |
| RPM | PIGSTY | 0.0.3 | 1817161514 | kafka_fdw_$v | - |
| DEB | PIGSTY | 0.0.3 | 1817161514 | postgresql-$v-kafka-fdw | - |
构建
您可以使用 pig build 命令构建 kafka_fdw 扩展的 RPM / DEB 包:
pig build pkg kafka_fdw # 构建 RPM / DEB 包
安装
您可以直接安装 kafka_fdw 扩展包的预置二进制包,首先确保 PGDG 和 PIGSTY 仓库已经添加并启用:
pig repo add pgsql -u # 添加仓库并更新缓存
使用 pig 或者是 apt/yum/dnf 安装扩展:
pig install kafka_fdw; # 当前活跃 PG 版本安装
pig ext install -y kafka_fdw -v 18 # PG 18
pig ext install -y kafka_fdw -v 17 # PG 17
pig ext install -y kafka_fdw -v 16 # PG 16
pig ext install -y kafka_fdw -v 15 # PG 15
pig ext install -y kafka_fdw -v 14 # PG 14
dnf install -y kafka_fdw_18 # PG 18
dnf install -y kafka_fdw_17 # PG 17
dnf install -y kafka_fdw_16 # PG 16
dnf install -y kafka_fdw_15 # PG 15
dnf install -y kafka_fdw_14 # PG 14
apt install -y postgresql-18-kafka-fdw # PG 18
apt install -y postgresql-17-kafka-fdw # PG 17
apt install -y postgresql-16-kafka-fdw # PG 16
apt install -y postgresql-15-kafka-fdw # PG 15
apt install -y postgresql-14-kafka-fdw # PG 14
创建扩展:
CREATE EXTENSION kafka_fdw;
用法
创建服务器
CREATE EXTENSION kafka_fdw;
CREATE SERVER kafka_server FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (brokers 'localhost:9092');
服务器选项: brokers(必填,逗号分隔的 Kafka broker 端点)。
创建用户映射
CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;
创建外部表(CSV 格式)
CREATE FOREIGN TABLE kafka_csv (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
some_int int,
some_text text,
some_date date,
some_time timestamp
)
SERVER kafka_server
OPTIONS (format 'csv', topic 'my_topic', batch_size '30', buffer_delay '100');
需要两个元数据列:一个带 partition 'true',一个带 offset 'true'。其余列与消息格式匹配。
表选项: format(csv 或 json)、topic(Kafka 主题名)、batch_size、buffer_delay(毫秒)、strict(强制严格模式验证)、ignore_junk(将格式错误的列设为 NULL)。
创建外部表(JSON 格式)
CREATE FOREIGN TABLE kafka_json (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
some_int int OPTIONS (json 'int_val'),
some_text text OPTIONS (json 'text_val')
)
SERVER kafka_server
OPTIONS (format 'json', topic 'my_json_topic', batch_size '30', buffer_delay '100');
使用 json 列选项将列名映射到 JSON 键。
消费消息
-- 从特定分区和偏移量读取
SELECT * FROM kafka_csv WHERE part = 0 AND offs > 1000 LIMIT 60;
-- 从多个分区读取
SELECT * FROM kafka_csv
WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs > 300);
注意:offset 在 SQL 中是保留关键字;在某些上下文中引用偏移量列时使用双引号。
生产消息
-- 指定分区插入
INSERT INTO kafka_csv (part, some_int, some_text)
VALUES (0, 42, 'hello from partition 0');
-- 自动分区选择插入
INSERT INTO kafka_csv (some_int, some_text)
VALUES (42, 'auto-partitioned message');