【问题标题】:Custom Aggregator Function before Spark 1.5Spark 1.5 之前的自定义聚合器函数
【发布时间】:2016-06-03 17:27:13
【问题描述】:

我是 Spark 的新手,我想知道如何使用 Cascading 框架做一些非常简单的事情。

假设我有以下数据集:

<date> <cpt_id> <mesure_type> <value>
20160603093021556 cpt1 idx1 11
20160603093021556 cpt1 idx2 22
20160603093021556 cpt1 idx3 33
20160603093021556 cpt1 idx4 44
20160603113021556 cpt2 idx1 09
20160603113021556 cpt2 idx2 45
20160603113021556 cpt2 idx3 66
20160603193021556 cpt1 idx1 13
20160603193021556 cpt1 idx2 25
20160603193021556 cpt1 idx3 33
20160603193021556 cpt1 idx4 44

我想将其汇总以获得以下结果(一种非规范化):

<date> <cpt_id> <idx1> <idx2> <idx3> <idx4>
20160603093021556 cpt1 11 22 33 44
20160603113021556 cpt2 09 45 66 null
20160603193021556 cpt1 13 25 33 44

使用 Cascading,我将使用 GroupBy(以日期和 cpt-id 作为分组键)和 Every 缓冲区来生成非规范化元组。

对于 Spark,似乎需要一个用户定义的聚合器函数,但它仅在 Spark 1.5 之后可用(我的 Yarn 集群上提供了 1.3.1)。

我看不到如何使用 1.3.1 API 进行这样的处理。

感谢您的帮助和建议

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-dataframe cascading


    【解决方案1】:

    我通过以下过程成功地做到了这一点:

    1- 按复合键(日期,cpt_id)对行进行分组 结果,我得到了一个 JavaPairRDD> 数据集

    2- 对该数据集应用地图转换,在作为参数传递给地图的函数中进行“非规范化”

    这是我的代码:

    @Test
    public void testCustomAggregator2() {
    
        DataFrame df = sqlContext.load("src/test/resources/index.json", "json").select("date_mesure", "compteur_id", "type_mesure", "value");
    
        JavaRDD<Row> rows = df.javaRDD();
    
        JavaPairRDD<IndexKey, Iterable<Row>> groupedIndex = rows.groupBy(new Function<Row, IndexKey>() {
            @Override
            public IndexKey call(Row row) throws Exception {
                return new IndexKey(row.getString(0), row.getString(1));
            }
        });
    
        JavaRDD<Row> computedRows = groupedIndex.map(new Function<Tuple2<IndexKey, Iterable<Row>>, Row>() {
    
            @Override
            public Row call(Tuple2<IndexKey, Iterable<Row>> indexKeyIterableTuple2) throws Exception {
    
                Row result = null;
    
                IndexKey key = indexKeyIterableTuple2._1;
    
                Iterable<Row> rowsForKey = indexKeyIterableTuple2._2;
    
                String idx1 = null;
    
                String idx2 = null;
    
                String idx3 = null;
    
                for (Row rowForKey : rowsForKey) {
    
                    String typeMesure = rowForKey.getString(2);
    
                    String value = rowForKey.getString(3);
    
                    switch(typeMesure) {
    
                        case "idx1" :
                            idx1 = value;
                            break;
    
                        case "idx2" :
                            idx2 = value;
                            break;
    
                        case "idx3" :
                            idx3 = value;
                            break;
    
                        default :
                            break;
                    }
                }
    
                result = RowFactory.create(key.getDateMesure(),
                                            key.getCompteurId(),
                                            idx1,
                                            idx2,
                                            idx3);
    
                return result;
            }
        });
    
        List<Row> resultRows = computedRows.collect();
    
        boolean found = false;
    
        for (Row resultRow : resultRows) {
    
            String dateMesure = resultRow.getString(0);
    
            String compteurId = resultRow.getString(1);
    
            if ("20160603093021556".equals(dateMesure)
                    && "cpt1".equals(compteurId)) {
    
                found = true;
    
                String idx1 = resultRow.getString(2);
                String idx2 = resultRow.getString(3);
                String idx3 = resultRow.getString(4);
    
                Assert.assertEquals("11", idx1);
                Assert.assertEquals("22", idx2);
                Assert.assertEquals("33", idx3);
            }
        }
    
        if (!found) {
    
            Assert.fail("Ligne d'index non trouvée");
        }
    }
    

    希望这会有所帮助,如果 soemone 发现代码中有问题,请告诉我。

    正如我所说,我对 Spark 还很陌生,并期待提高自己。

    【讨论】:

      猜你喜欢
      • 2016-10-09
      • 1970-01-01
      • 2019-11-05
      • 1970-01-01
      • 2016-10-10
      • 2021-08-20
      • 2016-01-30
      • 2012-01-19
      • 2016-10-12
      相关资源
      最近更新 更多