【问题标题】:How to use split on a stream in Apache Camel?如何在 Apache Camel 中的流上使用拆分?
【发布时间】:2019-09-06 04:40:18
【问题描述】:

当从数据库中选择大量数据时,我想使用骆驼的split-组件。它应该将流拆分为 100 个对象并以迭代器格式获取它。

我成功地从 db 接收到多个数据,但是在拆分过程中失败。 我正在从<to> 获取数据,但我不知道在哪里写这个<to> 语法。

<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="DbToKafka" streamCache="true">
        <from uri="timer:foo?repeatCount=1" />
        <split streaming="true">
            <tokenize token="{" />
            <to uri="sql:classpath:sql/get.sql" />
        </split>
        <log message="DATA : ${body}" />
        <marshal>
            <json library="Jackson" />
        </marshal>
        <log message="JSON : ${body}" />
        <to uri="kafka:test-topic?brokers={{kafka.brokers}}" />
        <log message="data sended : ${body}" />
    </route>
</routes>

2019-09-06 13:29:29.380 错误 10260 --- [3 - timer://foo] o.a.camel.processor.DefaultErrorHandler :交付失败 (MessageId:ExchangeId 上的 ID-DESKTOP-225HFIE-1567744166599-0-2: ID-桌面-225HFIE-1567744166599-0-1)。交货后用尽 尝试:1 捕获:java.lang.NullPointerException:源

消息历史 -------------------------------------------------- -------------------------------------------------- ---------------------------------- RouteId ProcessorId 处理器
经过(毫秒)[DbToKafka] [DbToKafka] [timer://foo?repeatCount=1
] [ 9] [DbToKafka ] [split1 ] [使用令牌分割[tokenize{body(): ,}]
] [ 7]

在没有 split 语句的情况下运行,得到以下结果: [{"ROW_ID":"520","COM_CD_NM":"사용중"},{"ROW_ID":"521","COM_CD_NM":"메모지 수수 "},{"ROW_ID":"522","COM_CD_NM":"상호대화
"},{"ROW_ID":"523","COM_CD_NM":"물품수수
"},{"ROW_ID":"524","COM_CD_NM":"기타
"},{"ROW_ID":"525","COM_CD_NM":"자격증변조"},...]

【问题讨论】:

    标签: java apache-kafka apache-camel


    【解决方案1】:

    欢迎来到 StackOverflow!

    错误是因为您试图拆分任何内容。进入拆分前需要先取回数据,然后对拆分数据进行的每一个操作都需要在拆分元素内。

    我认为它无论如何都不起作用,因为您的编组将不起作用,因为标记化将删除令牌 {,因此不是真正的 JSON。

    查看Apache Camel with Json Array split 会给你一个例子来说明如何做到这一点。我怀疑你需要这样的东西 - 编组可能不是必需的,因为无论如何你都是通过 JSONpath 拆分的。

    <route id="DbToKafka" streamCache="true">
        <from uri="timer:foo?repeatCount=1" />
        <to uri="sql:classpath:sql/get.sql" />
        <split streaming="true">
            <jsonpath>$</jsonpath>
            <log message="DATA : ${body}" />
            <marshal>
                <json library="Jackson" />
            </marshal>
            <log message="JSON : ${body}" />
            <to uri="kafka:test-topic?brokers={{kafka.brokers}}" />
            <log message="data sended : ${body}" />
        </split>
    </route>
    

    【讨论】:

      猜你喜欢
      • 2022-08-19
      • 2017-04-14
      • 2023-03-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-11-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多