【发布时间】:2019-08-05 20:38:22
【问题描述】:
我在 java 中有一个 Map 列表,基本上代表行。
List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);
我正在尝试从中创建一个 Spark DataFrame。
我尝试将其转换为 JavaRDD<Map<String, Object>> 使用
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
但我不知道如何从这里到Dataset<Row>。我见过 Scala 的例子,但在 Java 中没有。
我也尝试将列表转换为 JSON 字符串,并读取 JSON 字符串。
String jsonStr = mapper.writeValueAsString(dataList);
但似乎我必须将其写入文件然后使用
Dataset<Row> df = spark.read().json(pathToFile);
如果可能的话,我宁愿在内存中进行,而不是写入文件并从那里读取。
SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")
.set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).getOrCreate();
List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);
ObjectMapper mapper = new ObjectMapper();
String jsonStr = mapper.writeValueAsString(dataList);
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();
【问题讨论】:
-
dataframe/dataset 是柱状结构。您希望地图行关联的列(或列)的值是多少?顺便说一句,您是否尝试过“createDataFrame(rows, Map.class)”?结果如何?
标签: java apache-spark apache-spark-dataset