【问题标题】:Importing a text file into Cassandra using Spark when there are multiple variable types当有多种变量类型时,使用 Spark 将文本文件导入 Cassandra
【发布时间】:2014-12-16 23:25:48
【问题描述】:

我正在使用 Spark 将数据从文本文件导入 CQL 表(在 DataStax 上)。我已经用一个所有变量都是字符串的文件成功地做到了这一点。我首先使用 CQL 创建了表,然后在 Spark shell 中使用 Scala 运行:

val file = sc.textFile("file:///home/pr.txt").map(line => line.split("\\|").map(_.toString));
file.map(line => (line(0), line(1))).saveToCassandra("ks", "ks_pr", Seq("proc_c", "proc_d"));

我要导入的其余文件包含多种变量类型。我已经使用 CQL 设置了表并在那里指定了适当的类型,但是在 spark 中导入文本文件时如何转换它们?

【问题讨论】:

    标签: scala cassandra apache-spark cql datastax-enterprise


    【解决方案1】:

    例如,如果 proc_c 是 Int 并且 proc_d 是 Double 你可以这样做:

    file.map{
       line => (line(0), line(1)).
               map({ case (l, r) => (l.toInt, r.toDouble) }).
               saveToCassandra("ks", "ks_pr", Seq("proc_c", "proc_d")
    }
    

    【讨论】:

    • 我相信你的括号有语法错误。您是否打算使用嵌套地图?
    • 修复括号后,我能够使用此答案解决我的问题。我使用成功的代码是:file.map(line => (line(0), line(1), line(2), line(3), line(4), line(5), line(6))) .map({ case (a, b, c, d, e, f, g) => (a.toInt, b.toString, c.toString, d.toString, e.toString, f.toString, g.toString) }) .saveToCassandra("ks", "ks_pr", Seq("proc_a","proc_b","proc_c","proc_d","proc_e","proc_f","proc_g"));
    【解决方案2】:

    使用它从 txt 文件中获取记录并存储到 cassandra db:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;
    
    public class App {
    
      public static void main(String[] args) throws NumberFormatException, IOException {
        String serverIp = " ? ";
        String keyspace = "? ";
        String username=" ?";
        String password=" ? ";
    
        Cluster cluster = Cluster.builder()
                                .addContactPoints(serverIp)
                                .withCredentials(username.trim(), password.trim())
                                .build();
        Session session = cluster.connect(keyspace);
        File file = new File("E:\\new workspace\\Casandracheck3\\text1.txt");
    
        BufferedReader br = new BufferedReader(new FileReader(file)); 
    
        String st; 
        String mc_name=null;
        String mobileno=null;
        String customer_id=null;
        String date_time=null;
        Integer cust_id=0;
        while ((st = br.readLine()) != null) {
    
          StringTokenizer tokenizer = new StringTokenizer(st, ","); 
    
          mc_name = tokenizer.nextToken();
          mobileno = tokenizer.nextToken();
          customer_id=tokenizer.nextToken();
          date_time=tokenizer.nextToken();
          cust_id=Integer.parseInt(customer_id);
    
          System.out.println("USERNAME=" + mc_name + "&MOBILENO=" + mobileno + "&CUSTOMER_ID=" + cust_id + "&DATE_TIME=" + date_time);
          System.out.println("checking before queryy..............................");
    
          String cqlStatement = "insert  into table_name(id,mc_name,mc_mobileno,customer_id,mc_imported_date)"
                + "values(now(),'" + mc_name + "','" + mobileno + "'," + customer_id+ ",'"+date_time+"')";
    
          for (Row row : session.execute(cqlStatement)) {
            System.out.println(row.toString());
          }
        }
      }
    }
    

    【讨论】:

    • 这个用于从 txt 文件中获取记录并存储到 cassandra db 中
    • 您好,欢迎来到 SO。您应该编辑您的答案并将此评论放入其中以使其更加可见。
    • 不好意思说,不过这段代码效率真的很低。。。最好用cassandra loader之类的东西
    猜你喜欢
    • 2011-07-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-04-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多