【问题标题】:Questions for reading data from JDBC source in DataStream FlinkDataStream Flink中从JDBC源读取数据的问题
【发布时间】:2021-08-10 12:37:52
【问题描述】:

我正在启动一个新的 Flink 应用程序,以允许我的公司执行大量报告。我们有一个现有的遗留系统,其中大部分数据都保存在 SQL Server 数据库中。在开始使用新部署的 Kafka 流中的更多数据之前,我们首先需要使用这些数据库中的数据。

我花了很多时间阅读 Flink 书籍和网页,但我有一些简单的问题和假设希望你能帮助我进步。

首先,我想使用 DataStream API,这样我们既可以使用历史数据,也可以使用实时数据。我不认为我想使用 DataSet API,但我也没有看到使用 SQL/Table api 的意义,因为我更愿意在 Java 类中编写我的函数。我需要维护自己的状态,似乎 DataStream 键控函数是要走的路。

现在我正在尝试针对我们的生产数据库实际编写代码,我需要能够使用 SQL 查询读取“流”数据 - 似乎没有 JDBC 源连接器,所以我认为我必须制作JDBC 自己调用,然后可能使用 env.fromElements() 创建一个 DataSource。显然这是一个“有界”数据集,但我还有什么意思要加载历史数据呢?将来我还想包含一个 Kafka 流,它只有几周的数据,所以我想我有时需要将来自 SQL Server/Snowflake 数据库的数据与来自 Kafka 流的实时流合并。最佳实践是什么,因为我没有看到讨论此问题的示例。

通过从 JDBC 源检索数据,我还看到了一些使用 StreamingTableEnvironment 的示例 - 我是否打算以某种方式使用它来将 JDBC 连接中的数据查询到我的 DataStream 函数等?同样,我想用 Java 而不是 Flink SQL 编写我的函数。如果我只使用 DataStream API,最好使用 StreamingTableEnvironment 来查询 JDBC 数据吗?

【问题讨论】:

  • 是否有特定的理由不使用 Debezium 或其他更好的特定 CDC 工具?
  • 您似乎在问多个问题,请每个问题问一个重点问题。
  • @OneCricketeer - 这取决于你的用例,Debezium 的实现相当繁重,它在内部使用 Kafka 连接。
  • @SwapnilKhante“重”以什么方式? OP 已经有 Kafka,因此他们可以使用 Kafka Connect。 JDBC 比 CDC 更占用资源,这是我的观点

标签: java jdbc apache-kafka apache-flink flink-streaming


【解决方案1】:

以下方法可用于从数据库中读取数据并创建数据流:

  1. 您可以使用 RichParallelSourceFunction 对数据库进行自定义查询并从中获取数据流。可以在 RichParallelSourceFunction 类的扩展中触发带有 JDBC 驱动程序的 SQL。

  2. 使用Table DataStream API - 可以通过创建 JDBC 目录来查询数据库,然后将其转换为流

  3. 对此的替代方案,可能是更昂贵的解决方案 - 您可以使用 Flink CDC connectors,它为 Apache Flink 提供源连接器,使用变更数据捕获 (CDC) 从不同数据库获取变更

然后你可以添加Kafka作为源并获取数据流。

因此,您的管道可能如下所示: 您将两个源都转换为数据流,您可以使用例如协同处理功能加入这些流,这也将使您有可能维护一个状态并在您的业务逻辑中使用它。最后,使用 Sink 函数将您的最终输出发送到数据库、Kafka 甚至 AWS S3 存储桶。

【讨论】:

  • 非常感谢您的回复,并为我提出的一些含糊不清的问题向其他人道歉。还为时过早!事实上,我确实读过关于实现自定义源的信息,因此我确实将编写自己的 JDBC 源,这些源对我的遗留数据库进行必要的查询,并希望将它们与任何“实时”Kafka 数据连接起来。稍后我将研究 CDC 的方法。
猜你喜欢
  • 2018-09-12
  • 1970-01-01
  • 2020-11-10
  • 2016-11-08
  • 1970-01-01
  • 2018-09-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多