【问题标题】:Rule-based mapping on Copy Activity in Azure Data FactoryAzure 数据工厂中复制活动的基于规则的映射
【发布时间】:2021-04-28 19:02:52
【问题描述】:

我在 Azure 数据工厂上使用复制数据活动时尝试创建动态映射。 我想创建一个 parquet 文件,其中包含我从源读取的相同数据,但我想修改一些列名称以删除其上的空格 (It's a bug of Parquet format),我想自动执行此操作。

I have seen that this is possible in mapping data flow,但我在 Copy Activity 上看不到任何此类功能(映射数据流仅限于几个连接器作为源,因此我无法使用它)。

正如您在图像上看到的,我似乎只能修改个别列,而不是其中一些满足某些条件的列

我该怎么做?

提前致谢

【问题讨论】:

    标签: parquet azure-data-factory


    【解决方案1】:

    这是一种使用 ADF 应用动态列名映射的解决方案,这样即使源列名包含不支持的讨厌的空白字符,您仍然可以使用 parquet 格式的复制数据活动。

    解决方案涉及三个部分:

    1. 动态生成映射列名列表。下面的示例演示了如何使用查找活动(以下称为“查找列映射”)对 SQL 数据库表源数据集中的空白进行动态编码。
    ;with cols as (
    select 
        REPLACE(column_name, (' ', '__wspc__') as new_name,
        column_name as old_name
    from INFORMATION_SCHEMA.columns
    where table_name = '@{pipeline().parameters.SOURCE_TABLE}'
    and table_schema = '@{pipeline().parameters.SOURCE_SCHEMA}'
    )
    select ' |||'+old_name+'||'+new_name+'|' as mapping
    from cols;
    
    1. 使用表达式将步骤 1 中查找活动中派生的列映射重新打包为复制数据活动模板所期望的 json 语法。您可以将其插入到具有 Array 类型变量的集合变量活动中(以下称为“column_mapping_list”)。
    @json(
        concat(
            '[ ',
            join(
                split(
                    join(
                        split(
                            join(
                                split(
                                    join(
                                        xpath(
                                            xml(
                                                json(
                                                    concat( 
                                                    '{\"root_xml_node\": ', 
                                                        string(activity('lookup column mapping').output),
                                                    '}' 
                                                    )
                                                )
                                            ),
                                        '/root_xml_node/value/mapping/text()',
                                        )
                                    ','
                                    ),
                                '|||'
                                ),
                            '{\"source\": { \"name\": \"'
                            ), 
                        '||'
                        ), 
                    '\" },\"sink\": { \"name\": \"'
                    ),
                '|'
                ), 
            '\" }}'
            ),  
        ' ]'
        )
    )
    

    不幸的是,表达式比我们想要的更复杂,因为 xpath 函数需要一个查找活动输出未提供的单个根节点,并且 ADF json 模板的字符串转义对简化这一点提出了一些挑战。

    1. 最后,在复制数据活动的映射部分使用列映射列表变量作为“动态内容”,表达式如下
    @json(
        concat(
            '{ \"type\": \"TabularTranslator\", \"mappings\":',
            string(variables('column_mapping_list')),
        '}'
        )
    )
    

    预期结果:
    第 1 步。
    '我的 wspccol' -> '|||我的 wspccol||my__wspc__wspcol|'

    第 2 步。
    '|||我的 wspccol||my__wspc__wspccol|' -> ['{ "source": { "name": "my wspccol" }, "sink": { "name": "my__wspc__wspccol" } }']

    第 3 步。

    { 
        "type": "TabularTranslator", 
        "mappings": [
            { 
                "source": { "name": "my wspccol" }, 
                "sink": { "name": "my__wspc__wspccol" } 
            }
        ] 
    }
    

    另外:

    【讨论】:

      【解决方案2】:

      复制活动可以从一种文件类型更改为另一种文件类型,例如 csv 到 json,parquet 到数据库,但它本质上不允许任何转换,例如更改任何列的内容,甚至添加额外的列。

      或者考虑将ADF to call a Databricks notebook 用于这些基于规则的复杂转换。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2022-01-26
        • 1970-01-01
        • 2021-10-02
        • 2022-01-04
        • 1970-01-01
        • 2020-09-15
        • 1970-01-01
        • 2019-03-28
        相关资源
        最近更新 更多