【问题标题】:Using kafka streams to extract information from logs contained in a kafka topic使用 kafka 流从 kafka 主题中包含的日志中提取信息
【发布时间】:2017-07-05 09:55:22
【问题描述】:

这是我第一次尝试使用 kafka 流。 我已经用我的日志正确地创建了一个主题,如下所示:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --from-beginning
{"event":"bank.user.patch","ts":"2017-05-11T15:02:53.647+02:00","svc":"dpbank.wb.tlc-1","request":{"ts":"2017-05-11T15:02:53.905+02:00","aw":"c0dc73ae-e903-43c2-bba8-2138d7772945","end_point":"/bank/v1/user/Nqp0a++O4wEKgBCMs35GTw==","method":"PATCH","app_instance":"85164F62-91FA-4FF4-BE8E-1C0BA8F291A9-1488367268","user_agent":"Dy/1.1/iOS/10.1.1/iPhone MDL","user_id":423,"user_ip":"xxxxx","username":"booWlWuCPPltvQgMNiKwrQ==","app_id":"db2ffe87712530981e9871","app_name":"DApp"},"operation":{"scope":"mdpapp","result":{"http_status":200}},"resource":{"object_id":"Ns35GTw==","request_attributes":{"user":{"msisdns":"rTA4G5+h9YA==","selfie":false,"taxcode":"Qz96apGFHlQoV/VtGrJDzZpt6cN4mTxSZs1pEI="}},"response_attributes":{"user":{"sharing_id":"NqpCMs35GTw==","msisdns":"Zsd/v08t6hU9AQV8zXna9Ypv/JITMZj3ulGw=","msisdn_caller":"booWlWuCPPltrQ==","selfie":false,"taxcode":"apGFHlQoV/VtGrJDzZpt6cN4mTxWd+K5SZs1pEI","status":"INCOMPLETE","document_info":{"document_image":false}}}},"class":"DPAPI"}
{"event":"bank.user.patch","ts":"2017-04-07T17:42:31.035+02:00","svc":"dpbank.wb.tlc-1","request":{"ts":"2017-04-07T17:42:31.353+02:00","aw":"99c57-8598-b226af153ab9","end_point":"/ba19XFUV+FA==","method":"PATCH","app_instance":"3558887f-7480-4176-b96c-d989ef1a7aa5-1489492341","user_agent":"Drodroid/5.0.1/Samsung-SM-N910C","user_id":398,"user_ip":"151.14.81.82","username":"dNGqxhJ+4kmmF1h3hgu=","app_id":"db2ffeac6c07712530981e9871","app_name":"DropPayApp"},"operation":{"scope":"mdpapp","result":{"http_status":200}},"resource":{"object_id":"KJl+60+x67JFUV+FA==","request_attributes":{"user":{"sharing_id":"KJl+T619XFUV+FA==","msisdns":[],"firstname":"gR47acZfexoW+HYA==","lastname":"h3gRVpNzavhNu4wQ==","gender":"M","selfie":false,"taxcode":"2INKXPiBeg5acM4nn04S+JrlgJ9rmYHNghUw=","status":"INCOMPLETE","birthinfo":{"city":"Zurigo","date":"1975-06-16","country_id":241},"residence":{"city":"CAIA","address":"Va ello 44","zipcode":"926","country_id":1,"city_id":123}}},"response_attributes":{"user":{"sharing_id":"KJl+60+x67JT619XFUV+FA==","msisdns":[],"firstname":"gR47acZfHdgSGcexoW+HYA==","lastname":"h3MyQR3YgRVpNzavhNu4wQ==","gender":"M","selfie":false,"taxcode":"2INKXPiBeg5acM4nn04S+JrllI6mH2YgJ9rmYHNghUw=","status":"INCOMPLETE","birthinfo":{"city":"Zurigo","date":"1975-06-16","country_id":1},"residence":{"city":"TANIA","address":"Vlo 44","zipcode":"926","country_id":18,"city_id":103},"document_info":{"document_image":false}}}},"class":"DPAPI"}

现在我正在尝试使用这个主题来做一些逻辑。

例如,我想在我的主题中获取每个日志的一些字段,并将它们放在 Ktable 中,然后将它们发送到其他地方。

我尝试过这样做,但不幸的是没有结果

到目前为止,这是我所做的:

public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstate");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> source = builder.stream("example");

然后我尝试了 Ktable,它应该是我的“表”,其中包含一些从日志中获取的字段。

KTable<String, Long> counts = source.print();

        // need to override value serde to Long type
        counts.to("example-output");

只是看看逻辑是否正确,看看我是否正确获取日志并将它们放在另一个“输出”主题中。但是这样做我得到了一个Exception in thread "main" java.lang.NullPointerException...

我在正确的道路上吗?我应该怎么做才能正确读取kafka主题并提取一些字段并将它们放入Ktable中?

谢谢

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    首先,如果您确实显示了完整的异常,这将有所帮助——您为什么以及从哪里获得NullPointerException

    我在正确的道路上吗?我应该怎么做才能正确读取kafka主题并提取一些字段并将它们放入Ktable中?

    您的一般方法没问题,但看起来您的代码有一些错误。例如:

    KTable<String, Long> counts = source.print();
    

    print() 返回void,因此上面的行甚至无法编译。也许您应该仔细检查您的答案是否实际上包含所有相关信息?

    我建议您查看https://github.com/confluentinc/examples 上提供的许多示例和演示应用程序。他们应该为您提供一些模板作为起点。

    【讨论】:

      猜你喜欢
      • 2018-09-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多