这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

模块:Kafka

使用 Pigsty 拉起 Kafka Kraft 集群,一个开源的分布式流处理平台。

    Kafka 是一个开源的分布式流处理平台:安装 | 配置 | 管理 | 剧本 | 监控 | 参数 | 资源


    概览

    Kafka 模块本身目前仅在 Pigsty 专业版中提供 Beta 试用预览。


    安装

    如果您使用开源版 Pigsty,可以使用以下命令,在指定节点上安装 Kafka 及其 Java 依赖。

    Pigsty 在官方 Infra 仓库中提供了 Kafka 3.8.0 的 RPM 与 DEB 安装包,如果需要使用,可以直接下载安装。

    ./node.yml -t node_install  -e '{"node_repo_modules":"infra","node_packages":["kafka"]}'
    

    Kafka 依赖 Java 运行环境,因此在安装 Kafka 时,需要安装可用的 JDK (默认使用 OpenJDK 17,但其他 JDK 与版本,例如 8,11 都可以使用)

    # EL7 (没有 JDK 17 支持)
    ./node.yml -t node_install  -e '{"node_repo_modules":"node","node_packages":["java-11-openjdk-headless"]}'
    
    # EL8 / EL9 (使用 OpenJDK 17 )
    ./node.yml -t node_install  -e '{"node_repo_modules":"node","node_packages":["java-17-openjdk-headless"]}'
    
    # Debian / Ubuntu (使用 OpenJDK 17)
    ./node.yml -t node_install  -e '{"node_repo_modules":"node","node_packages":["openjdk-17-jdk"]}'
    

    配置

    单节点 Kafka 配置样例,请注意,在 Pigsty 单机部署模式下,管理节点上的 9093 端口默认已经被 AlertManager 占用。

    建议在管理节点上安装 Kafka 时,Peer Poort 使用其他端口,例如(9095)。

    kf-main:
      hosts:
        10.10.10.10: { kafka_seq: 1, kafka_role: controller }
      vars:
        kafka_cluster: kf-main
        kafka_data: /data/kafka
        kafka_peer_port: 9095     # 9093 is already hold by alertmanager
    

    三节点 Kraft 模式 Kafka 集群配置样例:

    kf-test:
      hosts:
        10.10.10.11: { kafka_seq: 1, kafka_role: controller   }
        10.10.10.12: { kafka_seq: 2, kafka_role: controller   }
        10.10.10.13: { kafka_seq: 3, kafka_role: controller   }
      vars:
        kafka_cluster: kf-test
    

    管理

    以下是基本的 Kafka 集群基本管理操作:

    使用 kafka.yml 创建 Kafka 集群:

    ./kafka.yml -l kf-main
    ./kafka.yml -l kf-test
    

    创建一个名为 test 的 Topic:

    kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
    

    这里 --replication-factor 1 表示每个数据只会复制一次,--partitions 1 表示只创建一个分区。

    使用以下命令,查看 Kafka 中的 Topic 列表:

    kafka-topics.sh --bootstrap-server localhost:9092 --list
    

    使用 Kafka 自带的消息生产者,向 test Topic 发送消息:

    kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
    >haha
    >xixi
    >hoho
    >hello
    >world
    > ^D
    

    使用 Kafka 自带的消费者,从 test Topic 中读取消息:

    kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
    

    剧本

    Pigsty 提供了两个与 KAFKA 模块相关的剧本,分别用于纳管与移除节点。

    • node.yml:纳管节点,并调整节点到期望的状态
    • node-rm.yml:从 pigsty 中移除纳管节点

    此外, Pigsty 还提供了两个包装命令工具:node-addnode-rm,用于快速调用剧本。


    kafka.yml

    用于部署 Kafka KRaft 模式集群的 kafka.yml 剧本包含以下子任务:

    kafka-id       : generate kafka instance identity
    kafka_clean    : remove existing kafka instance (DANGEROUS)
    kafka_user     : create os user kafka
    kafka_pkg      : install kafka rpm/deb packages
    kafka_link     : create symlink to /usr/kafka
    kafka_path     : add kafka bin path to /etc/profile.d
    kafka_svc      : install kafka systemd service
    kafka_dir      : create kafka data & conf dir
    kafka_config   : generate kafka config file
    kafka_boot     : bootstrap kafka cluster
    kafka_launch   : launch kafka service
    kafka_exporter : launch kafka exporter
    kafka_register : register kafka service to prometheus
    

    监控

    Pigsty 提供了两个与 KAFKA 模块有关的监控面板:

    KAFKA Overview 展示了 Kafka 集群的整体监控指标。

    KAFKA Instance 展示了单个 Kafka 实例的监控指标详情


    参数

    Kafka 的可用配置项:

    #kafka_cluster:           #CLUSTER  # kafka cluster name, required identity parameter
    #kafka_role: controller   #INSTANCE # kafka role, controller, broker, or controller-only
    #kafka_seq: 0             #INSTANCE # kafka instance seq number, required identity parameter
    kafka_clean: false                  # cleanup kafka during init? false by default
    kafka_data: /data/kafka             # kafka data directory, `/data/kafka` by default
    kafka_version: 3.8.0                # kafka version string
    scala_version: 2.13                 # kafka binary scala version
    kafka_port: 9092                    # kafka broker listen port
    kafka_peer_port: 9093               # kafka broker peer listen port, 9093 by default (conflict with alertmanager)
    kafka_exporter_port: 9308           # kafka exporter listen port, 9308 by default
    kafka_parameters:                   # kafka parameters to be added to server.properties
      num.network.threads: 3
      num.io.threads: 8
      socket.send.buffer.bytes: 102400
      socket.receive.buffer.bytes: 102400
      socket.request.max.bytes: 104857600
      num.partitions: 1
      num.recovery.threads.per.data.dir: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.retention.hours: 168
      log.segment.bytes: 1073741824
      log.retention.check.interval.ms: 300000
      #log.retention.bytes: 1073741824
      #log.flush.interval.ms: 1000
      #log.flush.interval.messages: 10000
    

    资源

    Pigsty 为 PostgreSQL 提供了一些 Kafka 相关的扩展插件:

    • kafka_fdw,一个有趣的 FDW,允许用户直接从 PostgreSQL 中读写 Kafka Topic 数据
    • wal2json,用于从 PostgreSQL 中逻辑解码 WAL 日志,生成 JSON 格式的变更数据
    • wal2mongo,用于从 PostgreSQL 中逻辑解码 WAL 日志,生成 BSON 格式的变更数据
    • decoder_raw,用于从 PostgreSQL 中逻辑解码 WAL 日志,生成 SQL 格式的变更数据
    • test_decoding,用于从 PostgreSQL 中逻辑解码 WAL 日志,生成 RAW 格式的变更数据