【问题标题】:Pyflink 1.14 table connectors - Kafka authenticationPyflink 1.14 表连接器 - Kafka 身份验证
【发布时间】:2021-12-24 20:17:23
【问题描述】:

我只见过 kafka 连接的 Pyflink table API 示例,其中在连接建立 (doc ref) 中包含身份验证详细信息,即源表连接:

source_ddl = """
        CREATE TABLE source_table(
            a VARCHAR,
            b INT
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'source_topic',
          'properties.bootstrap.servers' = 'kafka:9092',
          'properties.group.id' = 'test_3',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """

但是,我需要连接到启用身份验证的 kafka 源。通过“解释”所有 property.XXX 都专用于 kafka 配置,我将示例更改如下并进行了测试:

import os
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings, environment_settings
from pyflink.table.table_environment import StreamTableEnvironment

KAFKA_SERVERS = 'localhost:9092'
KAFKA_USERNAME = "user"
KAFKA_PASSWORD = "XXX"
KAFKA_SOURCE_TOPIC = 'source'
KAFKA_SINK_TOPIC = 'dest'


def log_processing():

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")
    env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.11-1.14.0.jar")
    env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
    settings = EnvironmentSettings.new_instance()\
                      .in_streaming_mode()\
                      .use_blink_planner()\
                      .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment= env, environment_settings=settings)
    
    source_ddl = f"""
            CREATE TABLE source_table(
                Cylinders INT,
                Displacement INT,
                Horsepower INT,
                Weight INT,
                Acceleration INT,
                Model_Year INT,
                USA INT,
                Europe INT,
                Japan INT
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SOURCE_TOPIC}',
              'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
              'properties.group.id' = 'testgroup12',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """

    sink_ddl = f"""
            CREATE TABLE sink_table(
                Cylinders INT,
                Displacement INT,
                Horsepower INT,
                Weight INT,
                Acceleration INT,
                Model_Year INT,
                USA INT,
                Europe INT,
                Japan INT
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SINK_TOPIC}',
              'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
              'properties.group.id' = 'testgroup12',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    t_env.sql_query("SELECT * FROM source_table").execute_insert("sink_table").wait()

    t_env.execute("kafka-table")


if __name__ == '__main__':
    log_processing()

通过从 cli 添加此作业,没有响应或指示使用相应作业 ID 实例化作业:

查看 flink UI 时分别没有创建作业

如果我错误地配置了连接,请有人纠正我,或者指出一个相关的文档来源(我已经用谷歌搜索了很多......)

【问题讨论】:

  • 你指定了两个不兼容的 jar 文件——你有 flink-connector-kafka 用 scala 2.11 编译,flink-sql-connector-kafka 用 scala 2.12 编译。不确定它是否会有所帮助,但请尝试修复它。
  • 另外,请查看作业管理器日志以获取线索。
  • 嗨@DavidAnderson,非常感谢你的收获! (尴尬地叹了口气)我整天都在 2.11 和 2.12 之间进行测试,并认为 flink-sql-connector 是唯一真正的依赖项....所以在所有测试之间,一定是迷路了专注于此,最终感到沮丧。将 flink-connector-kafka 更改为 2.12 确实解决了这个问题。关于 (1) 所需的 scala 版本和 (2) 所需的 jar 依赖项在哪里可以验证的任何建议?然后我很乐意将所有内容写下来作为答案,以供其他人参考

标签: apache-kafka apache-flink flink-streaming flink-sql pyflink


【解决方案1】:

按照@DavidAnderson 的建议找到了问题。我的问题中的代码按原样工作......只需要分别更新依赖项jar。如果使用 Scala 2.12 和 flink 版本 1.14,以下依赖项适用(jar 依赖项已下载并在相应目录中的 jobManager 上可用):

env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")

env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.12-1.14.0.jar")

env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")

一个有用的参考网站,后来我发现是https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.14.0

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-24
    相关资源
    最近更新 更多