【问题标题】:Mapreduce on GAE Python - Cause ReducePipeline to issue callback on finalize?GAE Python 上的 Mapreduce - 导致 ReducePipeline 在完成时发出回调?
【发布时间】:2013-12-19 18:01:01
【问题描述】:

我想在 mapreduce 作业完成/完成后执行自定义回调函数。

我为这个问题找到的唯一有用的参考是a somewhat outdated Google site 和一个相关的,但又看似过时的Stackoverflow question

这两个来源都假设我使用 control.start_map 来启动 Mapreduce 作业,并依赖于 start_map 接受关键字参数 mapreduce_parameters 的事实,其中可以指定 done_callback 参数来指定 URL应该在完成时调用。但是,我使用了另一种方法(afaik 是最近的首选方法),其中自定义管道的 run 方法会生成 Mapreduce 管道:

yield mapreduce_pipeline.MapreducePipeline(
    "word_count",
    "main.word_count_map",
    "main.word_count_reduce",
    "mapreduce.input_readers.BlobstoreZipInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={
        "blob_key": blobkey,
    },
    reducer_params={
        "mime_type": "text/plain",
    },
    shards=16)

MapreducePipeline 的签名不允许mapreduce_parameters 参数。我可以在源代码中看到对回调的引用的唯一地方是mapper_pipeline.MapperPipeline.run,但它似乎只在内部使用。

那么,有没有办法在其中获取回调参数?

如果没有,是否有人对在何处以及如何扩展库以提供此类功能有好的想法?

【问题讨论】:

    标签: python google-app-engine mapreduce


    【解决方案1】:

    我将我的 Mapreduce 管道范例设置为如下所示:

    class MRRecalculateSupportsPipeline(base_handler.PipelineBase):
    
        def run(self, user_key):
            # ...
            yield mapreduce_pipeline.MapreducePipeline('user_recalculate_supports',
                    'myapp.mapreduces.user_recalculate_supports_map',
                    'myapp.mapreduces.user_recalculate_supports_reduce',
                    'mapreduce.input_readers.DatastoreInputReader', output_writer_spec=None,
                    mapper_params={"""..."""})
    

    如果您想捕获此管道的完成情况,您有两种选择。

    A) 使用 pipeline.After 在 MR 流水线完成后运行完成流水线。

            pipe_future = yield mapreduce_pipeline.MapreducePipeline('user_recalculate_supports',
                    'myapp.mapreduces.user_recalculate_supports_map',
                    'myapp.mapreduces.user_recalculate_supports_reduce',
                    'mapreduce.input_readers.DatastoreInputReader', output_writer_spec=None,
                    mapper_params={"""..."""})
            with pipeline.After(pipe_future):
                yield CalcCompletePipeline(...)  # this could be a mapreduce pipeline, or any pipeline using the same base_handler.PipelineBase parent class.
    

    B) 使用顶级管道的finalized 方法来处理完成。就个人而言,我会坚持使用选项 A,因为您可以在 /_ah/*/status?root= 视图中跟踪路径。

    class EmailNewReleasePipeline(base_handler.PipelineBase):
        """Email followers about a new release"""
        # TODO: product_key is the name of the parameter, but it's built for albums ...
    
        def run(self, product_key, testing=False):
                # Send those emails ...
                yield mapreduce_pipeline.MapreducePipeline(...)
    
        def finalized(self):
            """Save product as launched"""
            ...
            product.launched = True
            product.put()
    

    这是finalization of a pipeline 上的文档。

    【讨论】:

    • 您知道 pipeline.After 到底是什么或为什么需要它吗?以我的经验,简单地产生后处理管道将在主管道完成后执行其代码。这也是GAE MR Hello World app中的输出文件blobkey的检索方式,所以我没有考虑。
    • pipeline.After 是确保产生的管道的正确执行顺序所必需的。到目前为止,我最喜欢的 Pipeline 功能之一,在文档中:code.google.com/p/appengine-pipeline/wiki/…
    【解决方案2】:

    对于这个问题,至少投资不多的解决方法是简单地产生另一个 Map/Mapreduce 管道来执行所需的后处理。

    例如:

    class MainPipeline(base_handler.PipelineBase):
        def run(self):
            mapper_params = { ... }
            reducer_params = { ... }
            yield mapreduce_pipeline.MapReducePipeline(
                ...,
                mapper_params=mapper_params,
                reducer_params=reducer_params)
            yield PostprocessPipeline(reducer_params)
    
    
    class PostprocessPipeline(base_handler.PipelineBase):
        def run(self, reducer_params):
            do_some_postprocessing(reducer_params)
    

    该解决方法无法访问 Mapreduce 状态,我想可以从管道 ID 中以某种方式检索到该状态,但我还不清楚如何。因此,您必须设置另一个标志/memcache/ds 条目来检查管道是否成功完成(如果这与后处理相关)。

    【讨论】:

    • 为此目的使用 memcache 令人担忧 - 永远无法保证结果可用。但是,我同意产生另一个管道的想法。上面肯定缺少的一件事是使用pipeline.After((yield mapreduce_pipeline.MapReducePipeline...
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-04-20
    • 2018-12-29
    • 1970-01-01
    • 1970-01-01
    • 2023-03-31
    • 2013-08-18
    相关资源
    最近更新 更多