【问题标题】:Why is my Cassandra Prepared Statement Ingest of Data so slow?为什么我的 Cassandra Prepared Statement 数据摄取如此缓慢?
【发布时间】:2017-09-19 10:51:52
【问题描述】:

我有一个包含 100,000 个名称的 Java 列表,我想将这些名称提取到运行 Datastax Enterprise 5.1 和 Cassandra 3.10.0 的 3 节点 Cassandra 集群中

我的代码被摄取,但需要很长时间。我对集群进行了压力测试,每秒可以进行超过 25,000 次写入。使用我的摄取代码,我得到了大约 200/秒的糟糕性能。

我的 Java 列表中有 100,000 个名称,称为 myList。我使用以下准备好的语句和会话执行来摄取数据。

PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");

         int id = 0;

         for(int i = 0; i < myList.size(); i++) {
             id += 1;
             session.execute(prepared.bind(id, myList.get(i)));
        }

我在我的代码中添加了一个集群监视器以查看发生了什么。这是我的监控代码。

    /// Monitoring Status of Cluster
    final LoadBalancingPolicy loadBalancingPolicy =
    cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
    ScheduledExecutorService scheduled =
    Executors.newScheduledThreadPool(1);
        scheduled.scheduleAtFixedRate(() -> {
            Session.State state = session.getState();
            state.getConnectedHosts().forEach((host) -> {
                HostDistance distance = loadBalancingPolicy.distance(host);
                int connections = state.getOpenConnections(host);
                int inFlightQueries = state.getInFlightQueries(host);
                System.out.printf("%s connections=%d, current load=%d, maxload=%d%n",
                        host, connections, inFlightQueries,
                        connections *
                                poolingOptions.getMaxRequestsPerConnection(distance));
            });
    }, 5, 5, TimeUnit.SECONDS); 

监控 5 秒输出显示以下 3 次迭代:

/192.168.20.25:9042 connections=1, current load=1, maxload=32768
/192.168.20.26:9042 connections=1, current load=0, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768
/192.168.20.25:9042 connections=1, current load=1, maxload=32768
/192.168.20.26:9042 connections=1, current load=0, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768
/192.168.20.25:9042 connections=1, current load=0, maxload=32768
/192.168.20.26:9042 connections=1, current load=1, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768

我似乎没有非常有效地利用我的集群。我不确定自己做错了什么,如果有任何提示,我将不胜感激。

谢谢!

【问题讨论】:

    标签: java cassandra datastax-enterprise


    【解决方案1】:

    使用 executeAsync。

    异步执行提供的查询。此方法不会阻塞。一旦查询被传递到底层网络堆栈,它就会返回。特别是,从这个方法返回并不能保证查询是有效的,甚至不能保证已经提交到活动节点。访问 ResultSetFuture 时将抛出任何与查询失败有关的异常。

    您正在插入大量数据。如果您使用 executeAsync 并且您的集群无法处理这么多的数据,它可能会抛出异常。您可以使用 Semaphore 限制 executeAsync。

    例子:

    PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");
    
    int numberOfConcurrentQueries = 100;
    final Semaphore semaphore = new Semaphore(numberOfConcurrentQueries);
    
    int id = 0;    
    
    for(int i = 0; i < myList.size(); i++) {
        try {
            id += 1;
            semaphore.acquire();
            ResultSetFuture future = session.executeAsync(prepared.bind(id, myList.get(i)));
            Futures.addCallback(future, new FutureCallback<ResultSet>() {
                @Override
                public void onSuccess(ResultSet result) {
                    semaphore.release();
                }
    
                @Override
                public void onFailure(Throwable t) {
                    semaphore.release();
                }
            });
        } catch (Exception e) {
            semaphore.release();
            e.printStackTrace();
        }
    }
    

    来源:
    https://stackoverflow.com/a/30526719/2320144 https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/Session.html#executeAsync-com.datastax.driver.core.Statement-

    【讨论】:

    • 为什么需要 id ?成功数?
    • id 是分区键
    • 这段代码似乎有点破:使用numberOfConcurrentQueries 许可、1 获取和myList.size() 发布创建的信号量......而且我永远不会将信号量用于此类任务。对我来说,一个普通的计数器就足够了。
    • @xmas79 谢谢,已编辑答案。
    猜你喜欢
    • 2015-10-04
    • 2016-05-21
    • 2021-11-28
    • 1970-01-01
    • 1970-01-01
    • 2018-04-16
    • 1970-01-01
    • 1970-01-01
    • 2011-04-12
    相关资源
    最近更新 更多