【问题标题】:KSQL: Could I use threads in KSQL UDF functions to speed up the process?KSQL:我可以在 KSQL UDF 函数中使用线程来加快进程吗?
【发布时间】:2025-12-31 12:35:11
【问题描述】:

我在 3 节点中运行独立的 ksql-server,与 3 节点的 Kafka 集群通信。从 Topic 创建了一个具有 15 个分区的 Stream,并且数据位于 Stream 中以进行一些扩充。有一段代码为 UDF 来查找 IP2Location.bin 文件,UDF 类看起来像:

import java.io.IOException;
import java.util.Map;

import com.google.gson.Gson;

import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {

    private IP2Location loc = null;
    private Gson gson = null;

    @Udf(description = "fetches the geoloc of the ipaddress.")
    public synchronized String ip2lookup(String ip) {

        String json = null;
        if (loc != null) {
            IP2LocationResult result = null;
            try {
                result = loc.query(ip);
                System.out.println(result);
                json = gson.toJson(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return json;
        }
        return ip;
    }

    @Override
    public void configure(Map<String, ?> arg0) {

        try {
            String db_path = null;
            String os = System.getProperty("os.name").toLowerCase();

            db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";

            loc = new IP2Location(db_path);
            gson = new Gson();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

进入TopicStream 的数据非常快(可能是每秒一百万条记录)。在方法上使用synchronized 时,每个ksql-server 节点的速度为每秒3000 条记录/消息。有了这个速度,你知道,要赶上这个速度需要时间。如果没有synchronized 方法,我会看到损坏的数据,因为单个对象/方法被多个线程左右使用。

问题1:udf 调用将如何被 KSQL 调用/调用?

问题2:我可以使用线程处理udf 中的请求吗?

问题3:Topic/Stream有15个分区,我是否应该启动ksql-servers的15个节点?

谢谢。

【问题讨论】:

    标签: java apache-kafka user-defined-functions ksqldb


    【解决方案1】:

    问题 1:udf 调用将如何被 KSQL 调用/调用?

    不确定你的意思。一旦您的 UDF 可用于 KSQL(请参阅 https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您就可以在您的 KSQL 语句中将 UDF 称为 IP2LOOKUP。您也可以在 KSQL 中运行 SHOW FUNCTIONS 以确认您的 UDF 可用。

    也许你是因为你的下一个问题才问的? KSQL 将一次调用您的 UDF 一条消息。

    问题 2:我可以使用线程处理 udf 中的请求吗?

    您为什么要这样做?您是否担心使用当前 UDF 代码的 KSQL 将无法处理传入的数据量?说到这里,您尝试处理的预期数据量是多少,因为您可能正在尝试过早优化?

    另外,在不了解更多细节的情况下,我认为您的 UDF 的多线程设置不会产生任何优势,因为 UDF 在调用时仍然一次只能处理一条消息(每个 KSQL 服务器或,更准确地说,每个流任务,其中每个 KSQL 服务器可以有很多个任务;我提到这一点是为了清楚地表明 KSQL 中的 UDF 不会通过在所有服务器上只处理一条消息来阻碍您的处理; 处理当然是分布式的并且并行发生)。

    问题 3:Topic/Stream 有 15 个分区,我应该启动 15 个 ksql-servers 节点吗?

    这取决于您的数据量。您可以根据需要旋转尽可能多或尽可能少的 KSQL 服务器。如果数据量较低,单个 KSQL 服务器可能就足够了。如果数据量较大,您可以开始启动额外的 KSQL 服务器,最多 15 个服务器(因为输入主题有 15 个分区)。任何额外的 KSQL 服务器都将处于空闲状态。

    在 15 个 KSQL 服务器不够用的情况下,您应该将输入主题的分区数量从 15 个增加到更大的数量,然后您还可以启动更多的 KSQL 服务器(从而增加计算能力您的设置)。

    【讨论】:

    • 谢谢。我的坏事没有正确框定。 “KSQL 究竟如何调用 / 调用 udf 调用?”应该是,ksql 是在单个对象初始化还是在多个对象/线程中调用 udf 类/方法?我相信我的观点是正确的。
    • 传入数据量为每秒 100 万条消息。
    • 数据量:使用您当前为输入数据设置的 15 个分区进行性能基准测试,并为基准测试运行多达 15 个 KSQL 服务器。如果 15 台服务器不够用,请增加分区数量,随后增加 KSQL 服务器的数量。
    • 嗯,这很有帮助。谢谢。如果我在同一个节点上启动多个服务器,是否有可能以启动客户端 cli 为目标来监控指标?如果没有,我应该使用新节点吗?
    • 嗯,我不明白这个问题。另外,我建议为此创建一个新的 * 问题,因为原来的问题不同并且已经得到回答。
    最近更新 更多