【问题标题】:Update cassandra rows spark cassandra更新 cassandra 行 spark cassandra
【发布时间】:2015-09-28 14:05:53
【问题描述】:

我正在使用带有 spark cassandra 连接器 1.2.3 的 spark 1.2, 我正在尝试更新表格的某些行:

示例:

CREATE TABLE myTable ( 
a text, 
b text, 
c text, 
date timestamp, 
d text, 
e text static, 
f text static, 
PRIMARY KEY ((a, b, c), date, d) 
) WITH CLUSTERING ORDER BY (date ASC, d ASC)

val interactions = sc.cassandraTable[(String, String, String, DateTime, String, String)]("keySpace", "myTable"). 
select("a","b","c","date", "d", "e","f") 
val empty = interactions.filter(r => r._6 == null).cache() 
empty.count()

我只计算“e”包含 null 的行数,然后用“b”的值替换它们

 val update_inter = empty.map( r =>  (r._1,r._2, r._3, r._4, r._5, r._2)) 
 update_inter.saveToCassandra("keySpace", "myTable", SomeColumns("a","b","c","date", "d", "e", "f"))

这在我签入 cqlsh 时有效,但是当我通过 spark cassandra 请求相同的行时,我仍然得到 null 值。

这是 spark cassandra 连接器中的错误吗?谢谢你的帮助。

【问题讨论】:

  • 您能否说明您是如何尝试检索这些行的?
  • val newInteractions = sc.cassandraTable[(String, String, String, DateTime, String, String)]("keySpace", "myTable"). select("a","b","c","date","d","e","f") val newEmpty = interaction.filter(r => r._6 == null).cache( ) newEmpty .count()
  • 不,这就是您选择要更新的行的方式。我的意思是,一旦行被转换,您将如何尝试检索它们。另外,你确定其他列和r._6一样不为空吗?
  • 是的,方法相同。我正在从 cassandra 表中进行新的选择,并计算 r._6 = null 的行数,逻辑上必须为 0 行。关于其他列是的,我确定它们不是空的
  • 好的,这样就可以告诉您是否已成功更新所有“e”列包含空值的行。这个计数不返回 0 行吗?

标签: apache-spark spark-cassandra-connector


【解决方案1】:

当发生插入/更新时,Cassandra 不会在原地覆盖行,而是将插入或更新数据的新时间戳版本写入另一个 SSTable。

您的 Spark 作业不是更新现有行而是写入新行,或者您的 SSTables 尚未将更改写入磁盘。如果要将结果写入新表,则 null 'e' 列的计数为零。

尝试使用 nodetool 刷新命令并阅读以下内容:Cassandra Compaction

【讨论】:

  • 感谢您的回答,但我尝试了 nodetool flush 和 nodetool repair。但这并不能解决问题。我认为问题是由“e”列的静态思考引起的?他们对静态列的更新有什么问题吗?
【解决方案2】:

.mode('append') 用于附加我猜。我面临类似的问题,但使用 java 连接器,但似乎在 python 中此选项可用

【讨论】:

    猜你喜欢
    • 2019-01-20
    • 2015-10-28
    • 2020-07-27
    • 1970-01-01
    • 2019-04-10
    • 1970-01-01
    • 2018-03-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多