【问题标题】:Using ADF REST connector to read and transform FHIR data使用 ADF REST 连接器读取和转换 FHIR 数据
【发布时间】:2021-09-17 22:39:43
【问题描述】:

我正在尝试使用 Azure 数据工厂从 FHIR 服务器读取数据并将结果转换为 Azure Blob 存储中以换行符分隔的 JSON (ndjson) 文件。具体来说,如果您查询 FHIR 服务器,您可能会得到如下信息:

{
    "resourceType": "Bundle",
    "id": "som-id",
    "type": "searchset",
    "link": [
        {
            "relation": "next",
            "url": "https://fhirserver/?ct=token"
        },
        {
            "relation": "self",
            "url": "https://fhirserver/"
        }
    ],
    "entry": [
        {
            "fullUrl": "https://fhirserver/Organization/1234",
            "resource": {
                "resourceType": "Organization",
                "id": "1234",
                // More fields
        },
        {
            "fullUrl": "https://fhirserver/Organization/456",
            "resource": {
                "resourceType": "Organization",
                "id": "456",
                // More fields
        },

        // More resources
    ]
}

基本上是一堆资源。我想将其转换为一个换行符分隔(又名 ndjson)文件,其中每一行只是资源的 json:

{"resourceType": "Organization", "id": "1234", // More fields }
{"resourceType": "Organization", "id": "456", // More fields }
// More lines with resources

我能够设置 REST 连接器并且它可以查询 FHIR 服务器(包括分页),但无论我尝试什么,我似乎都无法生成我想要的输出。我设置了一个 Azure Blob 存储数据集:

{
    "name": "AzureBlob1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "setOfObjects"
            },
            "fileName": "myout.json",
            "folderPath": "outfhirfromadf"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

并配置复制活动:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy Data1",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "RestSource",
                        "httpRequestTimeout": "00:01:40",
                        "requestInterval": "00.00:00:00.010"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "schemaMapping": {
                            "resource": "resource"
                        },
                        "collectionReference": "$.entry"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "FHIRSource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

但最终(尽管配置了架构映射),blob 中的最终结果始终只是从服务器返回的原始包。如果我将输出 blob 配置为逗号分隔的文本,我可以提取字段并创建一个扁平的表格视图,但这并不是我真正想要的。

任何建议将不胜感激。

【问题讨论】:

  • 到目前为止,我对 Azure 数据工厂的复制活动的经验是,它只会将数据从一个地方复制到另一个地方,每次我需要某种转换时,我都会受伤 :) 你可以考虑接受吗Databricks 并使用一些 python/scala 脚本来完成您需要的转换?
  • @Kzrystof,感谢您的评论。那将是一种可能的选择。我想我想看看我能在这条道路上用 ADF 走多远。所以是的,绝对是一个选择,但我也想知道 ADF 是否(或将)可能。
  • 哦,我明白你的意思 :) 实际上我在几周前 asked a similar question 谈到了复制活动如何排除不符合特定条件的行...

标签: azure azure-data-factory hl7-fhir


【解决方案1】:

所以我找到了一个解决方案。如果我执行将捆绑包简单地转储到 JSON 文件中的原始步骤,然后再将 JSON 文件转换为我假装为文本文件的另一个 blob,则可以创建 njson 文件。

基本上,定义另一个 blob 数据集:

{
    "name": "AzureBlob2",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "structure": [
            {
                "name": "Prop_0",
                "type": "String"
            }
        ],
        "typeProperties": {
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ",",
                "rowDelimiter": "",
                "quoteChar": "",
                "nullValue": "\\N",
                "encodingName": null,
                "treatEmptyAsNull": true,
                "skipLineCount": 0,
                "firstRowAsHeader": false
            },
            "fileName": "myout.json",
            "folderPath": "adfjsonout2"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

请注意这个TextFormat,还要注意quoteChar 是空白的。如果我再添加另一个复制活动:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy Data1",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "RestSource",
                        "httpRequestTimeout": "00:01:40",
                        "requestInterval": "00.00:00:00.010"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "schemaMapping": {
                            "['resource']": "resource"
                        },
                        "collectionReference": "$.entry"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "FHIRSource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ]
            },
            {
                "name": "Copy Data2",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "Copy Data1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "recursive": true
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": {
                            "resource": "Prop_0"
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob2",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

然后一切顺利。这并不理想,因为我现在有两份 blob 中的数据副本,但我想可以轻松删除一份。

如果有人有一站式解决方案,我仍然很想听听。

【讨论】:

    【解决方案2】:

    正如评论中简要讨论的那样,Copy Activity 除了映射数据之外没有提供太多功能。如文档中所述,Copy activity 执行以下操作:

    1. 从源数据存储中读取数据。
    2. 执行序列化/反序列化、压缩/解压缩、列映射等。它基于 输入数据集、输出数据集和复制的配置 活动。
    3. 将数据写入接收器/目标数据存储。

    看起来Copy Activity 除了有效地复制东西之外没有做任何其他事情。

    我发现正在工作的是使用 Databrick。

    步骤如下:

    1. 将 Databricks 帐户添加到您的订阅;
    2. 点击创作按钮进入 Databricks 页面;
    3. 创建一个笔记本;
    4. 编写脚本(Scala、Python 或.Net was recently announced)。

    脚本如下:

    1. 从 Blob 存储中读取数据;
    2. 根据需要过滤和转换数据;
    3. 将数据写回 Blob 存储;

    您可以从那里测试您的脚本,一旦准备就绪,您可以返回到您的管道并创建一个Notebook activity,它将指向您包含脚本的笔记本。

    我在 Scala 中编码时遇到了困难,但这是值得的 :)

    【讨论】:

      【解决方案3】:

      对于将来发现此帖子的任何人,您可以使用 $export api 调用来完成此操作。请注意,您必须有一个与 Fhir 服务器关联的存储帐户。

      https://build.fhir.org/ig/HL7/bulk-data/export.html#endpoint---system-level-export

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-12-21
        • 2016-08-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-03-08
        相关资源
        最近更新 更多