【问题标题】:Is there a way to access a table created via KSQL (kafka) through spring-boot?有没有办法通过spring-boot访问通过KSQL(kafka)创建的表?
【发布时间】:2020-09-17 18:34:48
【问题描述】:

我是卡夫卡世界的新手,我真的被困在这里。所以,任何帮助将不胜感激。

我使用以下 KSQL 语句从 kafka 流中创建了一个表:

CREATE TABLE calc AS 
SELECT id, datetime, count(*) 
FROM streamA 
GROUP BY id, datetime 
HAVING count(*) = total;

其中 "streamA" 是由 "topicA"

创建的流

我目前正在使用:

  • Java 8,
  • Spring Boot v2.2.9

我的 pom.xml 看起来像:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Packaging -->
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>

    <properties>
        <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    </properties>
    <!-- Versioning -->
    <groupId>some.name</groupId>
    <artifactId>kafka.project</artifactId>
    <version>2020.2.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.9.RELEASE</version>
        <relativePath />
    </parent>

    <!-- Meta-data -->
    <name>[${project.artifactId}]</name>
    <description>Kafka Project</description>

    <!-- Dependencies -->
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <!-- Build settings -->
    <build>
        <!-- Plugins -->
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

那么,有两个问题:

  1. 有没有办法通过 Kafka Streams API 访问该表?
  2. 我可以通过我的应用程序而不是 KSQL 执行类似的操作(例如创建该表)吗?

提前谢谢你

更新 感谢您的建议 Shrey Jakhmola (What's the way of running KSQL from spring boot app),但我有一个需要定期访问的大数据集。我认为这个解决方案并不理想。

@Joshua Oliphant,是的,该表是由从主题创建的流生成的。

【问题讨论】:

标签: java spring-boot apache-kafka stream ksqldb


【解决方案1】:
  1. 有没有办法通过 Kafka Streams API 访问该表?

calc 将由名为 CALC 的更改日志主题支持。如果需要,您可以在应用程序中随意使用此主题。使用标准消费者或 Kafka Streams。

但是,如果您只想查询表的当前状态,那么您可以使用 ksqlDB 的pull queries 进行查询。这些允许您从 ksqlDB 正在构建的表中拉回行。该功能是基本的,因为它不是 ksqlDB 提供的核心 streaming SQL 的一部分,但可以满足一些用例。

如果您需要除此之外的其他东西,那么您还有其他选择:

  1. 您可以将结果泵入您选择的更传统的 sql 系统,例如postgres,并查询。 (可以使用ksql的CREATE SINK CONNECTOR将数据导出到postgres)。
  2. 您可以使用标准 Kafka 客户端在自己的应用程序中使用数据。 (尽管这只有在您的应用程序的每个实例都可以保存表中的所有数据时才有效)。
  3. 您可以在应用程序中使用 Kafka Streams 来使用表。这样做的好处是您的应用程序的多个实例可以聚集在一起,因此每个实例只使用表的一部分数据。然后,您可能希望使用Kafka Streams Interactive Queries 访问表的当前状态。将加载
  1. 我可以通过我的应用程序而不是 KSQL 执行类似的操作(例如创建该表)吗?

如果你想把 ksqlDB 排除在外,那么是的,ksqlDB 在内部使用 KAfka 流,所以你可以用 ksqlDB 做的任何事情,你也可以直接用 Kafka Streams 做。

类似的SQL:

CREATE TABLE calc AS 
   SELECT id, datetime, count(*) 
   FROM streamA 
   GROUP BY id, datetime 
   HAVING count(*) = total;

会映射到类似(粗略的代码):

StreamsBuilder builder = new StreamsBuilder();

builder
   .stream("streamA", Consumed.with(<appropriate serde>))
   .groupBy(<a mapper that returns id and datetime as new key>)
   .count()
   .filter(<filter>);
   .toStream()
   .to("CALC");

new KafkaStreams(builder.build(), props, clients).start();

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-09-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-09
    相关资源
    最近更新 更多