【问题标题】:How to handle exceptions when connection to Cassandra fails?连接 Cassandra 失败时如何处理异常?
【发布时间】:2018-04-04 08:50:19
【问题描述】:

我的 Cassandra 接收器配置如下所示:

    ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .build();
    };

    CassandraSink
            .addSink(cassandraObjectStream)
            .setClusterBuilder(secureCassandraSinkClusterBuilder)
            .build()
            .name("Cassandra-Sink");

现在,当与 Cassandra 的连接未正确配置时,我会收到 NoHostAvailableException,或者当连接意外断开时,我会收到 ConnectionTimeOutException,有时还会收到 写超时异常。这最终会触发 JobExecutionException 并且整个 Flink 作业终止。

我在哪里捕获这些 Cassandra 异常?这些扔在哪里?我尝试在 CassandraSink 周围放置一个 try-catch 块,但没有这样做。我想捕获这些异常并在连接超时的情况下重试连接到 Cassandra,或者在写入超时的情况下重试写入 Cassandra。

【问题讨论】:

    标签: java cassandra apache-flink


    【解决方案1】:

    AFAIK,您不能尝试使用 CassandraSink 捕获这些异常。

    捕获像 TimeoutException 这样的异常的一种方法是为 Cassandra 实现自己的接收器,但这可能需要很多时间......

    另一种方法是,如果您运行流式作业,您可以将任务重试次数设置为大于 1 到 StreamingExecutionEnvironment.setRestartStrategy,并启用检查点,以便流式作业可以根据上一个检查点继续工作。 CassandraSink 支持 WAL,所以 EXACTLY_ONCE 可以在启用检查点的情况下实现。

    【讨论】:

      猜你喜欢
      • 2018-01-30
      • 2013-01-10
      • 1970-01-01
      • 2017-06-06
      • 2015-12-30
      • 2016-01-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多