【发布时间】:2019-10-28 14:00:20
【问题描述】:
我们的数据流管道有一个 DoFn,它使用 hbase multiget 客户端 api 从 bigtable 读取。这似乎会导致数据流在以下堆栈中随机停止:
处理卡在步骤 AttachStuff/BigtableAttacher 至少 04h10m00s 没有在状态过程中输出或完成 在 sun.misc.Unsafe.park(本机方法) 在 java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 在 com.google.bigtable.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:523) 在 com.google.bigtable.repackaged.com.google.api.core.AbstractApiFuture.get(AbstractApiFuture.java:56) 在 com.google.cloud.bigtable.hbase.BatchExecutor.batchCallback(BatchExecutor.java:276) 在 com.google.cloud.bigtable.hbase.BatchExecutor.batch(BatchExecutor.java:239) 在 com.google.cloud.bigtable.hbase.AbstractBigtableTable.get(AbstractBigtableTable.java:241) 在 com.askscio.google.docbuilder.BigtableAnchorsAttacher.getAnchors(BigtableAnchorsAttacher.java:86) 在 com.askscio.google.docbuilder.BigtableAnchorsAttacher.process(BigtableAnchorsAttacher.java:129) 在 com.askscio.docbuilder.core.ScioDoFn.processWithErrorHandling(ScioDoFn.java:39) 在 com.askscio.google.docbuilder.BigtableAnchorsAttacher$DoFnInvoker.invokeProcessElement(未知来源)
我们正在使用 Beam 库 2.12.0。 DoFn 在 StartBundle 中初始化 bigtable 连接。
每次 DoFn 调用从 bigtable 中查找不超过 10 个键
它的单个集群、3 个节点和 SSD。存储利用率为 2.2 GB,最大节点 CPU 利用率为 13%,最大读/写速率为 2000 次读取/秒和 1000 次写入/秒
开始捆绑:
bigtableConn = BigtableConfiguration.connect(
config.getString(ConfigKeys.Google.PROJECT_ID),
config.getString(ConfigKeys.Google.INSTANCE_ID)
);
fooTable = bigtableConn.getTable(TableName.valueOf(BigtableDocumentStore.FOO_TABLE_NAME));
过程:
List<Get> gets = Lists.newArrayList();
// keys are no more than 10
for (String s : keys) {
Get get = new Get(Bytes.toBytes(s))
.addFamily(Bytes.toBytes(BigtableDocumentStore.FOO_COLUMN_FAMILY))
.setMaxVersions(1);
gets.add(get);
}
Result[] results= fooTable.get(gets);
拆解:
fooTable.close();
bigTableConn.close();
【问题讨论】:
-
请注意 startBundle 应与 finishBundle 配对,并与拆卸一起设置。鉴于 startBundle 是按包调用的,而 teardown 是按 DoFn 实例调用的,也许您正在打开而不是关闭太多的 bigtable 连接?
-
所以实际的 startBundle 代码如下所示。鉴于我们仅在变量为 null 时创建连接,这种不匹配不应该导致连接泄漏,对吧? ` class Foo { 静态连接 conn=null; @StartBundle public void startBundle(StartBundleContext bundleContext) throws Exception { synchronized (Foo.class) { if (conn == null) { // 创建 bigtable conn 并将 conn 分配给它 } } } } `
-
好的,看起来不错。您是否(接近)超出您的阅读限制?它看起来确实在等待读取完成。
-
根据 bigtable 实例页面,我的 3 节点 ssd 集群可以达到 30,000 行/秒。我检查了我的 bigtable 实例的最大读取速率是 2000 行/秒
-
您能否提供有关您正在使用的客户端库版本的更多详细信息,即您的 pom.xml 中 bigtable-hbase-beam 的版本?
标签: google-cloud-platform google-cloud-dataflow google-cloud-bigtable