【问题标题】:Amazon Keyspace (Cassandra) query no node was available to execute queryAmazon Keyspace (Cassandra) 查询没有节点可用于执行查询
【发布时间】:2021-10-29 11:28:43
【问题描述】:

我在 AWS EMR 中使用在 Apache Flink 上运行的 AWS Keyspace (Cassandra 3.11.2)。下面的查询有时会引发异常。 AWS Lambda 上使用的相同代码也具有相同的异常 NoHost。我做错了什么?

String query = "INSERT INTO TEST (field1, field2) VALUES(?, ?)";
PreparedStatement prepared = CassandraConnector.prepare(query);
int i = 0;
BoundStatement bound = prepared.bind().setString(i++, "Field1").setString(i++, "Field2")
                    .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
ResultSet rs = CassandraConnector.execute(bound);
 at com.datastax.oss.driver.api.core.NoNodeAvailableException.copy(NoNodeAvailableException.java:40)
 at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
 at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:53)
 at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30)
 at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
 at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:53)
 at com.test.manager.connectors.CassandraConnector.execute(CassandraConnector.java:16)
 at com.test.repository.impl.BackupRepositoryImpl.insert(BackupRepositoryImpl.java:36)
 at com.test.service.impl.BackupServiceImpl.insert(BackupServiceImpl.java:18)
 at com.test.flink.function.AsyncBackupFunction.processMessage(AsyncBackupFunction.java:78)
 at com.test.flink.function.AsyncBackupFunction.lambda$asyncInvoke$0(AsyncBackupFunction.java:35)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

这是我的代码:

CassandraConnector.java: 因为 initpreparedStatement 的开销很大,所以我缓存了这个。

public class CassandraConnector {
    private static final ConcurrentHashMap<String, PreparedStatement> preparedStatementCache = new ConcurrentHashMap<String, PreparedStatement>();

    public static ResultSet execute(BoundStatement bound) {
        CqlSession session = CassandraManager.getSessionInstance();
        return session.execute(bound);
    }

    public static ResultSet execute(String query) {
        CqlSession session = CassandraManager.getSessionInstance();
        return session.execute(query);
    }

    public static PreparedStatement prepare(String query) {
        PreparedStatement result = preparedStatementCache.get(query);
        if (result == null) {
            CqlSession session = CassandraManager.getSessionInstance();
            result = session.prepare(query);
            preparedStatementCache.putIfAbsent(query, result);
        }

        return result;
    }
}

CassandraManager.java: 我正在对会话对象使用单例双重检查锁定。

public class CassandraManager {
    private static final Logger logger = LoggerFactory.getLogger(CassandraManager.class);
    private static final String SSL_CASSANDRA_PASSWORD = "password";
    private static volatile CqlSession session;

    static {
        try {
            initSession();
        } catch (Exception e) {
            logger.error("Error CassandraManager getSessionInstance", e);
        }
    }

    private static void initSession() {
        List<InetSocketAddress> contactPoints = Collections.singletonList(InetSocketAddress.createUnresolved(
                "cassandra.ap-southeast-1.amazonaws.com", 9142));
        DriverConfigLoader loader = DriverConfigLoader.fromClasspath("application.conf");

        Long start = BaseHelper.getTime();
        session = CqlSession.builder().addContactPoints(contactPoints).withConfigLoader(loader)
                .withAuthCredentials(AppUtil.getProperty("cassandra.username"),
                        AppUtil.getProperty("cassandra.password"))
                .withSslContext(getSSLContext()).withLocalDatacenter("ap-southeast-1")
                .withKeyspace(AppUtil.getProperty("cassandra.keyspace")).build();
        logger.info("End connect: " + (new Date().getTime() - start));

    }

    public static CqlSession getSessionInstance() {
        if (session == null || session.isClosed()) {
            synchronized (CassandraManager.class) {
                if (session == null || session.isClosed()) {
                    initSession();
                }
            }
        }

        return session;
    }

    public static SSLContext getSSLContext() {
        InputStream in = null;
        try {
            KeyStore ks = KeyStore.getInstance("JKS");
            in = CassandraManager.class.getClassLoader().getResourceAsStream("cassandra_truststore.jks");
            ks.load(in, SSL_CASSANDRA_PASSWORD.toCharArray());
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(ks);

            SSLContext ctx = SSLContext.getInstance("TLS");
            ctx.init(null, tmf.getTrustManagers(), null);
            return ctx;
        } catch (Exception e) {
            logger.error("Error CassandraConnector getSSLContext", e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.error("", e);
                }
            }
        }

        return null;
    }
}

应用程序.conf

datastax-java-driver {
  basic.request {
    timeout = 5 seconds
    consistency = LOCAL_ONE
  }
  advanced.connection {
    max-requests-per-connection = 1024
    pool {
      local.size = 1
      remote.size = 1
    }
  }
  advanced.reconnect-on-init = true
  advanced.reconnection-policy {
    class = ExponentialReconnectionPolicy
    base-delay = 1 second
    max-delay = 60 seconds
  }
  advanced.retry-policy {
    class = DefaultRetryPolicy
  }
  advanced.protocol {
    version = V4
  }

  advanced.heartbeat {
    interval = 30 seconds
    timeout = 1 second
  }

  advanced.session-leak.threshold = 8
  advanced.metadata.token-map.enabled = false
}

【问题讨论】:

    标签: java amazon-web-services cassandra flink-streaming amazon-keyspaces


    【解决方案1】:

    有两种情况驱动会报告NoNodeAvailableException

    1. 节点无响应/不可用,驱动程序已将所有节点标记为关闭。
    2. 提供的所有联系方式均无效。

    如果某些插入正在工作但最终遇到NoNodeAvailableException,这表明节点正在过载并最终变得无响应,因此驱动程序不再选择协调器,因为它们都被标记为“关闭”。

    如果所有请求都不起作用,则意味着接触点无法访问或无法解析,因此驱动程序无法连接到集群。干杯!

    【讨论】:

    • 检查“Caused by”也很有意义——这个异常通常表明 SSL 存在问题
    • 这一次,我仍然将成功和查询连接到键空间,所以我不认为所有主机都真的宕机了。 @埃里克·拉米雷斯
    【解决方案2】:

    NoHostAvailableException 是开源驱动程序在重试可用主机后抛出的客户端异常。开源驱动封装了重试的根本原因,可能会造成混淆。

    我建议首先通过设置这些 CloudWatch 指标来提高您的可观察性。您可以按照这个预构建 CloudFormation 模板开始,只需几秒钟。

    以下是使用 Cloud Watch 为 Amazon Keyspaces 设置的 Keyspace & Table Metrics: https://github.com/aws-samples/amazon-keyspaces-cloudwatch-cloudformation-templates

    您还可以使用此帮助程序项目中的以下示例替换重试策略。此项目中的重试策略将尝试或抛出原始异常,这将删除 NoHostAvailableException 的出现,这将为您的应用程序提供更好的透明度。这是 Github 仓库的类似内容:https://github.com/aws-samples/amazon-keyspaces-java-driver-helpers

    如果您使用的是私有 VPC 终端节点,您希望添加以下权限以在 system.peers 表中启用更多条目。 Amazon Keyspaces 刚刚宣布了新功能,该功能将在与私有 VPC 终端节点建立会话时提供更多连接点。

    这是一个关于 Keyspaces 现在如何自动优化通过 AWS PrivateLink 建立的客户端连接以提高可用性以及读写的链接:https://aws.amazon.com/about-aws/whats-new/2021/07/amazon-keyspaces-for-apache-cassandra-now-automatically-optimi/

    此链接讨论将 Amazon Keypscaes 与接口 VPC 终端节点结合使用:https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html。要启用此新功能,您需要为 DescribeNetworkInterfaces 和 DescribeVpcEndpoints 提供额外权限。

      {
       "Version":"2012-10-17",
       "Statement":[
          {
             "Sid":"ListVPCEndpoints",
             "Effect":"Allow",
             "Action":[
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeVpcEndpoints"
             ],
             "Resource":"*"
          }
       ]
    }
    

    【讨论】:

      【解决方案3】:

      我怀疑是这样的:

      .withLocalDatacenter(AppUtil.getProperty("cassandra.localdatacenter"))
      

      拉回与键空间复制定义或配置的数据中心名称不匹配的数据中心名称:

      nodetool status | grep Datacenter
      

      基本上,如果您的连接是使用不存在的本地数据中心定义的,它仍会尝试使用该数据中心中的副本进行读/写。这将失败,因为它显然无法在不存在的数据中心中找到节点。

      这里有类似的问题:NoHostAvailable error in cqlsh console

      【讨论】:

      • 为了清楚起见,我只是编辑我的代码 AppUtil.getProperty("cassandra.localdatacenter") = "ap-southeast-1"。我仍然认为这个问题涉及数据中心,我仍然多次检查我的代码并纠正数据中心。
      • @Scorpion 是否与键空间定义和 Cassandra-rackdc.properties 文件中定义的内容匹配?
      • docs.aws.amazon.com/keyspaces/latest/devguide/… 中所述,本地数据中心是区域。我从本地运行查询选择数据中心 => ap-southeast-1
      猜你喜欢
      • 2022-01-09
      • 2020-02-05
      • 2013-11-26
      • 1970-01-01
      • 2014-12-12
      • 2017-12-28
      • 2015-03-08
      • 2018-02-06
      • 2013-04-07
      相关资源
      最近更新 更多