【发布时间】:2020-04-08 12:15:30
【问题描述】:
在我的处理器 API 中,我将消息存储在键值存储中,每 100 条消息我发出一个 POST 请求。如果在尝试发送消息时出现问题(api 没有响应等),我想停止处理消息。在有证据证明 API 调用有效之前。
这是我的代码:
public class BulkProcessor implements Processor<byte[], UserEvent> {
private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;
private BulkAPIClient bulkClient;
private String storeName;
private ProcessorContext context;
private int count;
@Autowired
public BulkProcessor(String storeName, BulkClient bulkClient) {
this.storeName = storeName;
this.bulkClient = bulkClient;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
count = 0;
// to check every 15 minutes if there are any remainders in the store that are not sent yet
this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if (count > 0) {
sendEntriesFromStore();
}
});
}
@Override
public void process(byte[] key, UserEvent value) {
int userGroupId = Integer.valueOf(value.getUserGroupId());
ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
if (userEventArrayList == null) {
userEventArrayList = new ArrayList<>();
}
userEventArrayList.add(value);
keyValueStore.put(userGroupId, userEventArrayList);
if (count == 100) {
sendEntriesFromStore();
}
}
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
logger.warn(e.getMessage(), e.fillInStackTrace());
}
}
}
iterator.close();
count = 0;
}
@Override
public void close() {
}
}
目前在我的代码中,如果对 API 的调用失败,它将迭代下一个 100(只要它失败,这种情况就会一直发生)并将它们添加到 keyValueStore。我不希望这种情况发生。相反,我宁愿停止流并在 keyValueStore 清空后继续。这可能吗?
我可以扔一个StreamsException吗?
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
throw new StreamsException(e);
}
这会杀死我的流应用程序并因此进程终止吗?
【问题讨论】:
-
收到新消息后计数增加了吗?
-
@TuyenLuong
count不会增加,因为在sendEntriesFromStore方法中我将其重置为0。如果我的POST请求中的某些内容不起作用,我只是不希望将 +100 事件添加到keyValueStore中
标签: apache-kafka kafka-consumer-api apache-kafka-streams