【问题标题】:How to use map-function in SPARK with Java如何在 Java 中使用 SPARK 中的映射函数
【发布时间】:2015-01-05 05:31:42
【问题描述】:

我尝试在 spark 中读取 csv 文件,并且我想拆分以逗号分隔的行,以便我有一个带有二维数组的 RDD。我对 Spark 很陌生。

我尝试过这样做:

public class SimpleApp 
{   
    public static void main(String[] args) throws Exception 
    {       
        String master = "local[2]";
        String csvInput = "/home/userName/Downloads/countrylist.csv";
        String csvOutput = "/home/userName/Downloads/countrylist";

        JavaSparkContext sc = new JavaSparkContext(master, "loadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));

        JavaRDD<String> csvData = sc.textFile(csvInput, 1);
        JavaRDD<String> words = csvData.map(new Function <List<String>>() { //line 43
              @Override
              public List<String> call(String s) {
                return Arrays.asList(s.split("\\s*,\\s*"));
              }
            });

        words.saveAsTextFile(csvOutput);
    }
}

这应该拆分行并返回一个 ArrayList。但我不确定这一点。 我收到此错误:

SimpleApp.java:[43,58] wrong number of type arguments; required 2

【问题讨论】:

标签: java csv apache-spark


【解决方案1】:

所以这个程序有两个小问题。首先是你可能想要 flatMap 而不是 map,因为你试图返回单词的 RDD 而不是单词列表的 RDD,我们可以使用 flatMap 来展平结果。另一个是,我们的函数类还需要调用它的输入类型。我会将 JavaRDD 单词...替换为:

JavaRDD<String> words = rdd.flatMap(
  new FlatMapFunction<String, String>() { public Iterable<String> call(String s) {
      return Arrays.asList(s.split("\\s*,\\s*"));
    }});

【讨论】:

  • 谢谢,但这是我的错!我想创建一个单词列表的 RDD。我现在用@Gábor Bakos 评论修复了它。我也将 JavaRDD 更改为 JavaRDD>
  • 哦,拉德。在那种情况下,你就这样做
  • 我们如何返回一个向量的rdd?
  • @Holden 你能告诉我reduce函数有什么问题吗stackoverflow.com/questions/63843599/…
【解决方案2】:

这是来自https://opencredo.com/data-analytics-using-cassandra-and-spark/ Java 教程的代码示例。

Scala 代码:

  /* 1*/    val includedStatuses = Set("COMPLETED", "REPAID")
/* 2*/    val now = new Date();
/* 3*/    sc.cassandraTable("cc", "cc_transactions")
/* 4*/      .select("customerid", "amount", "card", "status", "id")
/* 5*/      .where("id < minTimeuuid(?)", now)
/* 6*/      .filter(includedStatuses contains _.getString("status"))
/* 7*/      .keyBy(row => (row.getString("customerid"), row.getString("card")))
/* 8*/      .map { case (key, value) => (key, value.getInt("amount")) }
/* 9*/      .reduceByKey(_ + _)
/*10*/      .map { case ((customerid, card), balance) => (customerid, card, balance, now) }
/*11*/      .saveToCassandra("cc", "cc_balance", SomeColumns("customerid", "card", "balance", "updated_at"))

Java 代码:

SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ProjectPropertie.context);
        JavaRDD<Balance> balances = functions.cassandraTable(ProjectPropertie.KEY_SPACE, Transaction.TABLE_NAME)
                .select("customerid", "amount", "card", "status", "id")
                .where("id < minTimeuuid(?)", date)
                .filter( row -> row.getString("status").equals("COMPLETED") )
                .keyBy(row -> new Tuple2<>(row.getString("customerid"), row.getString("card")))
                .mapToPair( row -> new Tuple2<>(row._1,row._2.getInt("amount")))
                .reduceByKey( (i1,i2) -> i1.intValue()+i2.intValue())
                .flatMap(new FlatMapFunction<Tuple2<Tuple2<String, String>, Integer>, Balance>() {

                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<Balance> call(Tuple2<Tuple2<String, String>, Integer> r) throws Exception {
                        List<Balance> list = new ArrayList<Balance>();
                        list.add(new Balance(r._1._1, r._1._2, r._2,reportDate));
                        return list.iterator();
                    }
                }).cache();

ProjectPropertie.contextSparkContext 以下是获取 SparkContext 的方法(每个 JVM 只应使用一个上下文):

   SparkConf conf = new SparkConf(true).setAppName("App_name").setMaster("local[2]").set("spark.executor.memory", "1g")
                            .set("spark.cassandra.connection.host", "127.0.0.1,172.17.0.2")
                            .set("spark.cassandra.connection.port", "9042")
                            .set("spark.cassandra.auth.username", "cassandra")
                            .set("spark.cassandra.auth.password", "cassandra");
   SparkContext context = new SparkContext(conf);

对于数据源,我使用的是 Cassandra,其中 172.17.0.2 是运行我的 Cassandra 节点的 docker 容器,而 127.0.0.1 是主机(在这种情况下是本地的)

【讨论】:

【解决方案3】:

这是你应该做的……

        //======Using flatMap(RDD of words)==============
       JavaRDD<String> csvData = spark.textFile(GlobalConstants.STR_INPUT_FILE_PATH, 1);
        JavaRDD<String> counts = csvData.flatMap(new FlatMapFunction<String, String>() {
          //line 43
              @Override
              public Iterable<String> call(String s) {
                return Arrays.asList(s.split("\\s*,\\s*"));
              }
            });     

    //======Using map(RDD of Lists of words)==============
        JavaRDD<String> csvData = spark.textFile(GlobalConstants.STR_INPUT_FILE_PATH, 1);
        JavaRDD<List<String>> counts = csvData.map(new Function <String, List<String>>() { //line 43
              @Override
              public List<String> call(String s) {
                return Arrays.asList(s.split("\\s*,\\s*"));
              }
            }); 

    //=====================================

    counts.saveAsTextFile(GlobalConstants.STR_OUTPUT_FILE_PATH);

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2020-06-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-11-18
  • 2019-09-24
  • 2015-06-29
  • 1970-01-01
相关资源
最近更新 更多