I managed to do this with the following process:

1- Group the rows by the composite key (date, cpt_id). As a result I get a JavaPairRDD&lt;IndexKey, Iterable&lt;Row&gt;&gt; dataset.

2- Apply a map transformation to that dataset, and do the "denormalization" inside the function passed to map.

Here is my code:
@Test
public void testCustomAggregator2() {
    DataFrame df = sqlContext.load("src/test/resources/index.json", "json")
            .select("date_mesure", "compteur_id", "type_mesure", "value");
    JavaRDD<Row> rows = df.javaRDD();
    // Step 1: group the rows by the composite key (date_mesure, compteur_id)
    JavaPairRDD<IndexKey, Iterable<Row>> groupedIndex = rows.groupBy(new Function<Row, IndexKey>() {
        @Override
        public IndexKey call(Row row) throws Exception {
            return new IndexKey(row.getString(0), row.getString(1));
        }
    });
    // Step 2: denormalize each group into a single row, one column per type_mesure
    JavaRDD<Row> computedRows = groupedIndex.map(new Function<Tuple2<IndexKey, Iterable<Row>>, Row>() {
        @Override
        public Row call(Tuple2<IndexKey, Iterable<Row>> indexKeyIterableTuple2) throws Exception {
            IndexKey key = indexKeyIterableTuple2._1;
            Iterable<Row> rowsForKey = indexKeyIterableTuple2._2;
            String idx1 = null;
            String idx2 = null;
            String idx3 = null;
            for (Row rowForKey : rowsForKey) {
                String typeMesure = rowForKey.getString(2);
                String value = rowForKey.getString(3);
                switch (typeMesure) {
                    case "idx1":
                        idx1 = value;
                        break;
                    case "idx2":
                        idx2 = value;
                        break;
                    case "idx3":
                        idx3 = value;
                        break;
                    default:
                        break;
                }
            }
            return RowFactory.create(key.getDateMesure(),
                    key.getCompteurId(),
                    idx1,
                    idx2,
                    idx3);
        }
    });
    List<Row> resultRows = computedRows.collect();
    boolean found = false;
    for (Row resultRow : resultRows) {
        String dateMesure = resultRow.getString(0);
        String compteurId = resultRow.getString(1);
        if ("20160603093021556".equals(dateMesure)
                && "cpt1".equals(compteurId)) {
            found = true;
            String idx1 = resultRow.getString(2);
            String idx2 = resultRow.getString(3);
            String idx3 = resultRow.getString(4);
            Assert.assertEquals("11", idx1);
            Assert.assertEquals("22", idx2);
            Assert.assertEquals("33", idx3);
        }
    }
    if (!found) {
        Assert.fail("Ligne d'index non trouvée");
    }
}
Hope this helps, and if someone spots a problem in the code, please let me know.
As I said, I'm still fairly new to Spark and looking to improve.