【问题标题】:pyspark rdd/dataframe not creating table in cassandra automaticallypyspark rdd/dataframe 不会自动在 cassandra 中创建表
【发布时间】:2021-06-01 15:48:24
【问题描述】:

在检查所有来源后发现 datastax-spark-cassandra 连接器支持在 cassandra 中使用 scala 和 java 中的 rdd 自动创建表。对于 pyspark,可以使用另一个包来完成这项工作——https://github.com/anguenot/pyspark-cassandra。但即使有了这个包也无法自动创建表。使用数据框,我根本没有找到任何选择。我是 pyspark 和 cassandra 的新手,非常感谢任何帮助。也尝试仅使用 anguenot 包作为依赖项。 火花版本:2.4.7 Cassandra:最新的 docker 镜像

Pyspark shell >> pyspark --packages anguenot/pyspark-cassandra:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1
>>> spark = SparkSession.builder.master('local[*]').appName('cassandra').config("spark.cassandra.connection.host", "ip").config("spark.cassandra.connection.port", "port").config("spark.cassandra.auth.username", "username").config("spark.cassandra.auth.password", "password").getOrCreate()
>>> from datetime import datetime
>>> rdd = sc.parallelize([{
...     "key": k,
...     "stamp": datetime.now(),
...     "tags": ["a", "b", "c"],
...     "options": {
...             "foo": "bar",
...             "baz": "qux",
...     }
... } for k in ["x", "y", "z"]])

>>> rdd.saveToCassandra("test", "testTable")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'RDD' object has no attribute 'saveToCassandra' 

【问题讨论】:

  • 您对 Spark 2.4 有硬性要求吗?或者你可以升级到 Spark 3?
  • 将检查 Spark3

标签: apache-spark pyspark cassandra rdd spark-cassandra-connector


【解决方案1】:

你应该在创建rdd之前导入pyspark_cassandra

>>> import pyspark_cassandra
>>> rdd = sc.parallelize(...)
>>> rdd.saveToCassandra("test", "testTable")

https://github.com/anguenot/pyspark-cassandra#examples

【讨论】:

  • 试过之前忘记添加这里,这给出了以下错误> java.io.IOException: 找不到 test.testTable 或任何类似命名的键空间和表对。由此我明白无论如何我们不能自动创建表。我对么 ? @Aleksandr Sorokoumov
  • 不确定是否可以从 RDD 自动创建表。例如,Spark 无法猜测哪些字段应该成为主键。
【解决方案2】:

通常,可以从用于 RDD (saveAsCassandraTable or saveAsCassandraTableEx) 或数据帧 (createCassandraTable and createCassandraTableEx) 的 Spark Cassandra 连接器创建表,但此功能仅在 Scala API 中可用。

自版本 3.0 起,Spark Cassandra 连接器 supports Catalogs API (Spark 3+),因此您将能够使用 Spark SQL 处理键空间和表(创建/更改/删除),如下所示:

spark.sql("""
CREATE TABLE casscatalog.ksname.testTable (
     key_1 Int, key_2 Int, key_3 Int, 
     cc1 STRING, cc2 String, cc3 String, value String) 
  USING cassandra
  PARTITIONED BY (key_1, key_2, key_3)
  TBLPROPERTIES (
    clustering_key='cc1.asc, cc2.desc, cc3.asc'
  )
""")

【讨论】:

    猜你喜欢
    • 2018-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-06
    • 1970-01-01
    • 2014-01-17
    • 2021-06-29
    • 2018-09-14
    相关资源
    最近更新 更多