【Question Title】: Error: cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
【Posted】: 2019-12-31 06:04:59
【Question】:

I am using spark-sql 2.4.1 with spark-cassandra-connector_2.11 and Java 8.

While saving data to a C* (Cassandra) table, I get the error below. Any clue how to fix it?

It happens when running on an AWS EC2 cluster.

Caused by: java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
    at com.datastax.spark.connector.writer.AsyncExecutor.com$datastax$spark$connector$writer$AsyncExecutor$$tryFuture$1(AsyncExecutor.scala:38)
    at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:71)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$2.apply(TableWriter.scala:234)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$2.apply(TableWriter.scala:233)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)

Here is the pom.xml I am using:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>snpmi</groupId>
<artifactId>ca-datamigration</artifactId>
<version>0.0.1</version>
<name>ca-datamigration</name>
<description>ca-datamigration</description>
<packaging>jar</packaging>

<properties>
  <build.scope>provided</build.scope>
  <app.jar.name>${project.artifactId}-${project.version}-package</app.jar.name>

  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target> 

  <scala.version>2.11.12</scala.version>
  <scala.compat.version>2.11</scala.compat.version>
  <spark.version>2.4.1</spark.version>
  <log4j.version>1.2.16</log4j.version>
  <jackson.version>2.6.7</jackson.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>


  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
  </dependency>

  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>${log4j.version}</version>
  </dependency>

  <dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
  </dependency>

  <dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>spark-cassandra-connector_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
  </dependency>

  <dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.6.0</version>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-tags_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
  </dependency>

  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_${scala.compat.version}</artifactId>
    <version>1.4.0</version>
  </dependency>

  <dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.1</version>
  </dependency>

  <dependency>
    <groupId>com.github.nscala-time</groupId>
    <artifactId>nscala-time_2.11</artifactId>
    <version>2.12.0</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>net.jcazevedo</groupId>
    <artifactId>moultingyaml_2.11</artifactId>
    <version>0.4.0</version>
  </dependency>

  <dependency>
    <groupId>com.twitter</groupId>
    <artifactId>jsr166e</artifactId>
    <version>1.1.0</version>
  </dependency>

  <dependency>
    <groupId>org.yaml</groupId>
    <artifactId>snakeyaml</artifactId>
    <version>1.23</version>
  </dependency>

  <dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-yaml</artifactId>
    <version>${jackson.version}</version>
  </dependency>

</dependencies>
<build>
  <plugins>

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-eclipse-plugin</artifactId>
      <configuration>
        <downloadSources>true</downloadSources>
        <downloadJavadocs>false</downloadJavadocs>
      </configuration>
    </plugin>
  </plugins>
</build>

</project>

I tried removing the explicit Netty dependency, as below.

For the uber jar:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>

Is this OK, or do I need to do something else?

Thanks

【Comments】:

  • Why do you need to include Netty explicitly?
  • See the answer - I missed at first that you explicitly include the driver...

Tags: apache-spark datastax datastax-java-driver spark-cassandra-connector


【Solution 1】:

Remove the following dependency from your pom.xml:

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.6.0</version>
</dependency>

The Spark Cassandra Connector (SCC) includes everything it needs to connect to Cassandra, so you don't need to include the driver yourself.

You are also using the wrong coordinates for the connector - the artifact you declared is the one intended for spark-submit's --packages flag, not for linking against. Try the following version:

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.4.1</version>
</dependency>

Also note that SCC versions do not exactly track Spark versions: the current Spark release is 2.4.3, while the current SCC release is 2.4.1.
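Because the two projects release on different cadences, it may help to track the connector with its own Maven property instead of reusing ${spark.version} for both (as the pom above does). A minimal sketch - the property name connector.version is illustrative, not a convention:

```xml
<properties>
  <spark.version>2.4.3</spark.version>
  <!-- SCC follows its own release cadence; pin it separately -->
  <connector.version>2.4.1</connector.version>
</properties>
...
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.11</artifactId>
  <version>${connector.version}</version>
</dependency>
```

This way a future Spark upgrade doesn't silently change which connector artifact gets resolved.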

【Comments】:

  • Run mvn dependency:tree and check whether cassandra-driver-core is being pulled in from somewhere else...
  • If you use the connector via spark-submit, then you don't need to put it into pom.xml... Basically, there are two artifacts: the one I pointed to, which is for direct linking, and the second one (the one you are using), which is for --packages - it is an uberjar that bundles the connector. Just declare the dependency the way I described and build your uberjar. Also, look at the example on GitHub that I pointed to - it works.
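If dependency:tree shows cassandra-driver-core arriving transitively through some other artifact, it can be excluded at that declaration. A sketch - the groupId/artifactId/version of the offending dependency are placeholders to replace with whatever the tree output shows:

```xml
<dependency>
  <!-- placeholder: the artifact that dependency:tree shows pulling the driver in -->
  <groupId>some.group</groupId>
  <artifactId>some-artifact</artifactId>
  <version>1.0.0</version>
  <exclusions>
    <exclusion>
      <groupId>com.datastax.cassandra</groupId>
      <artifactId>cassandra-driver-core</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```

This keeps only the driver classes shaded inside the connector on the classpath, avoiding the ClassCastException caused by two copies of the driver.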