【问题标题】:Transfer data from Kafka store to a Kafka topic将数据从 Kafka 存储传输到 Kafka 主题
【发布时间】:2022-01-11 10:07:20
【问题描述】:

我想在 Kafka 中做这样的事情:

  1. 继续在 KStream/Ktable/Kafka-store 中存储数据
  2. 当我的应用程序接收到特定事件/数据时,仅将特定数据集从上述存储发送到主题。

我们可以在 Kafka 中做到这一点吗?我认为单独使用 Kafka 消费者不会对此有所帮助,因为当一组数据已被使用时,我们无法启动/暂停消费者。

【问题讨论】:

  • 您可以尝试根据您需要的过滤器从上面的商店创建 kstream,以便创建带有所需数据的流创建主题。
  • 你能举个例子吗

标签: apache-kafka apache-kafka-streams


【解决方案1】:

这里是流创建和加入流创建配置:-

CREATE STREAM Customer Profile WITH (
KAFKA_TOPIC = 'Customer Profile',
VALUE_FORMAT = 'AVRO',
KEY_FORMAT = 'AVRO'

);

CREATE STREAM Customer Buy WITH (
KAFKA_TOPIC = 'Customer Buy',
VALUE_FORMAT = 'AVRO',
KEY_FORMAT = 'AVRO'

);

在这里,我正在创建一个 Kafka 流,其中包含从 MYSQL 表创建的另一个主题

创建流后,我们需要在应用连接和过滤后创建另一个流,但流中的连接有条件,我们只能应用两个流的连接。 这是加入前:-

CREATE STREAM account_summary_by_category_by_month_by_accunt_no(
account_no int,
date_time timestamp,
category text,
category_description text,
amount float,
customer_id int,
FROM Customer_Profile  cp INNER JOIN Customer_Buy cb
WITHIN 7 DAYS
ON cp.id = cb.order_id;

您可以在此处应用所有 ksql 函数,如过滤数据的 SQL

【讨论】:

  • 我不认为你可以在 ksql 实体或 Kafka 主题中包含空格
  • 我知道,但这是为了理解
猜你喜欢
  • 2016-05-27
  • 1970-01-01
  • 2022-09-25
  • 2018-11-18
  • 2014-09-20
  • 2023-03-26
  • 1970-01-01
  • 2021-08-20
  • 2021-10-23
相关资源
最近更新 更多