【问题标题】:Datastax Java Driver Custom Retry PolicyDatastax Java 驱动程序自定义重试策略
【发布时间】:2020-12-26 14:29:45
【问题描述】:

我编写了一个自定义重试策略类,我可以在其中传递没有重试驱动程序将执行 onWriteTimeout/onUnavilable/onReadTimeout。

public class CustomRetryPolicy implements RetryPolicy {


  private static final Logger LOG = LoggerFactory.getLogger(CustomRetryPolicy.class);

  @VisibleForTesting
  public static final String RETRYING_ON_READ_TIMEOUT =
      "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, "
          + "received responses: {}, data retrieved: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_WRITE_TIMEOUT =
      "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, "
          + "required acknowledgments: {}, received acknowledgments: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_UNAVAILABLE =
      "[{}] Retrying on unavailable exception on next host (consistency: {}, "
          + "required replica: {}, alive replica: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ABORTED =
      "[{}] Retrying on aborted request on next host (retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ERROR =
      "[{}] Retrying on node error on next host (retries: {})";

  private static final String LOG_PREFIX = "DATASTORE-CASSANDRA";

  private final int readAttempts;
  private final int writeAttempts;
  private final int unavailableAttempts;

  public CustomRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) {
    this.readAttempts = readAttempts;
    this.writeAttempts = writeAttempts;
    this.unavailableAttempts = unavailableAttempts;
  }

  @Override
  public RetryDecision onReadTimeout(Request request, ConsistencyLevel cl, int blockFor,
      int received, boolean dataPresent, int retryCount) {


    RetryDecision decision = (retryCount < readAttempts && received >= blockFor && !dataPresent)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_READ_TIMEOUT, LOG_PREFIX, cl, blockFor, received, false, retryCount);
    }

    return decision;
  }



  @Override
  public RetryDecision onWriteTimeout(Request request, ConsistencyLevel cl, WriteType writeType,
      int blockFor, int received, int retryCount) {
    RetryDecision decision = (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_WRITE_TIMEOUT, LOG_PREFIX, cl, writeType, blockFor, received,
          retryCount);
    }
    return decision;
  }

  @Override
  public RetryDecision onUnavailable(Request request, ConsistencyLevel cl, int required, int alive,
      int retryCount) {
    RetryDecision decision =
        (retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_UNAVAILABLE, LOG_PREFIX, cl, required, alive, retryCount);
    }

    return decision;
  }

  @Override
  public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) {
    RetryDecision decision =
        (error instanceof ClosedConnectionException || error instanceof HeartbeatException)
            ? RetryDecision.RETRY_NEXT
            : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ABORTED, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public RetryDecision onErrorResponse(Request request, CoordinatorException error,
      int retryCount) {
    RetryDecision decision =
        (error instanceof ReadFailureException || error instanceof WriteFailureException)
            ? RetryDecision.RETHROW
            : RetryDecision.RETRY_NEXT;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ERROR, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public void close() {

    // Nothing to do

  }



}

我正在使用 datastax java 驱动程序 4.6.0。 但问题是我不能用 CQLSessionBuilder 传递这个类的对象,这可以通过 like

RetryPolicy rc = new CustomRetryPolicy(3, 3, 2);
Cluster cluster = Cluster.builder().addContactPoint("192.168.0.0").withRetryPolicy(rc).build();

在旧版本的驱动程序中。我尝试过使用 DriverConfigLoader,但只有传递自定义类名的选项。

你能推荐一下吗?

【问题讨论】:

    标签: java database cassandra datastax datastax-java-driver


    【解决方案1】:

    如果您查看DefaultRetryPolicy 的实现和CustomRetryPolicy 的示例,您会看到两者都接收2 个参数:context 类型为DriverContext,以及带有配置文件名称的字符串。然后您应该能够使用context 通过getConfig 调用获得DriverConfig,然后在配置上使用getProfile 来提取自定义策略所需的配置值 - 您可以将自己的配置值放入配置文件并在重试策略中使用它,如下所示:

    datastax-java-driver {
      advanced.retry-policy {
        class = DefaultRetryPolicy
      }
      profiles {
        custom-retries {
          advanced.retry-policy {
            class = CustomRetryPolicy
            custom-policy {
               read-attempts = 3
               write-attempts = 2
               ...
            }
          }
        }
      }
    }
    

    【讨论】:

    • 谢谢亚历克斯,是否可以传递值而不是从配置文件中提取它,我有一个接口可以提供重试尝试,我正在尝试根据这些值创建自定义重试策略,并在创建 CqlSession 本身时使用它。
    • 您也可以通过编程方式指定配置属性,然后将 application.conf 中的配置与通过编程方式创建的配置合并
    • 谢谢,如果我正确理解了您的回复,我不想像您在回答中提到的那样从 appliation.conf 重试尝试,而是我想务实地设置它[因为它们将来自界面] DriverConfigLoader 然后在创建会话时加载 configloader - withConfigLoader(loader)。
    • 驱动程序的配置可能来自多个来源,并组合在一起,因此您可以通过编程方式设置您的特定内容,其余来自 application.conf 和 reference.conf。您可能需要升级到最新版本的驱动程序 - 我记得 4.6.0 左右的程序加载程序存在一些问题
    猜你喜欢
    • 1970-01-01
    • 2016-06-04
    • 1970-01-01
    • 2016-11-29
    • 2016-11-29
    • 2016-07-21
    • 2018-02-15
    • 1970-01-01
    相关资源
    最近更新 更多