【问题标题】:Running Google Dataflow pipeline from a Google App Engine app?从 Google App Engine 应用程序运行 Google Dataflow 管道?
【发布时间】:2016-03-11 21:33:13
【问题描述】:

我正在使用 DataflowPipelineRunner 创建一个数据流作业。我尝试了以下场景。

  1. 不指定任何机器类型
  2. 配g1小机
  3. 使用 n1-highmem-2

在上述所有场景中,输入是来自 GCS 的文件,该文件非常小(KB 大小),输出是大查询表。

我在所有场景中都出现内存不足错误

我编译的代码大小为 94mb。我只尝试字数统计示例,它没有读取任何输入(在作业开始之前失败)。请帮助我了解为什么会出现此错误。

注意:我正在使用 appengine 来启动这项工作。

注意:相同的代码适用于测试版0.4.150414

编辑 1

根据答案中的建议尝试了以下方法,

  1. 自动缩放切换到基本缩放
  2. 使用的机器类型 B2 提供 256MB 内存

经过这些配置,Java堆内存问题就解决了。但它试图将一个 jar 上传到超过 10Mb 的暂存位置,因此它失败了。

它记录以下异常

com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

我尝试直接上传 jar 文件 - appengine-api-1.0-sdk-1.9.20.jar,但它仍然尝试上传此 jar appengine-api-L4wtoWwoElWmstI1Ia93cg.jar。 我不知道它是什么罐子。任何关于这个 jar 的想法都值得赞赏。

请帮我解决这个问题。

【问题讨论】:

  • 几个问题——您使用的是什么 SDK 版本,您能否提供一个作业 ID?
  • 你能分享你得到的异常吗?您是在本地(在 appengine 上)看到 OOM,还是在管道启动后看到它?如果在 appengine 上发生 OOM,那么您可能会遇到 stackoverflow.com/questions/33647161/…
  • 我将 appengine 机器类型更改为 F2,现在我没有收到 OOM 错误。但它正在尝试暂存超过 10 毫米的文件,并且可以恢复上传。上传需要很多时间,我得到了超出执行时间限制的异常。对此有任何想法。
  • @SamMcVeety 我的 appengine SDK 是 1.9.30,Dataflow SDK 是 1.2.1。未创建作业。它在 appengine 本身中失败了。
  • @Bharathi 您是否最终列出了 filesToStage 以绕过导致失败的 appengine-api jar 上传?

标签: google-bigquery google-cloud-platform google-cloud-dataflow


【解决方案1】:

简短的回答是,如果您在 Managed VM 上使用 AppEngine,您将不会遇到 AppEngine 沙盒限制(使用 F1 or B1 instance class 时出现 OOM、执行时间限制问题、列入白名单的 JRE 类)。如果您真的想在 App Engine 沙箱中运行,那么您对 ​​Dataflow SDK 的使用最符合 AppEngine 沙箱的限制。下面我将解释常见问题以及人们为遵守 AppEngine 沙盒限制所做的工作。

Dataflow SDK 需要一个 AppEngine 实例类,该类具有足够的内存来执行用户应用程序来构建管道、暂存任何资源并将作业描述发送到 Dataflow 服务。通常我们已经看到用户需要使用具有超过 128mb 内存的 instance class 才能看不到 OOM 错误。

如果您的应用程序所需的资源已经准备好,通常构建管道并将其提交到 Dataflow 服务只需不到几秒钟的时间。将您的 JAR 和任何其他资源上传到 GCS 可能需要超过 60 秒。这可以通过预先将 JAR 预先暂存到 GCS 来手动解决(如果 Dataflow SDK 检测到它们已经存在,它将跳过暂存它们)或使用task queue 来获得 10 分钟的限制(请注意,对于大型应用程序, 10 分钟可能不足以展示所有资源)。

最后,在 AppEngine 沙盒环境中,您和您的所有依赖项都仅限于在 JRE 中使用 whitelisted 类,否则您将收到如下异常:

java.lang.SecurityException:
  java.lang.IllegalAccessException: YYY is not allowed on ZZZ
  ...

编辑 1

我们在类路径上对 jar 的内容进行哈希处理,并使用修改后的文件名将它们上传到 GCS。 AppEngine 使用自己的 JAR 运行沙盒环境,appengine-api-L4wtoWwoElWmstI1Ia93cg.jar 指的是 appengine-api.jar,它是沙盒环境添加的 jar。您可以从我们的PackageUtil#getUniqueContentName(...) 中看到,我们只是在 .jar 之前附加 -$HASH

我们正在努力解决您看到 RequestPayloadToLarge 异常的原因,目前建议您设置 filesToStage 选项并过滤掉执行您的数据流来解决您面临的问题。您可以看到我们如何构建文件以使用 DataflowPipelineRunner#detectClassPathResourcesToStage(...) 暂存。

【讨论】:

  • 谢谢,我试过你的建议,Java Heap size error 解决了。但面临暂存文件上传的问题。我已经在编辑中解释过了。请看一下。
  • 谢谢卢卡斯。我过滤掉了 appengine-api jar,它起作用了。如果 RequestPayloadToLarge 异常得到修复,那就太好了。否则,如果任何所需的 jar 超过 10 mb,用户将无法使用 appengine 启动管道。
  • 有没有一种简单的方法可以过滤掉单个 .jar,而不必像上面提到的那样按顺序列出所有资源?
  • 目前没有可以配置排除依赖的选项。当前唯一支持的选项是列出您需要的所有依赖项。
【解决方案2】:

我对 10MB 的限制也有同样的问题。我所做的是过滤掉大于该限制的 JAR 文件(而不是特定文件),然后将 DataflowPipelineOptions 中的重命名文件设置为 setFilesToStage

所以我只是从Dataflow SDK中复制了detectClassPathResourcesToStage的方法并进行了修改:

private static final long FILE_BYTES_THRESHOLD = 10 * 1024 * 1024; // 10 MB

protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
    if (!(classLoader instanceof URLClassLoader)) {
        String message = String.format("Unable to use ClassLoader to detect classpath elements. "
                + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
        throw new IllegalArgumentException(message);
    }

    List<String> files = new ArrayList<>();
    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
        try {
            File file = new File(url.toURI());
            if (file.length() < FILE_BYTES_THRESHOLD) {
                files.add(file.getAbsolutePath());
            }
        } catch (IllegalArgumentException | URISyntaxException e) {
            String message = String.format("Unable to convert url (%s) to file.", url);
            throw new IllegalArgumentException(message, e);
        }
    }
    return files;
}

然后当我创建DataflowPipelineOptions:

DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
...
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()));

【讨论】:

  • 这有助于突出对我来说至关重要的一点:10MiB 的限制是针对每个 .jar 文件,而不是针对所有 .jar 的总大小。
【解决方案3】:

这是 Helder 的 10MB 过滤 solution 的一个版本,即使它在 SDK 的未来版本中发生变化,它也会适应 DataflowPipelineOptions 的默认文件暂存行为。

它不会复制逻辑,而是将DataflowPipelineOptions 的一次性副本传递给DataflowPipelineRunner,以查看它会暂存哪些文件,然后删除任何太大的文件。

请注意,此代码假定您已经定义了一个名为 MyOptions 的自定义 PipelineOptions 类,以及一个名为 loggerjava.util.Logger 字段。

// The largest file size that can be staged to the dataflow service.
private static final long MAX_STAGED_FILE_SIZE_BYTES = 10 * 1024 * 1024;

/**
 * Returns the list of .jar/etc files to stage based on the
 * Options, filtering out any files that are too large for
 * DataflowPipelineRunner.
 *
 * <p>If this accidentally filters out a necessary file, it should
 * be obvious when the pipeline fails with a runtime link error.
 */
private static ImmutableList<String> getFilesToStage(MyOptions options) {
  // Construct a throw-away runner with a copy of the Options to see
  // which files it would have wanted to stage. This could be an
  // explicitly-specified list of files from the MyOptions param, or
  // the default list of files determined by DataflowPipelineRunner.
  List<String> baseFiles;
  {
    DataflowPipelineOptions tmpOptions =
        options.cloneAs(DataflowPipelineOptions.class);
    // Ignore the result; we only care about how fromOptions()
    // modifies its parameter.
    DataflowPipelineRunner.fromOptions(tmpOptions);
    baseFiles = tmpOptions.getFilesToStage();
    // Some value should have been set.
    Preconditions.checkNotNull(baseFiles);
  }
  // Filter out any files that are too large to stage.
  ImmutableList.Builder<String> filteredFiles = ImmutableList.builder();
  for (String file : baseFiles) {
    long size = new File(file).length();
    if (size < MAX_STAGED_FILE_SIZE_BYTES) {
      filteredFiles.add(file);
    } else {
      logger.info("Not staging large file " + file + ": length " + size
          + " >= max length " + MAX_STAGED_FILE_SIZE_BYTES);
    }
  }
  return filteredFiles.build();
}

/** Runs the processing pipeline with given options. */
public void runPipeline(MyOptions options)
    throws IOException, InterruptedException {
  // DataflowPipelineRunner can't stage large files;
  // remove any from the list.
  DataflowPipelineOptions dpOpts =
      options.as(DataflowPipelineOptions.class);
  dpOpts.setFilesToStage(getFilesToStage(options));

  // Run the pipeline as usual using "options".
  // ...
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-12-20
    • 2016-12-28
    • 2018-04-09
    • 1970-01-01
    • 2016-11-02
    相关资源
    最近更新 更多