【问题标题】:DataFlow DoFn hangs unexpectedly when reading from bigtable从 bigtable 读取时,DataFlow DoFn 意外挂起
【发布时间】:2019-10-28 14:00:20
【问题描述】:

我们的数据流管道有一个 DoFn,它使用 hbase multiget 客户端 api 从 bigtable 读取。这似乎会导致数据流在以下堆栈中随机停止:

处理卡在步骤 AttachStuff/BigtableAttacher 至少 04h10m00s 没有在状态过程中输出或完成 在 sun.misc.Unsafe.park(本机方法) 在 java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 在 com.google.bigtable.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:523) 在 com.google.bigtable.repackaged.com.google.api.core.AbstractApiFuture.get(AbstractApiFuture.java:56) 在 com.google.cloud.bigtable.hbase.BatchExecutor.batchCallback(BatchExecutor.java:276) 在 com.google.cloud.bigtable.hbase.BatchExecutor.batch(BatchExecutor.java:239) 在 com.google.cloud.bigtable.hbase.AbstractBigtableTable.get(AbstractBigtableTable.java:241) 在 com.askscio.google.docbuilder.BigtableAnchorsAttacher.getAnchors(BigtableAnchorsAttacher.java:86) 在 com.askscio.google.docbuilder.BigtableAnchorsAttacher.process(BigtableAnchorsAttacher.java:129) 在 com.askscio.docbuilder.core.ScioDoFn.processWithErrorHandling(ScioDoFn.java:39) 在 com.askscio.google.docbuilder.BigtableAnchorsAttacher$DoFnInvoker.invokeProcessElement(未知来源)

我们正在使用 Beam 库 2.12.0。 DoFn 在 StartBundle 中初始化 bigtable 连接。

每次 DoFn 调用从 bigtable 中查找不超过 10 个键

它的单个集群、3 个节点和 SSD。存储利用率为 2.2 GB,最大节点 CPU 利用率为 13%,最大读/写速率为 2000 次读取/秒和 1000 次写入/秒

开始捆绑:

bigtableConn = BigtableConfiguration.connect(
    config.getString(ConfigKeys.Google.PROJECT_ID),
    config.getString(ConfigKeys.Google.INSTANCE_ID)
);
fooTable = bigtableConn.getTable(TableName.valueOf(BigtableDocumentStore.FOO_TABLE_NAME));

过程:

List<Get> gets = Lists.newArrayList();
// keys are no more than 10
for (String s : keys) {
   Get get = new Get(Bytes.toBytes(s))
                     .addFamily(Bytes.toBytes(BigtableDocumentStore.FOO_COLUMN_FAMILY))
                        .setMaxVersions(1);
   gets.add(get);
}
Result[] results= fooTable.get(gets);

拆解:

fooTable.close();
bigTableConn.close();

【问题讨论】:

  • 请注意 startBundle 应与 finishBundle 配对,并与拆卸一起设置。鉴于 startBundle 是按包调用的,而 teardown 是按 DoFn 实例调用的,也许您正在打开而不是关闭太多的 bigtable 连接?
  • 所以实际的 startBundle 代码如下所示。鉴于我们仅在变量为 null 时创建连接,这种不匹配不应该导致连接泄漏,对吧? ` class Foo { 静态连接 conn=null; @StartBundle public void startBundle(StartBundleContext bundleContext) throws Exception { synchronized (Foo.class) { if (conn == null) { // 创建 bigtable conn 并将 conn 分配给它 } } } } `
  • 好的,看起来不错。您是否(接近)超出您的阅读限制?它看起来确实在等待读取完成。
  • 根据 bigtable 实例页面,我的 3 节点 ssd 集群可以达到 30,000 行/秒。我检查了我的 bigtable 实例的最大读取速率是 2000 行/秒
  • 您能否提供有关您正在使用的客户端库版本的更多详细信息,即您的 pom.xml 中 bigtable-hbase-beam 的版本?

标签: google-cloud-platform google-cloud-dataflow google-cloud-bigtable


【解决方案1】:

我建议将连接管理移至 @Setup & Teardown 并使用引用计数,以防您使用多核工作器。

Bigtable 连接非常重,并且旨在每个进程都是单例的。 BigtableConfiguration.connect()返回的HBase连接对象,实际上包裹了一个grpc通道池,每个cpu有2个通道,构建起来非常昂贵。

您有几个选项可以改进您的管道:

  1. 将配置选项“google.bigtable.use.cached.data.channel.pool”设置为“true”,这将重用内部连接池

  2. 在你的 DoFn 中做这样的事情:

    // instance vars
    static Object connectionLock = new Object();
    static Connection bigtableConn = null;
    
    // @Setup
    synchronized(connectionLock) {
      if (numWorkers++ == 0) {
        bigtableConn = BigtableConfiguration.connect(...);
      } 
    }
    
    // @Teardown
    synchronized(connectionLock) {
      if (--numWorkers == 0) {
        bigtableConn.close();
      } 
    }
    

【讨论】:

    猜你喜欢
    • 2020-05-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多