【问题标题】:Camel condition on aggregate of messages消息聚合的骆驼条件
【发布时间】:2013-12-16 11:17:42
【问题描述】:

我正在寻找一种基于消息聚合有条件地处理消息的方法。我已经研究了很多方法来做到这一点,但似乎 Apache Camel 不支持它。我将解释这个场景,然后是我尝试过的解决方案。

场景: 我正在尝试有条件地清理目录。我每 x 天从目录中轮询一次并获取所有文件(file://...)。我将它路由到一个聚合中,将文件聚合成一个大小(directorySize)。然后我检查这个大小是否超过了某个阈值。

这就是问题所在。如果此条件通过,我现在想删除某些文件,但我无法再访问原始消息,因为它们已聚合在新的交换中。

解决方案:

  • 我尝试再次获取文件来处理它们。问题是,据我所知,您无法按需获取消费者。我尝试使用 pollEnrich,但这只会获取一个文件,而不是目录中的所有文件。
  • 我试图过滤/停止父路由。这里的问题是 filter()/choice...stop()/end() 只会停止带有目录大小的聚合路由,而不是带有文件消息的父路由。我无法有条件地处理这些。
  • 我尝试将聚合条件移动到另一个我会调用的路由,但这会导致与第一个解决方案相同的问题。

我考虑做的事情:

  • 重写聚合策略,不仅将大小聚合,而且将文件本身聚合到 groupedExchange 中。这样我可以在检查后再次拆分聚合。我不太喜欢这个解决方案,因为它会在代码中和运行时产生大量样板。
  • 将文件大小计算器移至处理器而不是聚合器。这将首先破坏使用骆驼的目的..我将手动获取文件并添加大小..对于每个文件..
  • 使用 ControlBus 在该目录上动态启动删除路由。再次进行大量解决方法来实现我认为应该能够以简单的方式完成的事情。
  • 我想为每条父消息设置计算大小,但我不知道如何实现?
  • 另一种我没有想到的停止父路由的方法?

我有点震惊,您不能根据这些消息的聚合来优雅地过滤消息。我在 Camel 中错过了什么可以提供优雅的解决方案吗?或者这是一个最不坏的解决方案?

简单架构

消息(文件)

Message(File) --> AggregatedMessage(directorySize) --> 删除某些文件?

消息(文件)

【问题讨论】:

    标签: apache-camel aggregation


    【解决方案1】:

    Camel 确实很棒,但有时确实很难确切地看出要使用哪种设计模式;)

    首先,您需要保留文件对象的副本,因为在达到阈值之前您不知道是否删除它们 - 基本上(至少)有两种方法可以做到这一点。

    备选方案 1

    第一种方法是在交换属性中使用List。无论您对交换body 做什么,此属性都会一直存在。如果您查看GroupedExchangeAggregationStrategy 的源代码,它就是这样做的:

            list = new ArrayList<Exchange>();
            answer.setProperty(Exchange.GROUPED_EXCHANGE, list);
            // ...
            list.add(newExchange);
    

    或者您可以在自己的交换资产上手动执行相同的操作。无论如何,像您所做的那样使用 Grouped 聚合策略是完全可以的。

    备选方案 2

    “保留”旧消息的第二种方法是将副本发送到已停止的SEDA 队列。所以你会做to("seda:xyz")。您将此队列定义为.noAutoStartup()。然后您可以向它发送消息,它们将在由骆驼管理的内部队列中排队。当您要处理消息时,只需通过 controlbus 启动它,然后再停止它。

    一般来说,除非绝对必要,否则应该避免搞乱启动和停止队列,但这肯定是另一种方法

    建议的解决方案

    我建议你照常做(即备选方案 1):

    • aggregate 通过 GroupedExchangeAggregationStrategy 将单个文件保存在列表中
    • 计算总文件大小(使用处理器,或在此过程中使用自定义聚合策略)
    • 使用filter(simple("${body} &lt; 123"))
    • 通过splitter(simple("${property.CamelGroupedExchange}"))“展开”您的聚合
    • 一一删除文件

    如果这没有意义,或者我以任何方式误解了您的问题,请告诉我。

    【讨论】:

    • 你怎么看这个:from("file://location").aggregate(...).to("direct:test"); from("file//location?cron=...").pollEnrich("direct:test").filter(..);这是别人向我建议的。我觉得这会导致更多的开销,但提供了更大的灵活性来删除例如文件,只要大小超过限制,因为你每次都要重新计算。
    • 它还需要一些调整,例如删除读取锁定,因为我同时从两个消费者点读取目录。我不太熟悉在没有骆驼读锁的情况下删除文件的影响。
    • 我没有使用 pollenricher - 它看起来非常类似于民意调查消费者,它对你有用吗?文件锁 - 在删除文件之前,您需要找到一种方法来释放任何读锁,不幸的是,我在骆驼中没有这样做的经验。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-30
    相关资源
    最近更新 更多