【Question Title】: How to use prepared statement efficiently using datastax java driver in Cassandra?
【Posted】: 2015-04-30 19:41:25
【Question】:

I need to query one of the tables in Cassandra using the Datastax Java driver. Below is my code, which works fine -

public class TestCassandra {

    private Session session = null;
    private Cluster cluster = null;

    private static class ConnectionHolder {
        static final TestCassandra connection = new TestCassandra();
    }

    public static TestCassandra getInstance() {
        return ConnectionHolder.connection;
    }

    private TestCassandra() {
        Builder builder = Cluster.builder();
        builder.addContactPoints("127.0.0.1");

        PoolingOptions opts = new PoolingOptions();
        opts.setCoreConnectionsPerHost(HostDistance.LOCAL, opts.getCoreConnectionsPerHost(HostDistance.LOCAL));

        cluster = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE).withPoolingOptions(opts)
                .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy("DC2")))
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();
        session = cluster.connect();
    }

    private Set<String> getRandomUsers() {
        Set<String> userList = new HashSet<String>();

        for (int table = 0; table < 14; table++) {
            String sql = "select * from testkeyspace.test_table_" + table + ";";

            try {
                SimpleStatement query = new SimpleStatement(sql);
                query.setConsistencyLevel(ConsistencyLevel.QUORUM);
                ResultSet res = session.execute(query);

                Iterator<Row> rows = res.iterator();
                while (rows.hasNext()) {
                    Row r = rows.next();

                    String user_id = r.getString("user_id");
                    userList.add(user_id);
                }
            } catch (Exception e) {
                System.out.println("error= " + ExceptionUtils.getStackTrace(e));
            }
        }

        return userList;
    }
}

I use the above class in my main application like this -

TestCassandra.getInstance().getRandomUsers();

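Is there a way to use PreparedStatement efficiently in getRandomUsers? I assume I need to make sure each PreparedStatement is created only once rather than many times. What is the best design for this in my current architecture, and how should I use it?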

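【Question Comments】:

    Tags: java cassandra prepared-statement datastax-java-driver


    【Solution 1】:

    You can create a cache for the statements you need (this is a fairly basic example, just to give you an idea). Let's start by creating the class that will serve as the cache.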

    // Declared as an inner class of TestCassandra so it can use the enclosing session field.
    private class StatementCache {
        Map<String, PreparedStatement> statementCache = new HashMap<>();
        public BoundStatement getStatement(String cql) {
            PreparedStatement ps = statementCache.get(cql);
            // no statement cached, create one and cache it now.
            if (ps == null) {
                ps = session.prepare(cql);
                statementCache.put(cql, ps);
            }
            return ps.bind();
        }
    }
    

    Then add an instance of it to your singleton:

    public class TestCassandra {
        private Session session = null;
        private Cluster cluster = null;
        private StatementCache psCache = new StatementCache();
        // rest of class...
    

    Finally, use the cache in your function:

    private Set<String> getRandomUsers(String cql) {
    // lots of code.    
            try {
                // Fetch the cached PreparedStatement, already bound; no SimpleStatement is needed.
                // The handling of the cache is abstracted into its own class.
                // It still needs some work to be thread-safe, as currently it is not.
                BoundStatement query = psCache.getStatement(cql);
                query.setConsistencyLevel(ConsistencyLevel.QUORUM);
                ResultSet res = session.execute(query);
    

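    【Comments】:

    • This is great. I now understand what we need to do: basically we need a thread-safe singleton class, we populate the cache with every new statement, and then we use those statements through the getter. Right?
    • @david Actually, I would have one cache per n threads (you need to work out when the map's get becomes too expensive, which signals that another shared cache is needed). Try 1 cache per 5 threads. It may even be that each thread should have its own ps cache (depending on how many statements there are: the more statements, the fewer caches you need).
    • I didn't follow your last comment. A cache per n threads? How can we do that?
    • Examples work best when confusion is the problem. Suppose you have 2000 threads executing SELECTs. That means 2000 threads share 1 cache. Performance-wise, that is a bad idea. Instead, share one cache per n threads; to be concrete, say every 20 threads. That gives you 2000 / 20 = 100 prepared statement caches. How do you do it? You need a mapping between each set of threads and its cache. I'll leave the implementation to you. Also note that these numbers are arbitrary, and load testing is needed to find the best cache ratio.
    • Per-thread caches are not necessary. "You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). If you call prepare multiple times with the same query string, the driver will log a warning." docs.datastax.com/en/developer/java-driver/3.0/manual/…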
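    Following the last comment: since the quoted driver documentation says a PreparedStatement is thread-safe, one shared cache is probably enough. Here is a minimal sketch of such a cache, not the driver's own API. The names `SharedStatementCache`, `S` (standing in for `PreparedStatement`) and `preparer` (standing in for `session::prepare`) are hypothetical, chosen so the example compiles without the driver on the classpath.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Hypothetical sketch of a shared, thread-safe statement cache.
 * S is an assumption standing in for the driver's PreparedStatement,
 * and preparer is an assumption standing in for session.prepare(cql).
 */
final class SharedStatementCache<S> {

    private final ConcurrentMap<String, S> cache = new ConcurrentHashMap<>();
    private final Function<String, S> preparer;

    SharedStatementCache(Function<String, S> preparer) {
        this.preparer = preparer;
    }

    /** Returns the cached statement for cql, preparing it on first use. */
    S get(String cql) {
        // computeIfAbsent applies the preparer at most once per key, even when
        // several threads request the same CQL string at the same time.
        return cache.computeIfAbsent(cql, preparer);
    }

    /** Number of distinct CQL strings prepared so far. */
    int size() {
        return cache.size();
    }
}
```

    With the real driver this would presumably be created as `new SharedStatementCache<PreparedStatement>(session::prepare)`, and callers would use `get(cql).bind()` to obtain a BoundStatement. Note that other threads asking for the same key wait while the statement is being prepared, which is usually what you want here.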
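    【Solution 2】:

    My implementation is more or less the same as the one shared above, but with a performance check and handling for race conditions. See the inline comments in the code for my thought process.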

    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.Session;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class StatementCache {

        /* volatile prevents cache incoherence issues with double-checked locking */
        private static volatile StatementCache sCacheInstance;
        private static final Map<String, PreparedStatement> holder = new ConcurrentHashMap<>();
        private static final String NOT_PERMITTED = "Operation not permitted";

        private StatementCache() {
            /* Prevent access through the reflection API. */
            if (sCacheInstance != null) {
                throw new IllegalStateException(
                        NOT_PERMITTED + ": use getInstance() to retrieve the instance of this class");
            }
        }

        /**
         * Double-checked locking pattern for singleton classes.
         *
         * @return the single shared instance of this cache
         */
        public static StatementCache getInstance() {
            if (sCacheInstance == null) { // first check, without locking
                synchronized (StatementCache.class) { // second check, to keep the operation atomic
                    if (sCacheInstance == null) sCacheInstance = new StatementCache();
                }
            }
            return sCacheInstance;
        }

        /**
         * If the statement for {@code cql} is already present in the cache, we don't have to
         * synchronize and make threads wait; otherwise, we synchronize the caching bit.
         *
         * @param session the session used to prepare the statement on a cache miss
         * @param cql     the CQL query string, which is also the cache key
         * @return the cached or newly prepared statement
         */
        public PreparedStatement getStatement(Session session, String cql) {
            PreparedStatement prepared_statement = holder.get(cql);
            if (prepared_statement == null) {
                synchronized (this) {
                    prepared_statement = holder.get(cql);
                    if (prepared_statement == null) {
                        prepared_statement = session.prepare(cql);
                        holder.put(cql, prepared_statement);
                    }
                }
            }
            return prepared_statement;
        }
    }
    

    Using this cache singleton class is simple:

    public class CacheConsumer {

        // An instance field: the constructor assigns it through this.session.
        private final Session session;

        CacheConsumer(Session session) {
            this.session = session;
        }

        public void someMethod() {
            String cqlstatement = "SELECT * FROM SOME_TABLE";
            PreparedStatement statement =
                    StatementCache.getInstance().getStatement(session, cqlstatement);
            // You can now use the prepared statement however you wish.
        }
    }
    

    Simple enough ;)
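    For the question's specific case, the 14 queries are known up front, and CQL does not allow a table name to be supplied as a bind marker, so each table needs its own statement. One option is to prepare all of them once, for example in the singleton's constructor, instead of caching lazily. A rough sketch under assumptions: `TableQueries`, `S` (standing in for `PreparedStatement`) and `preparer` (standing in for `session::prepare`) are hypothetical names, used so the example runs without the driver.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical sketch: prepares one statement per table exactly once.
 * S is an assumption standing in for the driver's PreparedStatement,
 * and preparer is an assumption standing in for session.prepare(cql).
 */
final class TableQueries<S> {

    private final List<S> statements;

    TableQueries(String keyspace, String tablePrefix, int tableCount,
                 Function<String, S> preparer) {
        List<S> prepared = new ArrayList<>(tableCount);
        for (int table = 0; table < tableCount; table++) {
            // The table name is part of the query text, so each table
            // gets its own prepared statement.
            String cql = "select * from " + keyspace + "." + tablePrefix + table;
            prepared.add(preparer.apply(cql));
        }
        // Read-only after construction, so it is safe to share between threads.
        this.statements = Collections.unmodifiableList(prepared);
    }

    /** Returns the statement prepared for the given table index. */
    S forTable(int table) {
        return statements.get(table);
    }

    /** Number of prepared statements held. */
    int count() {
        return statements.size();
    }
}
```

    In the question's class this would presumably be built once in the TestCassandra constructor, right after `cluster.connect()`, and the loop in getRandomUsers would then bind and execute `forTable(table)` rather than building a new SimpleStatement on every call.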

    【Comments】:
