【发布时间】:2018-05-30 08:49:05
【问题描述】:
我在 Scala/Spark 中有一个批处理作业,它根据一些输入动态创建 Drools 规则,然后评估规则。我还输入了RDD[T],它对应于要插入规则引擎的事实。
到目前为止,我正在逐一插入事实,然后触发有关该事实的所有规则。我正在使用rdd.aggregate 执行此操作。
seqOp 操作符是这样定义的:
/**
* @param broadcastRules the broadcasted KieBase object containing all rules
* @param aggregator used to accumulate values when rule matches
* @param item the fact to run Drools with
* @tparam T the type of the given item
* @return the updated aggregator
*/
def seqOp[T: ClassTag](broadcastRules: Broadcast[KieBase])(
aggregator: MyAggregator,
item: T) : MyAggregator = {
val session = broadcastRules.value.newStatelessKieSession
session.setGlobal("aggregator", aggregator)
session.execute(CommandFactory.newInsert(item))
aggregator
}
以下是生成规则的示例:
dialect "mvel"
global batch.model.MyAggregator aggregator
rule "1"
when condition
then do something on the aggregator
end
对于相同的 RDD,批处理需要 20 分钟来评估 3K 条规则,但需要 10 小时来评估 10K 条规则!
我想知道按事实插入事实是否是最好的方法。一次插入RDD的所有项目然后触发所有规则更好吗?这对我来说似乎不是最优的,因为所有事实都会同时在工作记忆中。
你发现上面的代码有什么问题吗?
【问题讨论】:
-
听起来你的工作可以利用分区方案中的一些变化。是否有一些规则可以通过不同数量级的运行时间进行评估,或者所有规则或多或少都相似?
-
规则或多或少相似。我在上面放了一个生成规则的例子
-
Spark UI 告诉您数据是如何分区的?你注意到一些歪斜了吗?
-
没有数据在不同的执行者之间被很好的划分
-
您是否注意到异常的 GC 时间?
标签: apache-spark drools