【发布时间】:2025-12-31 12:35:11
【问题描述】:
我在 3 节点中运行独立的 ksql-server,与 3 节点的 Kafka 集群通信。从 Topic 创建了一个具有 15 个分区的 Stream,并且数据位于 Stream 中以进行一些扩充。有一段代码为 UDF 来查找 IP2Location.bin 文件,UDF 类看起来像:
import java.io.IOException;
import java.util.Map;
import com.google.gson.Gson;
import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {
private IP2Location loc = null;
private Gson gson = null;
@Udf(description = "fetches the geoloc of the ipaddress.")
public synchronized String ip2lookup(String ip) {
String json = null;
if (loc != null) {
IP2LocationResult result = null;
try {
result = loc.query(ip);
System.out.println(result);
json = gson.toJson(result);
} catch (IOException e) {
e.printStackTrace();
}
return json;
}
return ip;
}
@Override
public void configure(Map<String, ?> arg0) {
try {
String db_path = null;
String os = System.getProperty("os.name").toLowerCase();
db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";
loc = new IP2Location(db_path);
gson = new Gson();
} catch (IOException e) {
e.printStackTrace();
}
}
}
进入Topic 和Stream 的数据非常快(可能是每秒一百万条记录)。在方法上使用synchronized 时,每个ksql-server 节点的速度为每秒3000 条记录/消息。有了这个速度,你知道,要赶上这个速度需要时间。如果没有synchronized 方法,我会看到损坏的数据,因为单个对象/方法被多个线程左右使用。
问题1:udf 调用将如何被 KSQL 调用/调用?
问题2:我可以使用线程处理udf 中的请求吗?
问题3:Topic/Stream有15个分区,我是否应该启动ksql-servers的15个节点?
谢谢。
【问题讨论】:
标签: java apache-kafka user-defined-functions ksqldb