【发布时间】:2017-05-19 11:36:02
【问题描述】:
在嵌套分组方面需要帮助。 spark和scala非常新。感谢您的专家建议。
我正在使用 spark 对 mongo 集合进行转换。我正在使用 IntelliJ-Idea。以下是收藏详情:
{
_id:
customer:
product:
location:
date:
transType:
}
用例:对于每个“产品”和每个位置,交易类型为“已订购”的客户。
//输出类似这样的东西
{
Product: ABCD
location: North america
customer: Cust 1, type: ordered
total: 200
}
{
Product: EFGH
location: North america
customer: Cust 2, type: Ordered
total: 300
}
这是我目前所拥有的:
val conf = new SparkConf().setAppName("PVL").setMaster("local").
set("spark.mongodb.input.uri","mongodb://127.0.0.1:27017/product.transactionEvent").
set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/product.transctionResult")
val sc = new SparkContext(conf)
val rdd = sc.loadFromMongoDB()
val aggRdd = rdd.withPipeline(Seq(
Document.parse("{$match: {transType: 'ordered'}}"),
Document.parse("""{ $group: {_id: {prodId: "$prodId", customer: "$customer", location: "$location", Transtype: "$Transtype"}, total: {$sum:1}}}"""),
Document.parse("""{$group: {_Id: {prodId: "$_id.prodId"}, details: {$addToSet: {customer: "$_id.customer", location: "$_id.location", transType: "$_id.transType", total: "$total"}}}}""")))
但这由于某种原因不起作用。错误是:
服务器上的'未知组操作员'prodId''
首先,是否可以在 spark 中进行这种嵌套?如果是,我做错了什么? 非常感谢任何帮助
【问题讨论】:
-
错误消息表明您的组字段
prodId不存在于您的transactionEvent集合中。我建议了解MongoDB Aggregation。首先通过mongo shell 测试您的聚合以确保其正常工作。 -
谢谢万。我确实有一个名为 prodId 的字段。我只是没有在问题中列出它,但我在收藏中确实有它。我也在 mongo shell 中对其进行了测试,它抛出了同样的错误。我认为这与我的 $group 嵌套有关。
-
您应该使用正确的文档示例以及您在 mongo shell 上测试的聚合示例来更新您的问题。否则,如果没有适当的上下文,人们将很难为您提供帮助。
-
我想我知道问题出在哪里。在
$group语句之一中,我将_id大写(如_Id)。一旦我删除它工作正常。谢谢
标签: mongodb scala apache-spark intellij-idea aggregation-framework