【问题标题】:Convert a List of Map in Java to Dataset in spark将Java中的地图列表转换为火花中的数据集
【发布时间】:2019-08-05 20:38:22
【问题描述】:

我在 java 中有一个 Map 列表,基本上代表行。

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);

Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);

dataList.add(row1);
dataList.add(row2);

我正在尝试从中创建一个 Spark DataFrame。

我尝试将其转换为 JavaRDD&lt;Map&lt;String, Object&gt;&gt; 使用

JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);

但我不知道如何从这里到Dataset&lt;Row&gt;。我见过 Scala 的例子,但在 Java 中没有。

我也尝试将列表转换为 JSON 字符串,并读取 JSON 字符串。

String jsonStr = mapper.writeValueAsString(dataList);

但似乎我必须将其写入文件然后使用

Dataset<Row> df = spark.read().json(pathToFile);

如果可能的话,我宁愿在内存中进行,而不是写入文件并从那里读取。

SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")
            .set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
    SparkSession sparkSession = 
SparkSession.builder().config(sparkConf).getOrCreate();

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);

Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);

dataList.add(row1);
dataList.add(row2);

ObjectMapper mapper = new ObjectMapper();
    
String jsonStr = mapper.writeValueAsString(dataList);
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();

【问题讨论】:

  • dataframe/dataset 是柱状结构。您希望地图行关联的列(或列)的值是多少?顺便说一句,您是否尝试过“createDataFrame(rows, Map.class)”?结果如何?

标签: java apache-spark apache-spark-dataset


【解决方案1】:

您根本不需要使用 RDD。您需要做的是从地图列表中提取所需的架构,将地图列表转换为行列表,然后使用spark.createDataFrame

在 java 中,这有点痛苦,尤其是在创建 Row 对象时,但它是这样的:

List<String> cols = new ArrayList(dataList.get(0).keySet());
List<Row> rows = dataList
    .stream()
    .map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))
    .map(row -> row.collect(Collectors.toList()))
    .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())
    .map(Row$.MODULE$::fromSeq)
    .collect(Collectors.toList());

StructType schema = new StructType(
    cols.stream()
        .map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))
        .collect(Collectors.toList())
        .toArray(new StructField[0])
);
Dataset<Row> result = spark.createDataFrame(rows, schema);

【讨论】:

    【解决方案2】:
    public class MyRow implements Serializable {
    
      private String fund;
      private String broker;
      private int qty;
    
      public MyRow(String fund, String broker, int qty) {
        super();
        this.fund = fund;
        this.broker = broker;
        this.qty = qty;
      }
    
      public String getFund() {
        return fund;
      }
    
      public void setFund(String fund) {
        this.fund = fund;
      }
    
    
      public String getBroker() {
        return broker;
      }
    
      public void setBroker(String broker) {
        this.broker = broker;
      }
    
      public int getQty() {
        return qty;
      }
    
      public void setQty(int qty) {
        this.qty = qty;
      }
    
    }
    

    现在创建一个 ArrayList。此列表中的每个项目都将作为最终数据框中的行。

    MyRow r1 = new MyRow("f1", "b1", 100);
    MyRow r2 = new MyRow("f2", "b2", 200);
    List<MyRow> dataList = new ArrayList<>();
    dataList.add(r1);
    dataList.add(r2);
    

    现在我们必须把这个 List 转换成一个 DataSet -

    Dataset<Row> ds = spark.createDataFrame(dataList, MyRow.class);
    ds.show()
    

    【讨论】:

    • 不幸的是,MyRow 的结构是动态变化的,所以我需要能够动态地做到这一点。
    • @gargravarr。我也有动态架构,你是怎么解决的?
    • @gargravarr 任何人都可以为动态模式做到这一点吗?
    【解决方案3】:

    spark文档已经指出如何加载内存中的json字符串。

    这是来自https://spark.apache.org/docs/latest/sql-data-sources-json.html的示例

    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // a Dataset<String> storing one JSON object per string.
    List<String> jsonData = Arrays.asList(
            "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
    Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
    Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
    anotherPeople.show();
    // +---------------+----+
    // |        address|name|
    // +---------------+----+
    // |[Columbus,Ohio]| Yin|
    // +---------------+----+
    

    希望对您有所帮助。

    【讨论】:

      【解决方案4】:
      import org.apache.spark.api.java.function.Function;
      private static JavaRDD<Map<String, Object>> rows;
      private static final Function f = (Function<Map<String, Object>, Row>) strObjMap -> RowFactory.create(new TreeMap<String, Object>(strObjMap).values().toArray(new Object[0]));
      public void test(){
          rows = sc.parallelize(list);
          JavaRDD<Row> rowRDD = rows.map(f);
          Map<String, Object> headMap = list.get(0);
          TreeMap<String, Object> headerMap = new TreeMap<>(headMap);
          List<StructField> fields = new ArrayList<>();
          StructField field;
          for (String key : headerMap.keySet()) {
              System.out.println("key:::"+key);
              Object value = list.get(0).get(key);
              if (value instanceof Integer) {
                  field = DataTypes.createStructField(key, DataTypes.IntegerType, true);
              }
              else if (value instanceof Double) {
                  field = DataTypes.createStructField(key, DataTypes.DoubleType, true);
              }
              else if (value instanceof Date || value instanceof java.util.Date) {
                  field = DataTypes.createStructField(key, DataTypes.DateType, true);
              }
              else {
                  field = DataTypes.createStructField(key, DataTypes.StringType, true);
              }
                  fields.add(field);
          }
          StructType struct = DataTypes.createStructType(fields);
          Dataset<Row> data = this.spark.createDataFrame(rowRDD, struct);
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-03-09
        • 1970-01-01
        • 1970-01-01
        • 2020-05-15
        • 1970-01-01
        • 1970-01-01
        • 2021-07-02
        相关资源
        最近更新 更多