【问题标题】:Execute stored procedure from Azure datafactory从 Azure 数据工厂执行存储过程
【发布时间】:2017-12-20 10:07:15
【问题描述】:

我正在尝试从 Azure DataFactory V2 执行 Azure SQL 数据库中的存储过程。该过程将使用平面表中的数据对不同的表进行一些更新插入。根据MS specifications,您需要有一个表值参数来制作这样的东西,但这会将管道活动与过程和所有模型耦合起来。有什么方法可以定义数据集和复制活动,使其只执行存储过程?

以下 json 来自 arm 模板:

DataSet:    
{"type": "datasets",
          "name": "AzureSQLProcedureDS",
          "dependsOn": [
            "[parameters('dataFactoryName')]",
            "[parameters('destinationLinkedServiceName')]"
          ],
          "apiVersion": "[variables('apiVersion')]",
          "properties": {
            "type": "AzureSqlTable",
            "linkedServiceName": {
              "referenceName": "[parameters('destinationLinkedServiceName')]",
              "type": "LinkedServiceReference"
            },
            "typeProperties": {
              "tableName": "storedProcedureExecutions"
            }
          }}




    Activity:
    {"name": "ExecuteHarmonizationProcedure",
                    "description": "Executes the procedure that Harmonizes the Data",
                    "type": "Copy",
                    "inputs": [
                      {
                        "referenceName": "[parameters('destinationDataSetName')]",
                        "type": "DatasetReference"
                      }
                    ],
                    "outputs": [
                      {
                        "referenceName": "AzureSQLProcedureDS",
                        "type": "DatasetReference"
                      }
                    ],
                    "typeProperties": {
                      "source": {
                        "type": "SqlSink"
                      },
                      "sink": {
                        "type": "SqlSink",
                        //"SqlWriterTableType": "storedProcedureExecutionsType",
                        "SqlWriterStoredProcedureName": "@Pipeline().parameters.procedureName",
                        "storedProcedureParameters": {
                          "param1": {
                            "value": "call from adf" 
                          }
                        }
                      }
                    }
}

考虑到 MS 没有为此主题提供太多帮助,我们将不胜感激。

【问题讨论】:

  • 您能描述一下您遇到困难的地方以及收到的错误信息吗?
  • 目的是找到不需要创建虚拟表的东西,因为在这种情况下,这更容易发生 sql 注入。如果有办法只指定过程的名称和所需的参数,那就更好了。

标签: azure-sql-database azure-data-factory


【解决方案1】:

我不确定我是否正确理解了这个问题,您只是想从复制活动中调用存储过程?

这样做很容易,在复制活动中,您可以在源代码中定义 sqlReaderQuery 属性。这个属性可以让你输入一个 t-sql 命令,所以你可以做这样的事情:

 "typeProperties": {
        "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "EXEC sp_Name; select 1 as test"
        },
 . . .

复制活动总是需要查询的结果,所以如果你只包括对存储过程的调用,那不是我包括查询的第二部分的原因。

替换成你要使用的参数就可以了。

【讨论】:

  • 我试过了,还是不行。该活动运行了约 4 分钟,然后失败,因为它尝试将结果(可能是 select 语句)映射到源/目标。
  • 是的“选择 1 作为测试”将返回一个名为 test 的列,其中只有一个 1 作为值。创建一个虚拟数据集或更改查询以满足您的需求,我给您的示例只是为了展示如何从复制活动中调用存储过程..
  • 我按照你的建议做了,它奏效了,但正如我在下面发布的那样,它更像是一种解决方法而不是直接解决方案。如果没有其他方法,至少在 DF v2 的当前状态下,我将接受您的回答作为解决方案。
  • 刚刚测试了调用 azure 集成服务(您可以使用 ADFv2 运行)的 SP 的这些步骤,并且成功了!官方文档说您需要使用存储过程活动从 ADF 版本 1 调用它。鉴于您只使用一个数据工厂实例,这看起来是一种更好的方法。
【解决方案2】:

按照@Martin 的建议,我们设法使执行工作正常进行。以下是我们所做的:

  1. 在 SQL 中创建虚拟表:

    CREATE TABLE [dbo].[dummyTable]( [col1] [NVARCHAR](100) NULL)
    
  2. 创建存储过程:

    CREATE PROCEDURE [dbo].[sp_testHarmonize]
         @param1 NVARCHAR(200)
    AS
    BEGIN
        INSERT INTO storedProcedureExecutions
        VALUES (@param1, GETDATE());
    END
    
  3. 存储过程的数据集:

    {
        "type": "datasets",
        "name": "[parameters('dummySQLTableDataSet')]",
        "dependsOn": ["[parameters('dataFactoryName')]",
        "[parameters('datalakeLinkedServiceName')]"],
        "apiVersion": "[variables('apiVersion')]",
        "properties": {
                "type": "AzureSqlTable",
                "linkedServiceName": {
                    "referenceName": "[parameters('databaseLinkedServiceName')]",
                    "type": "LinkedServiceReference"
                },
        "typeProperties": {
                "tableName": "dummyTable"
        }
    }
    
  4. 管道活动:

    {
        "name": "ExecuteHarmonizationProcedure",
        "dependsOn": [{
            "activity": "CopyCSV2SQL",
            "dependencyConditions": ["Succeeded"]
        }],
        "description": "Executes the procedure that Harmonizes the Data",
        "type": "Copy",
        "inputs": [{
            "referenceName": "[parameters('dummySQLTableDataSet')]",
            "type": "DatasetReference"
        }],
        "outputs": [{
            "referenceName": "[parameters('dummySQLTableDataSet')]",
            "type": "DatasetReference"
        }],
        "typeProperties": {
            "source": {
                "type": "SqlSource",
                "sqlReaderQuery": "@Pipeline().parameters.SQLCommand"
            },
            "sink": {
                "type": "SqlSink"
            }
        }
    }
    
  5. 使用 SQL 命令的以下参数运行管道:

    $"EXEC sp_testHarmonize 'call from ADF at {DateTime.Now}'; select top 1 * from dummyTable;"
    

这使它起作用了,但考虑到它是在虚拟表上插入一行,它看起来更像是一种解决方案而不是直接解决方案。如果没有更直接的解决方案,这是最简单的方法。

【讨论】:

    猜你喜欢
    • 2021-05-03
    • 1970-01-01
    • 1970-01-01
    • 2019-03-29
    • 1970-01-01
    • 1970-01-01
    • 2020-11-22
    • 1970-01-01
    • 2022-09-25
    相关资源
    最近更新 更多