【问题标题】:Kafka - Zookeeper - ACL configurationKafka - Zookeeper - ACL 配置
【发布时间】:2017-03-28 07:33:28
【问题描述】:

上下文

我正在尝试基于 kafka 设置分布式日志记录系统(我知道存在诸如 logstash 之类的东西......)但我希望能够在之后放置一些风暴拓扑,例如,在发送通知时发送通知流量越来越慢。

设置

我在端口 8082 上有一个正在运行的服务器(wilfly swarm,keycloack 身份验证),它托管我的日志功能。我可以通过 REST 将日志线推送到此服务器。在幕后,一个 kafka 生产者正在运行并将消息传播到 kafka。

  • 我在 2181 端口有 zookeeper
  • 我有一个代理在 9092 端口运行
  • 我的日志服务器在 8082 端口运行

我的 server.properties(用于代理):

listeners=PLAINTEXT://localhost:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:Bob;User:Alice;User:anonymous

我的acl配置:

call kafka\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic
call kafka\bin\windows\kafka-acls.bat --add --allow-principal User:anonymous --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181  --allow-host http://localhost:8082 --operation Read --operation Write --topic testtopic
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --consumer --topic testtopic --group group --allow-host http://localhost:8082
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --producer --topic testtopic --allow-host http://localhost:8082
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --producer --topic testtopic --allow-host 192.168.3.63

我的 (java) 生产者属性:

    @Produces
    private Producer<String, String> stringStringProducer(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(props);
            return producer;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

问题

当我尝试通过 Java 生产者(和控制台生产者)生成消息时,我得到:

[org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | producer-6) Error while fetching metadata with correlation id 10 : {testtopic=UNKNOWN_TOPIC_OR_PARTITION}

有谁知道我做错了什么?

第一个解决方案

我通过授予对 127.0.0.1 的访问权限来克服此错误消息:

call kafka\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

call kafka\bin\windows\kafka-acls.bat --add --allow-principal User:anonymous --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181  --allow-host http://localhost:8082 --operation Read --operation Write --topic testtopic
call kafka\bin\windows\kafka-acls.bat --add --allow-principal User:ANONYMOUS --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181  --allow-host 127.0.0.1 --operation Read --operation Write --topic testtopic

call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --consumer --topic testtopic --group group --allow-host http://localhost:8082
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ANONYMOUS --consumer --topic testtopic --group group --allow-host 127.0.0.1

call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --producer --topic testtopic --allow-host http://localhost:8082
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ANONYMOUS --producer --topic testtopic --allow-host 127.0.0.1

我通过查看日志文件发现了问题(即转到 kafka 文件夹中的 log4j.properties 并将 log4j.logger.kafka.authorizer.logger 属性更改为 DEBUG。然后你会得到具体的错误(即丢失权限)。

新问题

当我想产生一条消息时,我现在得到:

[2017-03-28 15:39:07,704] WARN Error while fetching metadata with correlation id 0 : {testtopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-03-28 15:39:07,800] WARN Error while fetching metadata with correlation id 1 : {testtopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-03-28 15:39:07,912] WARN Error while fetching metadata with correlation id 2 : {testtopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-03-28 15:39:08,024] WARN Error while fetching metadata with correlation id 3 : {testtopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

有人知道如何解决这个问题吗?

已解决

我在代理配置 (server.properties) 中向超级用户添加了“匿名”:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:Bob;User:Alice;User:ANONYMOUS
#port = 9092
#advertised.host.name = localhost
#listeners=SASL_SSL://localhost:9092
#security.inter.broker.protocol=SASL_SSL
#sasl.mechanism.inter.broker.protocol=PLAIN
#sasl.enabled.mechanisms=PLAIN
host.name=127.0.0.1
advertised.host.name=localhost
advertised.port=9092

【问题讨论】:

    标签: apache-kafka apache-zookeeper


    【解决方案1】:

    出现问题是因为您在以下行中启用了授权:

    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    

    但是由于以下行,代理使用User:ANONYMOUS 运行:

    listeners=PLAINTEXT://localhost:9092
    

    也就是说,代理无法对自己进行身份验证。就我而言(SSL 身份验证),我必须执行以下操作:

    1. 使用 security.inter.broker.protocol=SSL 启用代理间安全性。
    2. 通过设置listeners=SSL://broker1:9092禁用代理的PLAINTEXT端口(注意没有PLAINTEXT://broker1:9091
    3. 使用 kafka-acls.sh 为我的 SSL 证书中定义的用户定义 ACL。
    4. 重启代理。

    P。 S. 不鼓励您回答中的解决方法。你可以阅读它的含义here

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-08-13
      • 2016-11-29
      • 2019-02-19
      • 1970-01-01
      • 2018-03-05
      • 2017-03-24
      • 2019-11-18
      • 2020-08-27
      相关资源
      最近更新 更多