【问题标题】:How to load Spark Cassandra Connector in the shell?如何在 shell 中加载 Spark Cassandra 连接器?
【发布时间】:2014-11-08 08:28:51
【问题描述】:

我正在尝试在 Spark 1.1.0 中使用 Spark Cassandra Connector

我已经成功地从 GitHub 上的 master 分支构建了 jar 文件,并且已经让包含的演示工作。但是,当我尝试将 jar 文件加载到 spark-shell 时,我无法从 com.datastax.spark.connector 包中导入任何类。

我尝试在spark-shell 上使用--jars 选项,并将带有jar 文件的目录添加到Java 的CLASSPATH。这些选项都不起作用。事实上,当我使用 --jars 选项时,日志输出显示 Datastax jar 正在加载,但我仍然无法从 com.datastax 导入任何内容。

我已经能够使用--jars 将Tuplejump Calliope Cassandra 连接器加载到spark-shell,所以我知道这是有效的。只是 Datastax 连接器对我来说失败了。

【问题讨论】:

  • 这里也一样。我用 sbt 构建了 spark-cassandra-connector。我使用命令$ ./spark-shell --jars ~/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.2.0-SNAPSHOT.jar 并在日志INFO spark.SparkContext: Added JAR file:/root/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.2.0-SNAPSHOT.jar at http://xx.xx.xx.xx:60296/jars/spark-cassandra-connector_2.10-1.2.0-SNAPSHOT.jar with timestamp 1414618174823 中看到这个,但仍然不能import com.datastax.spark.connector._。我正在使用 Spark 1.1.0。

标签: cassandra apache-spark datastax-enterprise


【解决方案1】:

我明白了。以下是我所做的:

$ git clone https://github.com/datastax/spark-cassandra-connector.git
$ cd spark-cassandra-connector
$ sbt/sbt assembly
$ $SPARK_HOME/bin/spark-shell --jars ~/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/connector-assembly-1.2.0-SNAPSHOT.jar 

在 scala 提示符下,

scala> sc.stop
scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._
scala> import org.apache.spark.SparkConf
scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "my cassandra host")
scala> val sc = new SparkContext("spark://spark host:7077", "test", conf)

【讨论】:

  • 最后这个组合对我有用:spark-1.2.1 和 spark-cassandra-connector-1.2.0-rc2
  • 15/10/23 17:26:18 WARN AppClient$ClientEndpoint: 无法连接到主 localhost:7077 a
  • 请参阅下面的@chris-batey 答案 - 这是在 shell 中使用 sc.broadcast 的唯一方法。如果您停止SparkContext 并创建一个新的,如果您尝试使用sc.broadcast,您将获得NotSerialisableException
【解决方案2】:

编辑:现在事情变得容易了一些

有关详细说明,请查看项目网站 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/13_spark_shell.md

或者随意使用 Spark-Packages 来加载库(并非所有版本都已发布) http://spark-packages.org/package/datastax/spark-cassandra-connector

> $SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M3-s_2.10

以下假设您使用 OSS Apache C* 运行

您需要使用-driver-class-path 设置来启动该类以包含所有连接器库

我将引用著名 Amy Tobey 的一篇博文

我发现的最简单的方法是使用 then 设置类路径 使用导入的必要类重新启动 REPL 中的上下文 使 sc.cassandraTable() 可见。 新加载的方法不会显示在选项卡完成中。我不知道为什么。

  /opt/spark/bin/spark-shell --driver-class-path $(echo /path/to/connector/*.jar |sed 's/ /:/g')

它会打印一堆日志信息,然后出现scala>提示。

scala> sc.stop

现在上下文已停止,是时候导入连接器了。

scala> import com.datastax.spark.connector._
scala> val conf = new SparkConf()
scala> conf.set("cassandra.connection.host", "node1.pc.datastax.com")
scala> val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
scala> val table = sc.cassandraTable("keyspace", "table")
scala> table.count

如果您使用 DSE 运行

DSE 类加载器和以前的包命名约定存在一个小问题,这将阻止您找到新的 spark-connector 库。您应该能够通过在启动 spark-shell 的脚本中删除指定 DSE 类加载器的行来解决此问题。

【讨论】:

  • 我试过这个,但不幸的是,我仍然无法在 com.datastax 下导入任何内容。看来我的设置肯定有问题,所以我会从头开始重试。
  • 如果您使用的是 Spark 1.1,您需要使用连接器的最新 alpha 版本才能使其正常工作!
  • 我卡在error: object datastax is not a member of package com 有人可以指点我的方向吗?我正在使用火花 1.3
  • @optimist 由于导入了错误的库而遇到此问题。我导入了 spark-assembly 而不是 spark-cassandra-assembly。
  • 本地主机方向?
【解决方案3】:

如果您想避免在 shell 中停止/启动上下文,您还可以将其添加到您的 spark 属性中:

{spark_install}/conf/spark-defaults.conf

spark.cassandra.connection.host=192.168.10.10

【讨论】:

  • bravo - 这实际上是我发现连接到cassandra 的唯一正确方法。以上所有结果都导致sc 不可序列化并且广播变量在spark-shell 中不起作用。
  • 您实际上也可以在启动 shell spark.apache.org/docs/latest/… 时使用 --conf spark.cassandra.connection.host=127.0.0.1 提供连接主机
【解决方案4】:

为了从 spark-shell 访问 Cassandra,我在 cassandra-spark-driver 中构建了一个包含所有依赖项的程序集(“uberjar”)。使用 --jars 选项将其提供给 spark-shell,如下所示:

spark-shell --jars spark-cassandra-assembly-1.0.0-SNAPSHOT-jar-with-dependencies.jar

我遇到了这里描述的同样问题,这种方法既简单又方便(而不是加载一长串依赖项)

我已经用 POM 文件创建了一个要点,您可以download。使用 pom 创建你应该做的 uberjar:

mvn package

如果您使用的是 sbt,请查看 sbt-assembly 插件。

【讨论】:

    【解决方案5】:

    以下步骤描述了如何使用 Spark 节点和 Cassandra 节点设置服务器。

    设置开源 Spark

    这假设您已经安装了 Cassandra。

    第 1 步:下载并设置 Spark

    Go to http://spark.apache.org/downloads.html.
    

    a) 为简单起见,我们将使用其中一个预构建的 Spark 包。 选择 Spark 版本 2.0.0 和 Pre-built for Hadoop 2.7 然后直接下载。这将下载包含 Spark 构建二进制文件的存档。

    b) 将其解压缩到您选择的目录中。我会把我的放在 ~/apps/spark-1.2

    c) 通过打开 Shell 测试 Spark 正在工作

    第 2 步:测试 Spark 的工作原理

    a) cd 进入 Spark 目录 运行“./bin/spark-shell”。这将打开 Spark 交互式 shell 程序

    b) 如果一切正常,它应该显示以下提示:“scala>”

    运行一个简单的计算:

    sc.parallelize(1 到 50).sum(+) 应该输出 1250。

    c) 恭喜 Spark 工作正常! 使用“exit”命令退出 Spark shell

    Spark Cassandra 连接器

    要将 Spark 连接到 Cassandra 集群,需要将 Cassandra 连接器添加到 Spark 项目中。 DataStax 在 GitHub 上提供了他们自己的 Cassandra 连接器,我们将使用它。

    1. 克隆 Spark Cassandra 连接器存储库:

      https://github.com/datastax/spark-cassandra-connector

    2. cd 进入“spark-cassandra-connector” 构建 Spark Cassandra 连接器 通过执行命令

      ./sbt/sbt Dscala-2.11=true 汇编

    这应该将编译后的 jar 文件输出到名为“target”的目录。将有两个 jar 文件,一个用于 Scala,一个用于 Java。 我们感兴趣的 jar 是:“spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar”,用于 Scala。 将 jar 文件移动到一个容易找到的目录中:我把我的放到 ~/apps/spark-1.2/jars 中

    要将连接器加载到 Spark Shell 中:

    使用以下命令启动 shell:

    ../bin/spark-shell –jars ~/apps/spark-1.2/jars/spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar

    将 Spark 上下文连接到 Cassandra 集群并停止默认上下文:

    sc.stop

    导入必要的jar文件:

    import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
    

    使用 Cassandra 连接详细信息创建一个新的 SparkConf:

    val conf = new SparkConf(true).set("spark.cassandra.connection.host", “本地主机”)

    创建一个新的 Spark 上下文:

    val sc = new SparkContext(conf)

    您现在有一个连接到 Cassandra 集群的新 SparkContext。

    【讨论】:

      【解决方案6】:

      Spark-Cassandra-Connector 用 Window-7,8,10 用 JAVA 完成代码 有用。

      import com.datastax.driver.core.Session;
      import com.datastax.spark.connector.cql.CassandraConnector;
      import com.google.common.base.Optional;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.FlatMapFunction;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.api.java.function.Function2;
      import org.apache.spark.api.java.function.PairFlatMapFunction;
      import scala.Tuple2;
      import spark_conn.Spark_connection;
      import java.io.Serializable;
      import java.math.BigDecimal;
      import java.text.MessageFormat;
      import java.util.*;
      import static com.datastax.spark.connector.CassandraJavaUtil.*;
      
      
      public class App implements Serializable
      {
          private transient SparkConf conf;
      
          private App(SparkConf conf) {
              this.conf = conf;
          }
      
          private void run() {
              JavaSparkContext sc = new JavaSparkContext(conf);
              generateData(sc);
              compute(sc);
              showResults(sc);
              sc.stop();
          }
      
          private void generateData(JavaSparkContext sc) {
          CassandraConnector connector =   CassandraConnector.apply(sc.getConf());
      
              // Prepare the schema
         try{ 
         Session session=connector.openSession();
         session.execute("DROP KEYSPACE IF EXISTS java_api");
         session.execute("CREATE KEYSPACE java_api WITH 
         replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
         session.execute("CREATE TABLE java_api.products 
         (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
         session.execute("CREATE TABLE java_api.sales 
         (id UUID PRIMARY KEY,  product INT, price DECIMAL)");
         session.execute("CREATE TABLE java_api.summaries 
         (product INT PRIMARY KEY, summary DECIMAL)");
        }catch(Exception e){System.out.println(e);}
      
              // Prepare the products hierarchy
         List<Product> products = Arrays.asList(
         new Product(0, "All products", Collections.<Integer>emptyList()),
                      new Product(1, "Product A", Arrays.asList(0)),
                      new Product(4, "Product A1", Arrays.asList(0, 1)),
                      new Product(5, "Product A2", Arrays.asList(0, 1)),
                      new Product(2, "Product B", Arrays.asList(0)),
                      new Product(6, "Product B1", Arrays.asList(0, 2)),
                      new Product(7, "Product B2", Arrays.asList(0, 2)),
                      new Product(3, "Product C", Arrays.asList(0)),
                      new Product(8, "Product C1", Arrays.asList(0, 3)),
                      new Product(9, "Product C2", Arrays.asList(0, 3))
          );
      
         JavaRDD<Product> productsRDD = sc.parallelize(products);
         javaFunctions(productsRDD, Product.class).
         saveToCassandra("java_api", "products");
      
         JavaRDD<Sale> salesRDD = productsRDD.filter
         (new Function<Product, Boolean>() {
                  @Override
                  public Boolean call(Product product) throws Exception {
                      return product.getParents().size() == 2;
                  }
              }).flatMap(new FlatMapFunction<Product, Sale>() {
                  @Override
                  public Iterable<Sale> call(Product product) throws Exception {
                      Random random = new Random();
                      List<Sale> sales = new ArrayList<>(1000);
                      for (int i = 0; i < 1000; i++) {
                        sales.add(new Sale(UUID.randomUUID(), 
                       product.getId(), BigDecimal.valueOf(random.nextDouble())));
                      }
                      return sales;
                  }
              });
      
            javaFunctions(salesRDD, Sale.class).saveToCassandra
            ("java_api", "sales");
          }
      
          private void compute(JavaSparkContext sc) {
              JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
                      .cassandraTable("java_api", "products", Product.class)
                      .keyBy(new Function<Product, Integer>() {
                          @Override
                          public Integer call(Product product) throws Exception {
                              return product.getId();
                          }
                      });
      
              JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
                      .cassandraTable("java_api", "sales", Sale.class)
                      .keyBy(new Function<Sale, Integer>() {
                          @Override
                          public Integer call(Sale sale) throws Exception {
                              return sale.getProduct();
                          }
                      });
      
              JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);
      
              JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
                  @Override
                  public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
                      Tuple2<Sale, Product> saleWithProduct = input._2();
                      List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
                      allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
                      for (Integer parentProduct : saleWithProduct._2().getParents()) {
                          allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
                      }
                      return allSales;
                  }
              });
      
              JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
                  @Override
                  public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
                      return v1.add(v2);
                  }
              }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
                  @Override
                  public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
                      return new Summary(input._1(), input._2());
                  }
              });
      
              javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");
          }
      
          private void showResults(JavaSparkContext sc) {
              JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
                      .cassandraTable("java_api", "summaries", Summary.class)
                      .keyBy(new Function<Summary, Integer>() {
                          @Override
                          public Integer call(Summary summary) throws Exception {
                              return summary.getProduct();
                          }
                      });
      
              JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
                      .cassandraTable("java_api", "products", Product.class)
                      .keyBy(new Function<Product, Integer>() {
                          @Override
                          public Integer call(Product product) throws Exception {
                              return product.getId();
                          }
                      });
      
              List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();
      
              for (Tuple2<Product, Optional<Summary>> result : results) {
                  System.out.println(result);
              }
          }
      
          public static void main(String[] args) {
      //        if (args.length != 2) {
      //            System.err.println("Syntax: com.datastax.spark.demo.App <Spark Master URL> <Cassandra contact point>");
      //            System.exit(1);
      //        }
      
      //      SparkConf conf = new SparkConf(true)
      //        .set("spark.cassandra.connection.host", "127.0.1.1")
      //        .set("spark.cassandra.auth.username", "cassandra")            
      //        .set("spark.cassandra.auth.password", "cassandra");
      
              //SparkContext sc = new SparkContext("spark://127.0.1.1:9045", "test", conf);
      
              //return ;
      
              /* try{
                  SparkConf conf = new SparkConf(true); 
                  conf.setAppName("Spark-Cassandra Integration");
                  conf.setMaster("yarn-cluster");
                  conf.set("spark.cassandra.connection.host", "192.168.1.200");
                  conf.set("spark.cassandra.connection.rpc.port", "9042");
                  conf.set("spark.cassandra.connection.timeout_ms", "40000");
                  conf.set("spark.cassandra.read.timeout_ms", "200000");
                  System.out.println("Hi.......Main Method1111...");
                  conf.set("spark.cassandra.auth.username","cassandra");
                  conf.set("spark.cassandra.auth.password","cassandra");
                  System.out.println("Connected Successful...!\n");
                  App app = new App(conf);
                  app.run();
             }catch(Exception e){System.out.println(e);}*/
      
              SparkConf conf = new SparkConf();
              conf.setAppName("Java API demo");
      //     conf.setMaster(args[0]);
      //        conf.set("spark.cassandra.connection.host", args[1]);
                conf.setMaster("spark://192.168.1.117:7077");
                conf.set("spark.cassandra.connection.host", "192.168.1.200");
                conf.set("spark.cassandra.connection.port", "9042");
                conf.set("spark.ui.port","4040");
                conf.set("spark.cassandra.auth.username","cassandra");
                conf.set("spark.cassandra.auth.password","cassandra");
             App app = new App(conf);
              app.run();
          }
      
          public static class Product implements Serializable {
              private Integer id;
              private String name;
              private List<Integer> parents;
      
              public Product() { }
      
              public Product(Integer id, String name, List<Integer> parents) {
                  this.id = id;
                  this.name = name;
                  this.parents = parents;
              }
      
              public Integer getId() { return id; }
              public void setId(Integer id) { this.id = id; }
      
              public String getName() { return name; }
              public void setName(String name) { this.name = name; }
      
              public List<Integer> getParents() { return parents; }
              public void setParents(List<Integer> parents) { this.parents = parents; }
      
              @Override
              public String toString() {
                  return MessageFormat.format("Product'{'id={0}, name=''{1}'', parents={2}'}'", id, name, parents);
              }
          }
      
          public static class Sale implements Serializable {
              private UUID id;
              private Integer product;
              private BigDecimal price;
      
              public Sale() { }
      
              public Sale(UUID id, Integer product, BigDecimal price) {
                  this.id = id;
                  this.product = product;
                  this.price = price;
              }
      
              public UUID getId() { return id; }
              public void setId(UUID id) { this.id = id; }
      
              public Integer getProduct() { return product; }
              public void setProduct(Integer product) { this.product = product; }
      
              public BigDecimal getPrice() { return price; }
              public void setPrice(BigDecimal price) { this.price = price; }
      
              @Override
              public String toString() {
                  return MessageFormat.format("Sale'{'id={0}, product={1}, price={2}'}'", id, product, price);
              }
          }
      
          public static class Summary implements Serializable {
              private Integer product;
              private BigDecimal summary;
      
              public Summary() { }
      
              public Summary(Integer product, BigDecimal summary) {
                  this.product = product;
                  this.summary = summary;
              }
      
              public Integer getProduct() { return product; }
              public void setProduct(Integer product) { this.product = product; }
      
              public BigDecimal getSummary() { return summary; }
              public void setSummary(BigDecimal summary) { this.summary = summary; }
      
              @Override
              public String toString() {
                  return MessageFormat.format("Summary'{'product={0}, summary={1}'}'", product, summary);
              }
          }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-11-11
        • 1970-01-01
        • 2020-11-30
        • 1970-01-01
        • 2019-02-12
        • 2017-08-19
        • 2016-08-02
        • 2019-05-13
        相关资源
        最近更新 更多