【Question title】: Can't connect to Cassandra from PySpark
【Posted】: 2019-09-12 22:27:33
【Question】:

I am trying to connect to Cassandra from PySpark and run some queries. Here are all the steps I have taken:

First, I installed Spark:

wget http://www.apache.org/dyn/closer.lua/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz

Then:

cd spark-2.1.0-bin-hadoop2.7/

Then I ran this command:

./bin/pyspark

I got this:

16:48 $ ./bin/pyspark
Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/02 16:50:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/02 16:50:33 WARN Utils: Your hostname, rleitao-H81M-HD3 resolves to a loopback address: 127.0.1.1; using 192.168.1.26 instead (on interface eth0)
17/05/02 16:50:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/05/02 16:50:36 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>> 

Then:

from pyspark.sql import SQLContext
sql = SQLContext(sc)

Then:

df = sql.read.format("org.apache.spark.sql.cassandra").\
option("spark.cassandra.connection.host", "ec2-IPV4-Adress.REGION.compute.amazonaws.com").\
option("spark.cassandra.auth.username", "user"). \
option("spark.cassandra.auth.password", "pass"). \
option(keyspace="mykeyspace", table="mytable").load()

At this step, I got this huge error:

    >>> df = sql.read.format("org.apache.spark.sql.cassandra").\
    ... option("spark.cassandra.connection.host", "ec2-IPV4-adress.REGION.compute.amazonaws.com").\
    ... option("spark.cassandra.auth.username", "user"). \
    ... option("spark.cassandra.auth.password", "pass"). \
    ... option(keyspace="mykeyspace", table="mytable").load()
    17/05/02 16:47:43 ERROR Schema: Failed initialising database.
    Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
    java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@fa39daf, see the next exception for details.
        at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection.<init>(Unknown Source)
        at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
        at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at org.apache.derby.jdbc.InternalDriver.getNewEmbedConnection(Unknown Source)
        at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
        at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
        at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
        at java.sql.DriverManager.getConnection(DriverManager.java:664)
        at java.sql.DriverManager.getConnection(DriverManager.java:208)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
        at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
        at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
        at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
        at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
        at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
        at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
        at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
        at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:57)
        at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:66)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:593)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:571)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
        at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
        at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
        at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
        at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
        at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
        at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
        at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:192)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        ... 108 more
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/souadmabrouk/Bureau/Souad/project/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/context.py", line 464, in read
        return DataFrameReader(self)
      File "/home/souadmabrouk/Bureau/Souad/project/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/readwriter.py", line 70, in __init__
        self._jreader = spark._ssql_ctx.read()
      File "/home/souadmabrouk/Bureau/Souad/project/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/home/souadmabrouk/Bureau/Souad/project/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 79, in deco
        raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"
    >>>

How do I use the Cassandra connector? I could not find clear documentation. By the way, the Cassandra cluster is on AWS.

Any help would be greatly appreciated.

【Comments】:

    Tags: python amazon-web-services apache-spark cassandra pyspark


    【Solution 1】:
    1. Run pyspark with the following command, so that the Cassandra connector package is loaded:
      ./bin/pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2
    2. In your code, create a dictionary with the connection configuration:
      hosts = {"spark.cassandra.connection.host": 'host_dns_or_ip_1,host_dns_or_ip_2,host_dns_or_ip_3'}
    3. In your code, create a DataFrame using the connection configuration:
      data_frame = sqlContext.read.format("org.apache.spark.sql.cassandra").options(**hosts).load(keyspace="your_keyspace", table="your_table")
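
Steps 2 and 3 can be wrapped in a small helper, which keeps the host list, keyspace, and table in one place. This is only a sketch: the function names `cassandra_read_options` and `load_cassandra_table` are made up for illustration, and it assumes pyspark was started with the `--packages` flag from step 1 so the connector is on the classpath.

```python
def cassandra_read_options(hosts, keyspace, table):
    """Build the option dict for one Cassandra table read (hypothetical helper)."""
    return {
        # Comma-separated contact points, in the same form as step 2.
        "spark.cassandra.connection.host": ",".join(hosts),
        "keyspace": keyspace,
        "table": table,
    }


def load_cassandra_table(sql_context, hosts, keyspace, table):
    """Return a DataFrame for keyspace.table.

    sql_context is the SQLContext (or SparkSession) of a running shell
    that already has the Cassandra connector loaded.
    """
    options = cassandra_read_options(hosts, keyspace, table)
    return (
        sql_context.read.format("org.apache.spark.sql.cassandra")
        .options(**options)
        .load()
    )
```

In the pyspark shell this would be called as `load_cassandra_table(sqlContext, ["host_dns_or_ip_1", "host_dns_or_ip_2"], "your_keyspace", "your_table")`.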

    【Comments】:

      【Solution 2】:

      The following worked for me:

      ./bin/pyspark --master local[*] --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.2 --conf spark.cassandra.connection.host=host.name --conf spark.cassandra.auth.username=cassandraname --conf spark.cassandra.auth.password=cassandrapwd
      
      >>> df = spark.read.format("org.apache.spark.sql.cassandra")\
         .options(table="tablename", keyspace="keyspacename").load()
      
      >>> df.show()
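
If you start Spark from a standalone Python script instead of `./bin/pyspark`, the same settings can probably be passed through the `SparkSession` builder. The sketch below assumes that `spark.jars.packages` is the configuration key behind `--packages` and that no Spark JVM is running yet when the session is built (otherwise the package setting has no effect). The helper names `session_conf` and `build_session` are hypothetical.

```python
# Connector coordinate taken from the command line above.
CONNECTOR = "com.datastax.spark:spark-cassandra-connector_2.11:2.3.2"


def session_conf(host, username, password, connector=CONNECTOR):
    """Return the settings that the command above passes via --packages and --conf."""
    return {
        "spark.jars.packages": connector,
        "spark.cassandra.connection.host": host,
        "spark.cassandra.auth.username": username,
        "spark.cassandra.auth.password": password,
    }


def build_session(conf, master="local[*]"):
    """Create (or reuse) a SparkSession with the given settings applied."""
    # Imported lazily so session_conf can be used without pyspark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.master(master)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

With that in place, `spark = build_session(session_conf("host.name", "cassandraname", "cassandrapwd"))` followed by the same `spark.read.format("org.apache.spark.sql.cassandra")` call should behave like the shell session shown above.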
      

      【Comments】:
