【问题标题】:How to read scylladb table in pyspark dataframe如何在pyspark数据框中读取scylladb表
【发布时间】:2019-01-04 13:14:03
【问题描述】:

我正在尝试将安装了一台电脑的 scylladb 表读取到另一台电脑上的 pyspark 数据帧中。
2 台电脑有 ssh 连接,我可以通过 python 代码读取表格,只有在连接 spark 时才会出现问题。我使用过这个连接器:

--packages datastax:spark-cassandra-connector:2.3.0-s_2.11 , 

我的 spark -version = 2.3.1 ,scala-version-2.11.8。

**First Approach**
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
conf = SparkConf().set("spark.cassandra.connection.host","192.168.0.118")
sc = SparkContext(conf = conf)
spark=SparkSession.builder.config(conf=conf).appName('FinancialRecon').getOrCreate()
sqlContext =SQLContext(sc)
data=spark.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()

产生的错误:

文件“/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py”,第 172 行,加载中 调用中的文件“/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py”,第 1257 行 文件“/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”,第 63 行,在 deco 文件“/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py”,第 328 行,在 get_return_value py4j.protocol.Py4JJavaError: 调用 o43.load 时出错。 : java.lang.ClassNotFoundException: org.apache.spark.Logging 在 Spark 2.0 中被移除。请检查您的库是否与 Spark 2.0 兼容 在 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:646) 在 org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190) 在 org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:748) 引起:java.lang.NoClassDefFoundError: org/apache/spark/Logging 在 java.lang.ClassLoader.defineClass1(本机方法) 在 java.lang.ClassLoader.defineClass(ClassLoader.java:763) 在 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 在 java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 在 java.net.URLClassLoader.access$100(URLClassLoader.java:73) 在 java.net.URLClassLoader$1.run(URLClassLoader.java:368) 在 java.net.URLClassLoader$1.run(URLClassLoader.java:362) 在 java.security.AccessController.doPrivileged(本机方法) 在 java.net.URLClassLoader.findClass(URLClassLoader.java:361) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:424) 在 sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:411) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:357) 在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618) 在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618) 在 scala.util.Try$.apply(Try.scala:192) 在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618) 在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618) 在 scala.util.Try.orElse(Try.scala:84) 在 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618) ... 13 更多 引起:java.lang.ClassNotFoundException:org.apache.spark.Logging 在 java.net.URLClassLoader.findClass(URLClassLoader.java:381) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:424) 在 sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 33 更多

我使用的另一种方法是:

data=sc.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()

为此我得到:

AttributeError: 'SparkContext' 对象没有属性 'read'

第三种方法:

data=sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()

为此,我得到与第一种方法相同的错误。

请告知是scylla spark连接器问题还是某些spark库问题以及如何解决。

【问题讨论】:

    标签: apache-spark cassandra scylla


    【解决方案1】:

    按照以下步骤操作:

    1.使用 packages 行运行 spark-shell。使用 --conf 配置默认的 Spark 配置传递键值对,在我的情况下,scylla 主机是 172.17.0.2

    bin/spark-shell --conf spark.cassandra.connection.host=172.17.0.2 --packages datastax:spark-cassandra-connector:2.3.0-s_2.11
    

    2.在 SparkContext、SparkSession、RDD 和 DataFrame 上启用 Cassandra 特定功能:

    import com.datastax.spark.connector._
    import org.apache.spark.sql.cassandra._
    

    3.从scylla加载数据

    val rdd = sc.cassandraTable("my_keyspace", "my_table")
    

    4.测试

    scala> rdd.collect().foreach(println)
    CassandraRow{id: 1, name: ash}
    

    【讨论】:

      【解决方案2】:

      由于版本冲突而发生错误。也许你可以阅读here来解决它。

      第一种方法会起作用,因为 SparkSession 上提供了 read 方法。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-10-07
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-03-08
        相关资源
        最近更新 更多