【问题标题】:How to create Kafka-python producer with ssl configuration如何使用 ssl 配置创建 Kafka-python 生产者
【发布时间】:2018-08-22 03:33:41
【问题描述】:

我正在尝试使用 ssl 创建 kafka 生产者。我需要有关如何在构造函数中设置 SSL 参数的信息,kafka-python 客户端中提供的信息描述性不够。

ssl_certfilessl_cafilessl_keyfile 参数是什么。我不确定在哪里可以找到这些文件。

producer = KafkaProducer(bootstrap_servers=kafka_broker,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  security_protocol='SSL',
  api_version=(0,10),
  ssl_cafile='ca-certs.pem',ssl_certfile='server.pem',
  ssl_keyfile='server.pem',ssl_password='xxx')
producer.send('rk976772_topic',{"test":0})

Traceback(最近一次调用最后一次):文件“”,第 1 行,in 文件 “/usr/lib/python2.7/site-packages/kafka/producer/kafka.py”,第 543 行, 在发送 self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0) 文件“/usr/lib/python2.7/site-packages/kafka/producer/kafka.py”,第 664 行, 在 _wait_on_metadata “在 %.1f 秒后更新元数据失败。” % max_wait) kafka.errors.KafkaTimeoutError: KafkaTimeoutError: 更新失败 60.0 秒后的元数据。

【问题讨论】:

  • 这可能是您提供错误的主题名称的问题。

标签: kafka-python


【解决方案1】:

我在尝试使用 SSL 或 SASL_SSL 配置 kafka 时遇到了这个问题以及许多其他问题。我在这里发布一个完整的教程,以防其他人遇到同样的问题。我在 CentOS 6 上使用 kafka-python 1.4.6 和 kafka 2.2.0。

以下是使用 kafka-python 客户端为 SASL_SSL 工作的配置。这些配置可用于 PLAINTEXT 和 SSL 安全协议以及 SASL_SSL 和 SASL_PLAINTEXT。

用于生成密钥文件、CARoot 和自签名证书以用于 SSL 的 Bash 脚本:

#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:admin123
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

然后您可以使用以下命令来提取 CARoot.pem:

keytool -exportcert -alias CARoot -keystore server.keystore.jks -rfc -file CARoot.pem

在我的 server.properties 文件中,我有:

listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
security.protocol=SSL
sasl.enabled.mechanisms=PLAIN
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=admin123
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=admin123
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
advertised.listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094

在我的 JAAS 配置文件(/etc/kafka/kafka_plain_jaas.conf)中:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
   username=kafka
   password=kafka-secret
   user_username=password;
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
  username=username
  password=password;
};

在启动 Kafka 服务器之前,需要运行以下命令:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_plain_jaas.conf"

Python 消费者和生产者: ssl_context 和 api_version 是导致我发生 SSL 握手错误的原因,从而导致超时。所以我评论了那些。 (有一些教程提到使用这些。)

from kafka import KafkaConsumer, KafkaProducer
import kafka
import ssl
import logging
logging.basicConfig(level=logging.DEBUG)

try:
    topic = "sendMessage"
    sasl_mechanism = "PLAIN"
    username = "username"
    password = "password"
    security_protocol = "SASL_SSL"

    #context = ssl.create_default_context()
    #context.options &= ssl.OP_NO_TLSv1
    #context.options &= ssl.OP_NO_TLSv1_1

    consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9094',
                              #api_version=(0, 10),
                              security_protocol=security_protocol,
                              #ssl_context=context,
                              ssl_check_hostname=True,
                              ssl_cafile='../keys/CARoot.pem',
                              sasl_mechanism = sasl_mechanism,
                              sasl_plain_username = username,
                              sasl_plain_password = password)
                              #ssl_certfile='../keys/certificate.pem',
                              #ssl_keyfile='../keys/key.pem')#,api_version = (0, 10))

    producer = KafkaProducer(bootstrap_servers='localhost:9094',
                             #api_version=(0, 10),
                             security_protocol=security_protocol,
                             #ssl_context=context,
                             ssl_check_hostname=True,
                             ssl_cafile='../keys/CARoot.pem',
                             sasl_mechanism=sasl_mechanism,
                             sasl_plain_username=username,
                             sasl_plain_password=password)
                              #ssl_certfile='../keys/certificate.pem',
                              #ssl_keyfile='../keys/key.pem')#, api_version = (0,10))
    # Write hello world to test topic
    producer.send(topic, bytes("Hello World SSL"))
    producer.flush()


    for msg in consumer:
        print(msg)


except Exception as e:
    print e

【讨论】:

    【解决方案2】:

    我必须通过 SASL_SSL 发布消息 在下面的代码中使用 SASL_SSL 协议创建生产者。

    from kafka import KafkaProducer
    
    security_protocol=environment_params.kafka_security_protocol
    if env=='dev':
        if security_protocol=='SASL_SSL':
            producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,value_serializer=lambda v: json.dumps(v).encode('utf-8'),security_protocol=security_protocol,ssl_cafile='ca-certs.pem',sasl_mechanism='GSSAPI',api_version=environment_params.dev_kafka_api_version)
        elif security_protocol=='PLAINTEXT':
            producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    

    【讨论】:

    • sasl_mechanism='GSSAPI' 仅在 kafka 配置中未设置 sasl.enabled.mechanisms=PLAIN 时才需要。我不小心将SASL_ENABLED_MECHANISMS 而不是KAFKA_SASL_ENABLED_MECHANISMS 设置为docker 容器环境变量,因此sasl.enabled.mechanisms 保持默认为GSSAPI
    【解决方案3】:

    伙计们,请仔细观看并按照说明进行操作...

    第 1 步:运行所有脚本(如有必要,设置值)

    keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey
    
    openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
    
    keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
    
    keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
    
    keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
    
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
    
    keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
    
    keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed
    
    keytool -exportcert -alias CARoot -keystore kafka.server.keystore.jks -rfc -file CARoot.pem
    

    结果,你会得到:

    kafka.server.keystore.jks, kafka.server.truststore.jks, kafka.client.truststore.jks, ca-cert, ca-cert.srl, ca-key, cert-file, cert-signed, CARoot.pem
    

    第二步:拷贝kafka.server.keystore.jks和kafka.server.truststore.jks到服务器,修改server.properties文件(在config文件夹下)

    listeners=PLAINTEXT://MYSERVER:9092,SSL://MYSERVER:9093
    advertised.listeners=PLAINTEXT://MYSERVER:9092,SSL://MYSERVER:9093
    ssl.keystore.location=../store/kafka.server.keystore.jks
    ssl.keystore.password=qwerty
    ssl.truststore.location=../store/kafka.server.truststore.jks
    ssl.truststore.password=qwerty
    ssl.client.auth=required
    ssl.endpoint.identification.algorithm=
    

    第 3 步:创建 python 程序

    def kafka_consumer_ssl():
    consumer = KafkaConsumer('test_topic',
                             bootstrap_servers=['MYSERVER:9093'],
                             auto_offset_reset='earliest',
                             enable_auto_commit=True,
                             value_deserializer=lambda x: x.decode('utf-8'),
                             security_protocol='SSL',
                             ssl_check_hostname=False,
                             ssl_cafile='CARoot.pem',
                             ssl_certfile='ca-cert',
                             ssl_keyfile='ca-key',
                             ssl_password='qwerty'
                             )
    
    for event in consumer:
        print(event.value)
    
    kafka_consumer_ssl()
    

    第 4 步:享受!!!

    【讨论】:

      【解决方案4】:

      非常感谢。我有 jks 文件,而我的 kafka-producer 不断给出错误 SSL 认证验证错误 897 虽然转换了 CARoot.pem 文件,但它不起作用。

      有什么帮助是我使用以下命令进行了转换并在生产者上使用并且它起作用了。

      kafka.server.keystore.jks, 
      kafka.server.truststore.jks, 
      kafka.client.truststore.jks, 
      ca-cert, 
      ca-cert.srl, 
      ca-key, cert-file, 
      cert-signed, 
      CARoot.pem
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-07-22
        • 2020-04-26
        • 2018-07-15
        • 2020-05-28
        • 1970-01-01
        • 2021-09-03
        • 2023-01-09
        • 2019-11-30
        相关资源
        最近更新 更多