【问题标题】:Python Azure Data Factory Update PipelinePython Azure 数据工厂更新管道
【发布时间】:2019-10-16 16:06:30
【问题描述】:

我想使用 Python 将活动添加到 Azure 数据工厂中的管道。使用以下代码,我将替换实际活动,但不添加新活动:

p_name = 'test'
act_name = 'Wait4'

Wait_activity = WaitActivity(name=act_name,wait_time_in_seconds=5)


p_obj = PipelineResource(activities=[Wait_activity])
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)

这是运行代码之前的管道:

运行代码后:

预期:

【问题讨论】:

    标签: python azure azure-data-factory


    【解决方案1】:

    研究了源码中的语句:

    因此,当您更新管道时,activities 属性应该是管道中的活动列表,而不是单个。

    例如:

    wait_activity = WaitActivity(name="waittest", type="Wait", wait_time_in_seconds=100, )
    ActivityDependency = [{"activity":"waittest","dependencyConditions":["Succeeded"]}]
    wait_activity1 = WaitActivity(name="waittest1", type="Wait", wait_time_in_seconds=100,depends_on=ActivityDependency)
    
    
    p_name = 'testforadf'
    p_obj = PipelineResource(
            activities=[wait_activity, wait_activity1])
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    

    请注意两行:

    activities=[wait_activity, wait_activity1])
    

    此属性应包含您的所有活动。

    ActivityDependency = [{"activity":"waittest","dependencyConditions":["Succeeded"]}]
    

    这是你的活动之间的依赖条件。

    我的输出:

    如有任何疑问,请告诉我。


    好吧,请看我的示例代码:

    前提是我已经有了以上两个等待活动

    adftest = adf_client.pipelines.get(rg_name,df_name,p_name)
    print(adftest)
    for activity in adftest.activities :
        print(activity.name)
        print(activity.type)
    

    然后输出是:

    {'additional_properties': None, 'id': '/subscriptions/b83c1ed3-c5b6-44fb-b5ba-2b83a074c23f/resourceGroups/v-jugong-ChinaCXPTeam/providers/Microsoft.DataFactory/factories/jaygongadf/pipelines/testforadf', 'name': 'testforadf', 'type': 'Microsoft.DataFactory/factories/pipelines', 'etag': 'ed006cf3-0000-0800-0000-5da970600000', 'description': None, 'activities': [<azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FEDE0F0>, <azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FED6DA0>], 'parameters': None, 'variables': None, 'concurrency': None, 'annotations': None, 'folder': None}
    waittest
    Wait
    waittest1
    Wait
    

    然后你可以看到上面activities属性中的对象。此外,你可以看到他们的类型:'activities': [&lt;azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FEDE0F0&gt;, &lt;azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FED6DA0&gt;]

    它们是 WaitActivity 类型,因此您可以查看它们的循环活动以获取其中的每个项目:

    for activity in adftest.activities :
            print(activity.name)
            print(activity.type)
    

    你可以在源码语句中查看WaitActivity类型包含哪些属性,比如name,type。(我用Pycharm测试代码,IDE可以直接检测源码)

    那如果你想多加一个activity,比如多一个WaitActivity:

    wait_activity2 = WaitActivity(name="waittest2", type="Wait", wait_time_in_seconds=100, )
    adftest.activities.append(wait_activity2)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, adftest)
    

    请参见上面的代码,我创建了一个名为 wait_activity2 的新 WaitActivity,然后将其附加到 activities 数组中。然后像往常一样更新管道,你会发现新的活动:

    【讨论】:

    • 这不是我想要的。假设我有一个已经存在多个 activity 的管道,并且我想使用 Python 向该管道添加一个 activity。我不想重新创建整个管道。因此,对于您的解决方案,如果我是对的,您建议我获取管道的所有当前 活动 并重新创建整个管道。那么有没有办法获取具有 Pipeline 的所有属性的 activities 列表?
    • 首先,sdk只提供create_or_update方法,没有appendadd等方法。我认为您可以使用adf_client.pipelines.get() 获取管道,然后替换其中的activities 属性,然后更新整个管道。
    • 获取管道后,可以获取activities的值,然后替换该值并更新管道。
    • 如果您已经有一些活动,activities 属性应该是一个数组。只需将新活动附加到其中并更新管道。希望我在这里很清楚。
    • 是的,谢谢您提供所有这些信息,我将尝试adf_client.pipelines.get() 方法。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-10-24
    • 2020-08-15
    • 1970-01-01
    • 1970-01-01
    • 2021-07-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多