我尝试了您的用例,并通过在 GCP 中使用 Cloud Functions、Cloud Storage、Cloud Scheduler 和 Cloud Healthcare API 服务,成功地将文件从云存储移动到 FHIR 存储。
在Cloud Function 中,我使用了官方文档中的这个sample code 进行导入高频头来自云存储的资源。只需确保您已安装 dependencies 即可运行代码示例。下面是我的 Cloud Functions 上的示例代码。 (笔记我使用云功能default service account)
主文件
def hello_world(request):
# Imports the Google API Discovery Service.
from googleapiclient import discovery
api_version = "v1"
service_name = "healthcare"
# Instantiates an authorized API client by discovering the Healthcare API
# and using GOOGLE_APPLICATION_CREDENTIALS environment variable.
client = discovery.build(service_name, api_version)
# TODO(developer): Uncomment these lines and replace with your values.
project_id = 'xxxx-xxxxx-' # replace with your GCP project ID
location = 'us-central1' # replace with the parent dataset's location
dataset_id = 'xxxxx-xxxxx' # replace with the parent dataset's ID
fhir_store_id = 'xxxx-xxxx' # replace with the FHIR store ID
gcs_uri = 'xxxx-xxxxx' # replace with a Cloud Storage bucket
fhir_store_parent = "projects/{}/locations/{}/datasets/{}".format(
project_id, location, dataset_id
)
fhir_store_name = "{}/fhirStores/{}".format(fhir_store_parent, fhir_store_id)
body = {
"contentStructure": "CONTENT_STRUCTURE_UNSPECIFIED",
"gcsSource": {"uri": "gs://{}".format(gcs_uri)},
}
# Escape "import()" method keyword because "import"
# is a reserved keyword in Python
request = (
client.projects()
.locations()
.datasets()
.fhirStores()
.import_(name=fhir_store_name, body=body)
)
response = request.execute()
print("Imported FHIR resources: {}".format(gcs_uri))
print(response)
return response
要求.txt
google-api-python-client==2.47.0
google-auth-httplib2==0.1.0
google-auth==2.6.2
google-cloud==0.34.0
google-cloud-storage==2.0.0; python_version < '3.7'
google-cloud-storage==2.1.0; python_version > '3.6'
然后按照这个link 创建一个Cloud Scheduler 作业。这取决于你将如何schedule your job。请注意选择HTTP作为目标类型然后粘贴触发网址您的云功能并选择得到HTTP 方法。
可以看到导入是否成功医疗保健页面 -> 数据集 -> 操作选项卡.
样本输出: