【问题标题】:How to use Azure Data Factory for importing data incrementally from MySQL to Azure Data Warehouse?如何使用 Azure 数据工厂将数据从 MySQL 增量导入 Azure 数据仓库?
【发布时间】:2017-11-02 06:36:48
【问题描述】:

我正在使用 Azure 数据工厂定期将数据从 MySQL 导入 Azure SQL 数据仓库。

数据通过 Azure 存储帐户上的暂存 blob 存储,但是当我运行管道时它失败了,因为它无法将 blob 文本分离回列。管道尝试插入目标的每一行都变成一个长字符串,其中包含由“⯑”字符分隔的所有列值。

我之前使用过数据工厂,没有尝试增量机制,效果很好。我看不出它会导致这种行为的原因,但我可能遗漏了一些东西。

我附上了描述管道的 JSON 并进行了一些小的命名更改,如果您看到任何可以解释这一点的内容,请告诉我。

谢谢!

编辑:添加异常消息:

执行失败 数据库操作失败。错误信息来自 数据库执行: ErrorCode=FailedDbOperation,'Type=Microsoft.DataTransfer.Com‌​mon.Shared.HybridDel‌​iveryException,Messa‌​ge=Error 将数据加载到 SQL 数据时发生 Warehouse.,Source=Microsoft.DataTransfer.ClientLibrary,''Typ‌​e=System.Data.SqlCli‌​ent.SqlException,Mes‌​sage=Query 中止 - 达到最大拒绝阈值(0 行),同时 从外部源读取:总共 1 行中拒绝了 1 行 处理。 (/f4ae80d1-4560-4af9-9e74-05de941725ac/Data.8665812f-fba1-40‌​7a-9e04-2ee5f3ca5a7e‌​.txt) 列序号:27,预期数据类型:VARCHAR(45) 整理 SQL_Latin1_General_CP1_CI_AS,违规值:* ROW OF VALUES *(标记化失败),错误:此列中的列不足 行。,},],'。

{
"name": "CopyPipeline-move_incremental_test",
"properties": {
    "activities": [
        {
            "type": "Copy",
            "typeProperties": {
                "source": {
                    "type": "RelationalSource",
                    "query": "$$Text.Format('select * from [table] where InsertTime >= \\'{0:yyyy-MM-dd HH:mm}\\' AND InsertTime < \\'{1:yyyy-MM-dd HH:mm}\\'', WindowStart, WindowEnd)"
                },
                "sink": {
                    "type": "SqlDWSink",
                    "sqlWriterCleanupScript": "$$Text.Format('delete [schema].[table] where [InsertTime] >= \\'{0:yyyy-MM-dd HH:mm}\\' AND [InsertTime] <\\'{1:yyyy-MM-dd HH:mm}\\'', WindowStart, WindowEnd)",
                    "allowPolyBase": true,
                    "polyBaseSettings": {
                        "rejectType": "Value",
                        "rejectValue": 0,
                        "useTypeDefault": true
                    },
                    "writeBatchSize": 0,
                    "writeBatchTimeout": "00:00:00"
                },
                "translator": {
                    "type": "TabularTranslator",
                    "columnMappings": "column1:column1,column2:column2,column3:column3"
                },
                "enableStaging": true,
                "stagingSettings": {
                    "linkedServiceName": "StagingStorage-somename",
                    "path": "somepath"
                }
            },
            "inputs": [
                {
                    "name": "InputDataset-input"
                }
            ],
            "outputs": [
                {
                    "name": "OutputDataset-output"
                }
            ],
            "policy": {
                "timeout": "1.00:00:00",
                "concurrency": 10,
                "style": "StartOfInterval",
                "retry": 3,
                "longRetry": 0,
                "longRetryInterval": "00:00:00"
            },
            "scheduler": {
                "frequency": "Hour",
                "interval": 1
            },
            "name": "Activity-0-_Custom query_->[schema]_[table]"
        }
    ],
    "start": "2017-06-01T05:29:12.567Z",
    "end": "2099-12-30T22:00:00Z",
    "isPaused": false,
    "hubName": "datafactory_hub",
    "pipelineMode": "Scheduled"
}

}

【问题讨论】:

  • 你能提供更多的步骤和例外吗?
  • 执行失败 数据库操作失败。来自数据库执行的错误消息:ErrorCode=FailedDbOperation,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=将数据加载到 SQL 数据仓库时发生错误。,Source=Microsoft.DataTransfer.ClientLibrary,''Type=System。 Data.SqlClient.SqlException,Message=Query aborted - 从外部源读取时达到最大拒绝阈值(0 行):在处理的总共 1 行中拒绝 1 行。 (/f4ae80d1-4560-4af9-9e74-05de941725ac/Data.8665812f-fba1-407a-9e04-2ee5f3ca5a7e.txt)
  • 列序号:27,预期数据类型:VARCHAR(45) 整理 SQL_Latin1_General_CP1_CI_AS,违规值:* TOO MANY CHARACTERS FOR RESPONSE*(标记化失败),错误:此行中的列数不足。} ,],'.
  • 关于我采取的步骤:我正在完成管道创建向导,通过 Azure 中的暂存 blob 存储创建具有 MySQL 源和 Azure 数据仓库目标的计划管道。生成的管道在上面的 JSON 中进行了描述。
  • 请将详细信息添加到有助于社区理解的问题中。正如保罗安德鲁所说,数据似乎格式不正确。请尝试使用自定义活动。

标签: azure azure-sql-database azure-storage azure-blob-storage azure-data-factory


【解决方案1】:

听起来您的做法是正确的,但数据格式不正确(常见问题,没有 UTF-8 编码),因此 ADF 无法按您的要求解析结构。当我遇到这种情况时,我经常需要向管道中添加一个自定义活动来清理和准备数据,以便下游活动可以以结构化的方式使用它。不幸的是,这在解决方案的开发中是一个很大的开销,并且需要您编写一个 C# 类来处理数据转换。

另外请记住,ADF 没有自己的计算,它只调用其他服务,因此您还需要 Azure Batch 服务来执行编译代码。

遗憾的是,这里没有神奇的解决方法。 Azure 非常适合提取和加载完美结构化的数据,但在现实世界中,我们需要其他服务来进行转换或清理,这意味着我们需要一个可以 ETL 的管道,或者我更喜欢 ECTL。

以下是创建 ADF 自定义活动的链接以帮助您入门:https://www.purplefrogsystems.com/paul/2016/11/creating-azure-data-factory-custom-activities/

希望这会有所帮助。

【讨论】:

    【解决方案2】:

    在使用数据工厂 v.2 使用暂存(这意味着 Polybase)从 Azure sql db 导入到 Azure DWH 时,我一直在苦苦挣扎。我了解到 Polybase 将失败并显示与不正确的数据类型等相关的错误消息。我收到的消息与提到的here 非常相似,即使我不是直接从 SQL 使用 Polybase,而是通过数据工厂。

    无论如何,对我来说,解决方案是避免十进制或数字类型的列使用 NULL 值,例如ISNULL(mynumericCol, 0) 作为 mynumericCol。

    【讨论】:

      猜你喜欢
      • 2019-08-05
      • 2020-06-25
      • 2021-12-21
      • 1970-01-01
      • 1970-01-01
      • 2023-04-11
      • 2023-01-30
      • 2020-07-06
      • 1970-01-01
      相关资源
      最近更新 更多