【发布时间】:2021-05-31 19:43:43
【问题描述】:
我正在用 Java 开发一个自定义 Talend 组件来从 Kafka 主题中读取数据,我的目标是实时处理这些数据并将它们写入文件。
当我获得的数据不是太大时,我的 Java 代码正在运行,否则我会收到此错误:
JAVA ERROR: ArrayIndexOutOfBoundsException
我了解到我发送的数据太大,所以我有两个问题:
- 有没有办法增加我发送的数据的最大大小?
- 如何优化我的代码:在下面的代码中,我可以计算每次轮询中从 Kafka 获得的行数,我认为我可以拆分数据(例如每 10 行 10 行),但问题是是我只能使用一次“return”,这就是我在循环之后发送所有内容的原因。
这是我的 Java 代码:
@Producer
public Record next() {
while (true){
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(10));
System.out.println("new poll");
ArrayList<String> elements = new ArrayList<>();
Integer count = 0;
for (ConsumerRecord<String, Object> record : records) {
count ++;
String current_line = record.value().toString();
JSONObject jsonObject = new JSONObject(current_line);
test = jsonObject.get("type").toString();
elements.add(current_line);
System.out.println("in= " + test);
}
System.out.println("count = " + count.toString());
if (elements.size()>0) {
return builderFactory.newRecordBuilder().withString("name", elements.toString()).build();
}
consumer.commitAsync();
}
}
感谢您的帮助,
问候,
托马斯
【问题讨论】:
-
您好 Thomas,您是否尝试增加 JVM 参数并查看问题是否仍然存在
-
谢谢@AmineBenKhelifa,你是对的,增加RAM内存解决了这个问题,谢谢:)
-
您可以将主题标记为已解决,如果这是解决方案?
标签: java apache-kafka talend