【发布时间】:2020-12-20 21:09:35
【问题描述】:
我有一个实时流解决方案,其中包含 Kafka、Spark(作为聚合引擎)和 Cassandra(作为存储)。用户定义所需的聚合,引擎创建聚合并将它们写入存储。以下是如何创建聚合的示例
CREATE AGGR COUNT FROM input_data WHERE type,event,id
这会为 3 列创建一个 count 聚合并写入 C*。
我们还需要处理历史数据。这意味着如果今天创建了一个聚合,我们需要返回并修复它的历史记录。为了迎合这个用例,我们在 Cassandra 中创建了一个 hvalue 列。这是供参考的架构
CREATE TABLE tbl (
key blob,
key2 blob,
key3 blob,
...
key15 blob,
column1 blob,
column2 blob,
...
column20 blob,
*hvalue* blob,
*value* blob,
PRIMARY KEY ((key, key2, key3 ... key15), column1 ... column20)
) WITH CLUSTERING ORDER BY (column1 ASC,column2 ASC .. column20 ASC)
value 存储在线处理时计算的事实。 hvalue 存储用于历史处理的值。查询时,两列都被检索、合并并返回给用户。
我们正在使用 datastax leftJoin API 来加入 Cassandra。
RDD.leftJoinWithCassandraTable(keyspace,tableName)
.on(SomeColumns(...)
.map { case (ip, row) => row match {
case None => ip
case Some(data) => CASSANDRA_MAP_SCHEMA(...)
)
}
}.saveToCassandra(keyspace,tableName)
简而言之,我们为 RDD 创建一个模式,并将该行写入 Cassandra。
现在,问题来了。在历史过程中,我们需要创建一行来写入 Cassandra。这意味着我们需要向“值”列提供一些数据。如果它是 Cassandra 中不存在的新行,我们创建一个空对象并回写。如果该行存在,我们获取现有值并将其写回。 在线和历史进程将同时运行。这意味着当历史进程读取一行并回写时,在线进程可能已经创建了同一行。这将导致数据损坏,因为历史进程可能会读取过时的数据并更新在线进程写入的值。 我不知道如何解决这个问题。如果有任何其他解决方案可以防止这种情况,我将不胜感激。 我尽力解释,如果需要进一步说明,请告诉我,我会尝试添加更多输入。
提前感谢您的帮助。
【问题讨论】: