【问题标题】:Flink SQL CSV Streaming ContinuouslyFlink SQL CSV 连续流式传输
【发布时间】:2021-06-20 21:37:53
【问题描述】:

我正在创建 2 个 flink sql 表,1 个用于 CSV 文件系统,另一个用于 kafka。目标是持续监控文件系统文件夹并将新的 csv 文件记录推送到 kafka 主题。但是我在下面编写的查询将 csv 文件记录推送一次,并且 flink 作业进入“完成”模式,并且没有处理文件夹中的任何新文件。有人可以告诉我如何使用源和 csv 文件系统创建 flink sql 连续流,并以 Kafka 为目标。

Flink SQL 创建源表

CREATE TABLE son_hsb_source_filesystem_csv_bulk(
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'filesystem', --Don't Change this
    'path' = 'file:///opt/kafka-python-exec/files/' , -- Change file name alone
    'format' = 'csv', --Don't Change this
    'format.ignore-parse-errors' = 'true', --Don't Change this
    'csv.ignore-parse-errors' = 'true', --Don't Change this
    'csv.allow-comments' = 'true' --Don't Change this
);

Flink SQL 创建目标表


CREATE TABLE son_hsb_target_kafka_9092_filesystem_bulk_tests(
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'kafka',  --Don't Change this
    'topic' = 'son_hsb_target_kafka_9092_fs_bulk_data_tests',  -- Add any topic name you want
    'scan.startup.mode' = 'earliest-offset',  --Don't Change this
    'properties.bootstrap.servers' = 'localhost:9092', --Don't Change this
    'format' = 'json',  --Don't Change this
    'json.fail-on-missing-field' = 'false', --Don't Change this
    'json.ignore-parse-errors' = 'true' --Don't Change this
);

Flink SQL 创建一个 Streaming Job # 这运行一次并进入 Finished 模式

INSERT INTO son_hsb_target_kafka_9092_filesystem_bulk_tests
SELECT file_name,start_time,oss_cell_id,enodeb,dl_payload,rrc_conn_den,rrc_conn_num,pm_array_1 FROM son_hsb_source_filesystem_csv_bulk

如何定义始终处于“运行”状态并查找新文件的流式处理作业。请提出建议。

【问题讨论】:

    标签: flink-streaming flink-sql


    【解决方案1】:

    文档表明此功能尚未针对流式文件系统源实现:

    用于流式传输的文件系统源仍在开发中。未来,社区将添加对常见流式处理用例的支持,即分区和目录监控。

    【讨论】:

    • 有没有办法通过后端实现它?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-01
    • 1970-01-01
    • 2023-03-28
    • 2016-10-10
    • 2022-01-16
    相关资源
    最近更新 更多