【发布时间】:2018-05-21 19:07:06
【问题描述】:
当我运行写入谷歌云数据存储的数据流作业时,有时我看到指标显示我有一个或两个datastoreRpcErrors:
由于这些数据存储写入通常包含一批键,我想知道在 RpcError 的情况下,是否会自动发生一些重试。如果没有,处理这些情况的好方法是什么?
【问题讨论】:
标签: google-cloud-platform google-cloud-datastore google-cloud-dataflow
当我运行写入谷歌云数据存储的数据流作业时,有时我看到指标显示我有一个或两个datastoreRpcErrors:
由于这些数据存储写入通常包含一批键,我想知道在 RpcError 的情况下,是否会自动发生一些重试。如果没有,处理这些情况的好方法是什么?
【问题讨论】:
标签: google-cloud-platform google-cloud-datastore google-cloud-dataflow
tl;dr:默认情况下,datastoreRpcErrors 会自动重试 5 次。
我在 beam python sdk 中深入研究了datastoreio 的代码。看起来最终的实体突变是通过DatastoreWriteFn()批量刷新的。
# Flush the current batch of mutations to Cloud Datastore.
_, latency_ms = helper.write_mutations(
self._datastore, self._project, self._mutations,
self._throttler, self._update_rpc_stats,
throttle_delay=_Mutate._WRITE_BATCH_TARGET_LATENCY_MS/1000)
RPCError 被write_mutations 中的这段代码在helper 中捕获;并且有一个装饰器@retry.with_exponential_backoff 用于commit 方法;并且默认重试次数设置为5; retry_on_rpc_error 定义了具体的 RPCError 和 SocketError 触发重试的原因。
for mutation in mutations:
commit_request.mutations.add().CopyFrom(mutation)
@retry.with_exponential_backoff(num_retries=5,
retry_filter=retry_on_rpc_error)
def commit(request):
# Client-side throttling.
while throttler.throttle_request(time.time()*1000):
try:
response = datastore.commit(request)
...
except (RPCError, SocketError):
if rpc_stats_callback:
rpc_stats_callback(errors=1)
raise
...
【讨论】:
我认为您应该首先确定发生了哪种错误,以便查看您的选择。
但是,在 Datastore 官方文档中,列出了所有可能的 errors and their error codes 。幸运的是,它们为每个都提供了建议的操作。
我的建议是你实施他们的建议,如果它们对你无效,请查看替代方案
【讨论】: