【发布时间】:2018-07-25 17:23:42
【问题描述】:
我目前正在整合 WSO2 的 Siddhi CEP 和 Kafka。我想通过接收来自 Kafka 的事件来生成 Siddhi 流。收到的 Kafka 数据采用 JSON 格式,其中每个事件如下所示:
{
"event":{
"orderID":"1532538588320",
"timestamps":[
15325,
153
],
"earliestTime":1532538
}
}
我尝试在 WSO2 流处理器中运行的 SiddhiApp 如下所示:
@App:name('KafkaSiddhi')
@App:description('Consume events from a Kafka Topic and print the output.')
-- Streams
@source(type='kafka',
topic.list = 'order-aggregates',
partition.no.list = '0',
threading.option = 'single.thread',
group.id = 'time-aggregates',
bootstrap.servers = 'localhost:9092, localhost:2181',
@map(type='json'))
define stream TimeAggregateStream (orderID string,timestamps
object,earliestTime long);
@sink(type="log")
define stream TimeAggregateResultStream (orderID string, timestamps
object, earliestTime long);
-- Queries
from TimeAggregateStream
select orderID, timestamps, earliestTime
insert into TimeAggregateResultStream;
运行此应用程序应记录我正在收听的订单聚合 Kafka 集群中正在更新的所有数据。但是单击运行时我看不到任何输出。
我可以看出 WSO2 流处理器和 order-aggregates 主题之间存在某种类型的交互,因为每当我使用流模式的不一致数据类型运行应用程序时,都会实时输出错误消息。错误消息如下所示:
[2018-07-25_10-14-37_224] ERROR
{org.wso2.extension.siddhi.map.json.sourcemapper.JsonSourceMapper} -
Json message {"event":{"orderID":"210000000016183","timestamps":
[1532538627000],"earliestTime":1532538627000}} contains incompatible
attribute types and values. Value 210000000016183 is not compatible with
type LONG. Hence dropping the message. (Encoded)
但是,当我正确设置架构时,我在运行应用程序时根本不会收到任何输出。我真的不知道如何理解这一点。当我尝试通过在包含“插入”的行中放置断点来调试它时,调试器永远不会在该行停止。
谁能提供一些关于如何解决这个问题的见解?
【问题讨论】:
标签: apache-kafka wso2 siddhi stream-processing