【问题标题】:SaveToCassandra , Is there any ordering in which the rows are writtenSaveToCassandra ,是否有任何写入行的顺序
【发布时间】:2017-06-20 14:01:30
【问题描述】:

这是我保存到 Cassandra 表的 RDD 的内容。 但看起来第二行是先写的,然后第一行会覆盖它。所以我最终得到了不好的输出。

(494bce4f393b474980290b8d1b6ebef9,2017-02-01,PT0H9M30S,星期三) (494bce4f393b474980290b8d1b6ebef9,2017-02-01,PT0H10M0S,星期三)

有没有办法强制写入 Cassandra 的行的顺序。 请帮忙。 谢谢

【问题讨论】:

  • 什么是表定义?

标签: apache-spark cassandra spark-streaming datastax


【解决方案1】:

SaveToCassandra 有命令吗?

在单个任务中执行是确定性的,但这可能不是 订购您所期待的。这里有两件事需要考虑。

  1. RDD 由 Spark 分区组成,这些分区的执行顺序取决于系统条件。拥有不同数量的核心、异构机器或执行器故障都可能改变执行顺序。可以基于系统以任何顺序执行具有相同 Cassandra 分区数据的两个 Spark 分区。
  2. 对于每个 Spark 分区,记录的批处理顺序与接收顺序相同,但这并不一定意味着它们会以相同顺序发送到 Cassandra。连接器中有一些设置确定何时发送批处理,并且可以想象包含较晚数据的批处理将在包含较早数据的批处理之前执行。这意味着虽然发送批次的顺序是确定性的,但不一定与前一个迭代器的顺序相同。

这对您的应用程序重要吗?

可能不会。仅当您的数据真正分散时,这才真正重要 在 RDD 中。如果特定 Cassandra 分区的条目分布在 多个 Spark 分区,那么 Spark 执行的顺序可能会混乱 你的更新。考虑

Spark Partition 1 has Record A
Spark Partition 2 has Record B

Both Spark Partitions have work start simultaneously, but Record B is
reached before Record A.

但我认为这不太可能是问题。

您遇到的问题很可能是常见问题:the order of statements in my batch is not respected。这个问题的核心是 Cassandra 批处理中的所有语句都是“同时”执行的。这意味着如果任何Primary Key 存在冲突,则需要解决冲突。在这些情况下,Cassandra 为所有冲突选择较大的单元格值。由于连接器会自动将写入同一个分区键的数据批处理在一起,因此最终可能会发生冲突。

您可以在示例中看到这一点,较大的值 (PT0H9M30S) 被保留,较小的值 (PT0H10M0S) 被丢弃。问题不在于顺序,而在于批处理正在发生。

那么我怎样才能根据时间进行更新插入呢?

非常小心。我会考虑采取几种方法。

最好的选择是不根据时间进行更新插入。如果您有多个 PRIMARY_KEY 条目,但只想要最后一个,请在点击 Cassandra 之前减少 Spark。在尝试写入之前删除不需要的条目将节省时间并减轻 Cassandra 集群的负载。否则,您会将 Cassandra 用作相当昂贵的重复数据删除机器。

更糟糕的选择是在 Spark Cassandra 连接器中禁用批处理。这会损害性能,但如果您只关心 Spark 分区中的顺序,就会解决问题。如果您有多个 Spark 分区,这仍然会导致冲突,因为您无法控制它们的执行顺序。

这个故事的寓意

状态不好。秩序不好。如果可能的话,将您的系统设计为幂等的。如果有多个记录并且您知道哪些记录重要,请在使用分布式 LWW 系统之前删除不重要的记录。

【讨论】:

  • 感谢您的回复。学到了很多。我很感激。
【解决方案2】:

这一切都取决于您制作的表格的定义。不保证按分区键(主键的第一部分)排序。

主键的其余部分用于对分区内的键进行排序。这就是您的问题所在。您必须定义聚类列。

这里是这样描述的: https://docs.datastax.com/en/cql/3.1/cql/ddl/ddl_compound_keys_c.html

插入的顺序仍然很重要,但仅在有两个相同信息的情况下,最后一个获胜。我认为情况并非如此。

您还可以考虑将“PT0H9M30S”中的信息放在集群列下,这样您就可以保留数据并且不会覆盖它。

【讨论】:

  • 我确实想要 upsert,第一列是用户,第三列是时间段。我的键是 (userid, date) 对于给定的用户和日期组合,我只想看到 1 行。所以我无法将句点添加到密钥中。但我的问题是 PT0H10M0S 被 PT0H9M30S 覆盖,尽管 RDD 中的行顺序是 PT0H9M30S 然后是 PT0H10M0S。感谢您的意见。谢谢
【解决方案3】:

Cassandra 是时间序列数据库。您应该设计您的表,以便不会发生覆盖。或者如果你想写最早/最新的时间戳,那么你应该使用像 reduceByKey 这样的转换来减少你的 RDD,只保留特定键的最早/最新的时间戳信息。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-03-22
    • 2018-02-04
    • 2016-12-11
    • 1970-01-01
    • 2020-05-27
    相关资源
    最近更新 更多