【Question Title】: Cassandra Integration with Apache Spark
【Posted】: 2015-05-06 15:22:41
【Description】:

I am trying to run the code provided at the following link: http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java

The code is provided below:

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

public class JavaDemo implements Serializable {
    private transient SparkConf conf;

    private JavaDemo(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {

        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS java_api");
            session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
            session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
            session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
        }
    }

    private void compute(JavaSparkContext sc) {
    }

    private void showResults(JavaSparkContext sc) {
    }

    public static void main(String[] args) {

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster("local");
        conf.set("spark.cassandra.connection.host", "XX.XX.XX.XX");

        JavaDemo app = new JavaDemo(conf);
        app.run();

    }
}

I am getting the following error:

Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {<Cassandra IP>}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:176)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
    at JavaDemo.generateData(JavaDemo.java:28)
    at JavaDemo.run(JavaDemo.java:18)
    at JavaDemo.main(JavaDemo.java:52)
Caused by: java.lang.IllegalArgumentException: Contact points contain multiple data centers:
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.init(LocalNodeFirstLoadBalancingPolicy.scala:47)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1024)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:270)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:169)

Is there anything I can do to fix this? I have tried connecting to Cassandra with the plain Java driver, and that seems to work fine.

【Comments】:

  • The exception seems clear to me: "java.lang.IllegalArgumentException: Contact points contain multiple data centers" - change your conf.set("spark.cassandra.connection.host", "XX.XX.XX.XX"); to the right YY.YY.YY.YY
  • Thanks for the reply. I have already tried changing it to a different working IP, one that connects fine with the plain Cassandra Java driver. I get the same error. Do we need to change some configuration to resolve: Caused by: java.lang.IllegalArgumentException: Contact points contain multiple data centers?
  • Interesting... it looks like configuration returned from Cassandra is causing that error. Is your cluster multi-DC? Try asking this on groups.google.com/a/lists.datastax.com/forum/#!forum/…
  • No, I added a single data center. >> nodetool status
    Datacenter: datacenter1
    =======================
    Status=Up/Down |/ State=Normal/Leaving/Joining/Moving
    -- Address Load Tokens Owns Host ID Rack
    UN xx.xx.xx.xx 138.48 KB 256 ? XXXXX-XXXX-XXXX-XXXX-XXXXX-XXXX-XXXX-XXXX-X rack1
    Note: Non-system keyspaces don't have the same replication settings, effective o
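The multi-DC check discussed in the comments above can also be done mechanically, by counting the `Datacenter:` headings in saved `nodetool status` output. A minimal sketch (the file name and sample output below are illustrative, not taken from the actual cluster):

```shell
# Count distinct data centers reported by `nodetool status`.
# Assumes the output was first saved to a file, e.g.:
#   nodetool status > nodetool_status.txt
count_datacenters() {
  grep -c '^Datacenter:' "$1"
}

# Illustrative sample mirroring the output quoted in the comments:
cat > nodetool_status.txt <<'EOF'
Datacenter: datacenter1
=======================
Status=Up/Down |/ State=Normal/Leaving/Joining/Moving
--  Address      Load       Tokens  Owns  Host ID  Rack
UN  xx.xx.xx.xx  138.48 KB  256     ?     ...      rack1
EOF

count_datacenters nodetool_status.txt   # prints 1 for a single-DC cluster
```

A count greater than 1 would confirm the "Contact points contain multiple data centers" condition the connector is complaining about.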

Tags: cassandra apache-spark


【Solution 1】:

I converted the project to a Maven project and tried running it again, which somehow resolved the problem. Cassandra version used: 2.1.2. I believe this was a version-mismatch issue.

The POM.xml contents are provided below:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycompany.app</groupId>
  <artifactId>my-app</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>Maven Quick Start Archetype</name>
  <dependencies>
    <dependency>
      <groupId>com.datastax.spark</groupId>
      <artifactId>spark-cassandra-connector_2.10</artifactId>
      <version>1.2.0-rc3</version>
    </dependency>
    <dependency>
      <groupId>com.datastax.spark</groupId>
      <artifactId>spark-cassandra-connector-java_2.10</artifactId>
      <version>1.2.0-rc3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.2.0</version>
    </dependency>
  </dependencies>
</project>
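Since the fix came down to matching artifact versions, a quick pre-build sanity check is to compare the major.minor component of the Spark and connector versions. A minimal sketch (the version strings below simply mirror the POM above):

```shell
# Reduce a version string like "1.2.0-rc3" to its major.minor part ("1.2").
major_minor() {
  echo "$1" | cut -d. -f1,2
}

spark_version="1.2.1"
connector_version="1.2.0-rc3"

if [ "$(major_minor "$spark_version")" = "$(major_minor "$connector_version")" ]; then
  echo "versions aligned"
else
  echo "version mismatch: $spark_version vs $connector_version"
fi
```

With the values above this prints `versions aligned`; the same check against a mismatched pair (e.g. a 1.1.x connector with Spark 1.2.x) would flag the kind of incompatibility suspected in this answer.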

【Discussion】:
