【问题标题】:Chaining MapReduces - Google AppEngine链接 MapReduce - Google App Engine
【发布时间】:2017-03-26 17:43:35
【问题描述】:

我正在尝试通过在管道中链接,将 reduce 的输出发送到地图,类似于这个人: I would like to chain multiple mapreduce jobs in google app engine in Python 我尝试了他的解决方案,但没有奏效。 我的管道的流程是:
地图1
减少1
地图2
减少2
我将 Reduce1 的输出保存到 blob_key 下的 blobstore,然后尝试从 Map2 访问 blob。但是在执行第二张地图时出现以下错误:"BadReaderParamsError: Could not find blobinfo for key <blob_key here>"

这是管道代码:

class SongsPurchasedTogetherPipeline(base_handler.PipelineBase):

  def run(self, filekey, blobkey):
    bucket_name = app_identity.get_default_gcs_bucket_name()
    intermediate_output = yield mapreduce_pipeline.MapreducePipeline(
        "songs_purchased_together_intermediate",
        "main.songs_purchased_together_map1",
        "main.songs_purchased_together_reduce1",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.GoogleCloudStorageOutputWriter",
        mapper_params={
            "blob_keys": blobkey,
        },
        reducer_params={
            "output_writer": {
                "bucket_name": bucket_name,
                "content_type": "text/plain",
            }
        },
        shards=1)
    yield StoreOutput("SongsPurchasedTogetherIntermediate", filekey, intermediate_output)

    intermediate_output_key = yield BlobKey(intermediate_output)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "songs_purchased_together",
        "main.songs_purchased_together_map2",
        "main.songs_purchased_together_reduce2",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.GoogleCloudStorageOutputWriter",
        mapper_params=(intermediate_output_key),
        reducer_params={
            "output_writer": {
                "bucket_name": bucket_name,
                "content_type": "text/plain",
            }
        },
        shards=1)
    yield StoreOutput("SongsPurchasedTogether", filekey, output)

这是 BlobKey 类,它接受中间输出并生成供 Map2 使用的 blob 密钥:

class BlobKey(base_handler.PipelineBase):

  def run(self, output):
    blobstore_filename = "/gs" + output[0]
    blobstore_gs_key = blobstore.create_gs_key(blobstore_filename)
    return {
      "blob_keys": blobstore_gs_key
    }

StoreOutput 类与 Google 的 MapReduce 演示 https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/demo/main.py 中的类相同,并且与 BlobKey 类执行相同的操作,但另外将 blob 的 URL 作为链接发送到 HTML。

手动访问 URL appname/blobstore/<blob_key>,通过在浏览器中键入它(Reduce1 成功后,但 Map2 失败)显示 Reduce1 的预期输出。为什么 Map2 找不到 blob?抱歉,我是 AppEngine 的新手,我可能在某个地方出错,因为我不完全了解 blob 存储。

【问题讨论】:

    标签: python google-app-engine mapreduce


    【解决方案1】:

    好的,我发现 Google 已从 GAE GitHub 存储库的标准编写器列表中删除了 BlobstoreOutputWriter,这使事情变得更复杂了。我必须写信到 Google Cloud Store,然后从那里阅读。我编写了一个帮助类,它为 GoogleCloudStorageInputReader 生成映射器参数。

    class GCSMapperParams(base_handler.PipelineBase):
    
      def run(self, GCSPath):
        bucket_name = app_identity.get_default_gcs_bucket_name()
        return {
                "input_reader": {
                    "bucket_name": bucket_name,
                    "objects": [path.split('/', 2)[2] for path in GCSPath],
                }
            }
    

    该函数将使用 GoogleCloudStorageOutputWriter 的一个 MapReduce 阶段的输出作为参数,并返回一个字典,该字典可以分配给下一个 MapReduce 阶段的 mapper_params。

    基本上,第一个 MapReduce 阶段的输出值是一个包含<app_name>/<pipeline_name>/key/output-[i] 的列表,其中 i 是分片的数量。为了使用 GoogleCloudStorageInputReader,数据的密钥应该通过mapper_params 中的变量objects 传递。键必须是key/output-[i] 的形式,所以助手类简单地从中删除<app_name>/<pipeline_name>/

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-07-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-07-09
      相关资源
      最近更新 更多