【发布时间】:2017-01-31 22:07:35
【问题描述】:
我正在使用 datastax java driver 3.1.0 连接到 cassandra 集群,我的 cassandra 集群版本是 2.0.10。我正在异步写入 QUORUM 一致性。
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final Semaphore concurrentQueries = new Semaphore(1000);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
concurrentQueries.acquire();
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
concurrentQueries.release();
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
concurrentQueries.release();
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
我上面的保存方法将以非常快的速度从多个线程中调用。如果我的写入速度超出我的 Cassandra 集群可以处理的速度,那么它将开始抛出错误,我希望我的所有写入都应该成功进入 cassandra,而不会造成任何损失。
问题:
我正在考虑使用某种排序队列或缓冲区来排队请求(例如java.util.concurrent.ArrayBlockingQueue)。 “缓冲区已满”意味着客户端应该等待。缓冲区也将用于重新排队失败的请求。然而,为了更公平,失败的请求可能应该放在队列的前面,以便首先重试。此外,我们应该以某种方式处理队列已满并且同时有新的失败请求的情况。然后,一个单线程工作人员将从队列中挑选请求并将它们发送到 Cassandra。由于它不应该做太多,它不太可能成为瓶颈。该工作人员可以应用它自己的速率限制,例如基于com.google.common.util.concurrent.RateLimiter 的时间安排。
实现此队列或缓冲区功能的最佳方法是什么,它可以在写入 Cassandra 时应用特定的番石榴速率限制,或者是否有更好的方法也让我知道?我想以每秒 2000 个请求的速度写信给 Cassandra(这应该是可配置的,以便我可以使用它来查看最佳设置)。
如下面的 cmets 所述,如果内存不断增加,我们可以使用 Guava Cache 或 CLHM 不断删除旧记录,以确保我的程序不会耗尽内存。我们将在盒子上拥有大约 12GB 的内存,这些记录非常小,所以我认为这不是问题。
【问题讨论】:
-
您能否提供一些有关您正在使用的实例和集群的信息以及表创建语句 + 描述一下有关此访问模式的一些信息。您使用的是什么复制因子。通常 cassandra 的写入速度非常快,即使在非常适中的集群上,您也可以远远超过 2000 req/s。您是否还可以检查声明是否真的准备好了,以及出于某种原因客户没有每次都准备声明?在没有实施缓冲的情况下数据进入的速度是多少。我的直觉是你的 cassandra 集群可能需要扩大/扩大一点
-
我们在每个数据中心有三个节点,复制因子为 3。在这张表上,我们将以非常高的速度写入,稍后我们将读取它以进行一些离线分析。是的,我将准备好的语句缓存一次,然后重用该准备好的语句。这些 cassandra 集群设置不在我的控制范围内,因为我们公司的其他团队负责管理,所以我想确保至少我的代码不会失败并且我们能够编写所有内容。
-
我们本可以使用其他数据库,但由于我们将此数据库用于其他目的,因此我们决定也将其用于此目的。以非常高的速度写入确保您不会丢失数据,然后再读取这些记录以进行一些离线比较。我只是想看看这个排队的事情与正常的一次相比会如何。我也想实现它,只是为了了解我们将如何以有效的方式实现它。
-
我现在明白了。问题是,即使您在某些时候将内存用作缓冲区,您也可能会在负载下用完它,因此即使您可以限制速率,它实际上也可能不是一个好主意。您是否考虑过将这些消息放入某种队列(如 kafka 或 sqs)中,然后通过一些简单的应用程序/进程将消息拉出并以您可以轻松控制的速率将它们推送到 cassandra?这种模式效果很好。如果 cassandra 节点死亡,在内存中管理所有这些可能会让您陷入更大的麻烦。而且中间有一个物理队列会更安全。
-
我感觉到你的痛苦 :( 基本上只需要那些家伙添加更多实例,一切都很好。
标签: java cassandra guava datastax-java-driver rate-limiting