[Question title]: Flink: join a Kafka source with a CDC source and sink the result to Kafka
[Posted]: 2021-07-14 10:35:29
[Question]:

We are trying to join events from a "kafka" source with a DB-CDC connector table (upsert behavior), enriching each event by key with the existing CDC data: kafka-source(id, B, C) + cdc(id, D, E, F) = result(id, B, C, D, E, F), written to a Kafka sink (append-only).

INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id) 
SELECT zapatos, naranjas, source.device_id, account_id, user_id FROM source 
JOIN mongodb_source ON source.device_id = mongodb_source._id

The problem: this only works when our Kafka sink is 'upsert-kafka'. But that sink emits tombstones whenever a row is deleted in the database. We need the output to behave like plain events, not like a changelog. Yet we cannot simply use the 'kafka' sink, because the DB connector produces an upsert (changelog) stream, which is incompatible with an append-only sink…

What is the way to do this: convert the upsert stream into append-only events?

    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)
    # use blink table planner
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())

   ddl = """CREATE TABLE sink (
            `zapatos` INT,
            `naranjas` STRING,
            `account_id` STRING,
            `user_id` STRING,
            `device_id` STRING,
            `time28` INT,
            PRIMARY KEY (device_id) NOT ENFORCED
        ) WITH (
            'connector' = 'upsert-kafka',
            'topic' = 'as-test-output-flink-topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'testGroup',
            'key.format' = 'raw',
            'value.format' = 'json',
            'value.fields-include' = 'EXCEPT_KEY'
        )
        """
    st_env.sql_update(ddl)
    
    ddl = """CREATE TABLE source (
            `device_id` STRING,
            `timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
            `event_type` STRING,
            `payload` ROW<`zapatos` INT, `naranjas` STRING, `time28` INT, `device_id` STRING>,
            `trace_id` STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'as-test-input-flink-topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'testGroup',
            'key.format' = 'raw',
            'key.fields' = 'device_id',
            'value.format' = 'json',
            'value.fields-include' = 'EXCEPT_KEY'
        )
        """
    st_env.sql_update(ddl)
    
    ddl = """
    CREATE TABLE mongodb_source (
        `_id` STRING PRIMARY KEY, 
        `account_id` STRING,
        `user_id` STRING,
        `device_id` STRING
    ) WITH (
        'connector' = 'mongodb-cdc',
        'uri' = '******',
        'database' = '****',
        'collection' = 'testflink'
    )
    """
    st_env.sql_update(ddl)


    st_env.sql_update("""
        INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id) 
        SELECT zapatos, naranjas, source.device_id, account_id, user_id FROM source 
        JOIN mongodb_source ON source.device_id = mongodb_source._id
     """)

    # execute
    st_env.execute("kafka_to_kafka")

Don't mind the Mongo-CDC connector; it is new, but the same applies with mysql-cdc or postgres-cdc.

Thanks for your help!

[Comments]:

Tags: apache-flink flink-streaming flink-sql pyflink


[Solution 1]:

Have you tried using a LEFT JOIN instead of JOIN? If your goal is only to enrich the Kafka events (with matching data from Mongo when it exists), it should not create tombstones…
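A minimal sketch of that suggestion, reusing the table and column names from the question (untested against the asker's setup):

```sql
-- Same INSERT as in the question, but with LEFT JOIN: every event
-- from `source` is emitted, enriched when a matching mongodb_source
-- row exists, and with NULLs in the enrichment columns otherwise.
INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id)
SELECT zapatos, naranjas, source.device_id, account_id, user_id
FROM source
LEFT JOIN mongodb_source ON source.device_id = mongodb_source._id;
```

One caveat: a regular join (inner or left) against a changelog source can still emit retractions/updates when the CDC side changes later. If strictly append-only enrichment is required, a processing-time temporal (lookup) join is the usual append-only alternative in Flink SQL.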

[Discussion]:
