【问题标题】:NoNodeAvailableException after some insert request to cassandra对 cassandra 的一些插入请求后出现 NoNodeAvailableException
【发布时间】:2024-01-19 04:21:01
【问题描述】:

我正在尝试使用异步执行和驱动程序版本 4(与我的 Cassandra 实例相同)将数据插入 Cassandra 本地集群

我已经用这种方式实例化了 cql 会话:

CqlSession cqlSession = CqlSession.builder()
  .addContactEndPoint(new DefaultEndPoint(
    InetSocketAddress.createUnresolved("localhost",9042))).build();

然后我以异步方式创建一个语句:

return session.prepareAsync(
       "insert into table (p1,p2,p3, p4) values (?, ?,?, ?)")
          .thenComposeAsync(
              (ps) -> {
                 CompletableFuture<AsyncResultSet>[] result = data.stream().map(
                     (d) -> session.executeAsync(
                          ps.bind(d.p1,d.p2,d.p3,d.p4)
                       )
                  ).toCompletableFuture()
              ).toArray(CompletableFuture[]::new);
          return CompletableFuture.allOf(result);
      }
);

data 是一个填充了用户数据的动态列表。

当我执行代码时,我得到以下异常:

Caused by: com.datastax.oss.driver.api.core.NoNodeAvailableException: No node was available to execute the query

    at com.datastax.oss.driver.api.core.AllNodesFailedException.fromErrors(AllNodesFailedException.java:53)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.sendRequest(CqlPrepareHandler.java:210)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.onThrottleReady(CqlPrepareHandler.java:167)
    at com.datastax.oss.driver.internal.core.session.throttling.PassThroughRequestThrottler.register(PassThroughRequestThrottler.java:52)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.<init>(CqlPrepareHandler.java:153)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor.process(CqlPrepareAsyncProcessor.java:66)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor.process(CqlPrepareAsyncProcessor.java:33)
    at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:210)
    at com.datastax.oss.driver.api.core.cql.AsyncCqlSession.prepareAsync(AsyncCqlSession.java:90)

节点处于活动状态,并且在异常上升之前插入了一些数据。我还尝试在会话生成器上设置数据中心名称,但没有任何结果。

如果节点已启动并正在运行,为什么会出现此异常?实际上我只有一个本地节点,这可能是个问题?

【问题讨论】:

    标签: cassandra insert


    【解决方案1】:

    我没有看到的最重要的事情是一种限制当前活动异步线程数量的方法。

    基本上,如果该(映射的)数据流受到足够大的打击,它基本上会创建它正在等待的所有这些新线程。如果来自这些线程的写入数量造成了节点无法跟上或赶不上的足够背压,则节点将变得不堪重负并且不接受请求。

    看看 DataStax 的 Ryan Svihla 的这篇文章:

    Cassandra: Batch Loading Without the Batch — The Nuanced Edition

    它的代码来自3.x版本的驱动,但是概念是一样的。基本上,提供一些方法来限制写入,或限制在任何给定时间运行的“正在运行的线程”的数量,这应该会有很大帮助。

    【讨论】:

      【解决方案2】:

      最后,我找到了一个解决方案,使用BatchStatement 和一些自定义代码来创建一个卡盘列表。

          int chunks = 0;
          if (data.size() % 100 == 0) {
            chunks = data.size() / 100;
          } else {
            chunks = (data.size() / 100) + 1;
          }
      
          final int finalChunks = chunks;
      
          return session.prepareAsync(
                 "insert into table (p1,p2,p3, p4) values (?, ?,?, ?)")
                  .thenComposeAsync(
                          (ps) -> {
      
      
                            AtomicInteger counter = new AtomicInteger();
      
                            final List<CompletionStage<AsyncResultSet>> batchInsert = data.stream()
                                    .map(
                                            (d) -> ps.bind(d.p1,d.p2,d.p3,d.p4)
      
                                    )
                                    .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / finalChunks))
                                    .values().stream().map(
                                            boundedStatements -> BatchStatement.newInstance(BatchType.LOGGED, boundedStatements.toArray(new BatchableStatement[0]))
                                    ).map(
                                            session::executeAsync
                                    ).collect(Collectors.toList());
      
                            return CompletableFutures.allSuccessful(batchInsert);
                          }
                  );
      

      【讨论】:

        最近更新 更多