【问题标题】:Design pattern for a data parsing&feature engineering pipeline数据解析和特征工程管道的设计模式
【发布时间】:2020-12-26 12:28:57
【问题描述】:

我知道这个问题过去曾在这些板上被问过几次,但我有一个更通用的版本,将来也可能适用于其他人的项目。

简而言之 - 我正在构建一个 ML 系统(使用 Python,但在这种情况下语言选择不是很关键),它的 ML 模型位于发生的一系列操作的末端:

  • 数据上传
  • 数据解析
  • 特征工程
  • 特征工程(在与上一步不同的逻辑括号中)
  • 特征工程(在与前面步骤不同的逻辑括号中)

...(更多步骤,如前 3 步)

  • 数据传递到 ML 模型

上述每个步骤都有自己必须采取的一系列操作,以构建正确的输出,然后将其用作下一个等的输入。这些子步骤依次可以是彼此完全解耦,或者其中一些可能需要在那个大步骤中的一些步骤,首先完成,以生成这些后续步骤使用的数据。 现在的问题是,我需要构建一个自定义管道,这将使添加新步骤(无论大小)变得非常容易,而不会破坏现有步骤。

到目前为止,我对从架构的角度来看这可能是什么样子有这样的概念,如下所示:

在查看此架构时,我立即想到了一种责任链设计模式,它管理 BIG STEPS (1, 2, ..., n),每个 BIG STEPS 都有自己的小版本 Chain责任发生在他们的内心,这对于 NO_REQ 步骤独立发生,然后对于 REQ 步骤(REQ 步骤循环直到它们全部完成)。通过在大小步骤中运行逻辑的共享接口,它可能会运行得相当整洁。

但是,我想知道,是否有更好的方法呢?此外,我不喜欢责任链,它需要一个人添加新的 BIG/SMALL 步骤,始终编辑逻辑设置步骤包的“胆量”,手动包含新添加的步骤。我很想构建一些东西,它只会扫描每个 BIG STEP 下特定于步骤的文件夹,并自行构建 NO_REQ 和 REQ 步骤列表(以维护 Open/Closed SOLID 原则)。

如有任何想法,我将不胜感激。

【问题讨论】:

    标签: python parsing machine-learning design-patterns pipeline


    【解决方案1】:

    我最近做了类似的事情。挑战是允许“步骤”轻松插入而无需太多更改。所以,我所做的是,如下所示。核心思想是定义接口“进程”并修复输入和输出格式。

    代码:

    class Factory:
        def process(self, input):
            raise NotImplementedError
    
    class Extract(Factory):
        def process(self, input):
            print("Extracting...")
            output = {}
            return output
    
    class Parse(Factory):
        def process(self, input):
            print("Parsing...")
            output = {}
            return output
    
    class Load(Factory):
        def process(self, input):
            print("Loading...")
            output = {}
            return output
    
    pipeline = {
        "Extract" : Extract(),
        "Parse" : Parse(),
        "Load" : Load(),
    }
    
    input_data = {} #vanilla input
    for process_name, process_instance in pipeline.items():
        output = process_instance.process(input_data)
        input_data = output
    

    输出:

    Extracting...
    Parsing...
    Loading...
    

    所以,如果你需要在解析后添加一个'step','append_headers',你需要做的就是,

    #Defining a new step.
    class AppendHeaders(Factory):
        def process(self, input):
            print("adding headers...")
            output = {}
            return output
    
    pipeline = {
        "Extract" : Extract(),
        "Append headers": AppendHeaders(), #adding a new step
        "Parse" : Parse(),
        "Load" : Load(),
    }
    

    新输出:

    Extracting...
    adding headers...
    Parsing...
    Loading...
    

    在您的情况下,您可能想要扫描特定文件夹的 REQ/NOT_REQ 的附加要求,可以添加为 json 中的字段并将其加载到管道中,这意味着,仅当标志设置为 REQ 时才创建那些“步骤”对象.

    不确定这个想法有多大帮助。想,我会传达我的想法。

    【讨论】:

    • 感谢您的回复drd,但它并没有解决我在添加新的小步骤时打破打开/关闭原则的担忧。大/小步骤之间的 process() 方法的共享接口肯定会减少添加新步骤的麻烦,但它仍然需要我,例如在添加新的小步骤时修改大步骤的逻辑,以及在添加新的大步骤时修改 CoR 的逻辑。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-20
    • 1970-01-01
    • 1970-01-01
    • 2010-10-17
    • 1970-01-01
    相关资源
    最近更新 更多