【问题标题】:Vertex AI Model Batch prediction, issue with referencing existing model and input file on Cloud StorageVertex AI 模型批量预测,在 Cloud Storage 上引用现有模型和输入文件时出现问题
【发布时间】:2022-01-18 05:39:33
【问题描述】:

我正在努力正确设置执行以下操作的 Vertex AI 管道:

  1. 从 API 读取数据并存储到 GCS 并作为批量预测的输入。
  2. 获取现有模型(Vertex AI 上的视频分类)
  3. 使用来自点 1 的输入创建批量预测作业。
    可以看出,我对 Vertex Pipelines/Kubeflow 没有太多经验,因此我寻求帮助/建议,希望这只是一些初学者的错误。 这是我用作管道的代码的要点
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import dsl

from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Output,
    Artifact,
    Model,
)

PROJECT_ID = 'my-gcp-project'
BUCKET_NAME = "mybucket"
PIPELINE_ROOT = "{}/pipeline_root".format(BUCKET_NAME)


@component
def get_input_data() -> str:
    # getting data from API, save to Cloud Storage
    # return GS URI
    gcs_batch_input_path = 'gs://somebucket/file'
    return gcs_batch_input_path


@component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.8.0']
)
def load_ml_model(project_id: str, model: Output[Artifact]):
    """Load existing Vertex model"""
    import google.cloud.aiplatform as aip

    model_id = '1234'
    model = aip.Model(model_name=model_id, project=project_id, location='us-central1')



@dsl.pipeline(
    name="batch-pipeline", pipeline_root=PIPELINE_ROOT,
)
def pipeline(gcp_project: str):
    input_data = get_input_data()
    ml_model = load_ml_model(gcp_project)

    gcc_aip.ModelBatchPredictOp(
        project=PROJECT_ID,
        job_display_name=f'test-prediction',
        model=ml_model.output,
        gcs_source_uris=[input_data.output],  # this doesn't work
        # gcs_source_uris=['gs://mybucket/output/'],  # hardcoded gs uri works
        gcs_destination_output_uri_prefix=f'gs://{PIPELINE_ROOT}/prediction_output/'
    )


if __name__ == '__main__':
    from kfp.v2 import compiler
    import google.cloud.aiplatform as aip
    pipeline_export_filepath = 'test-pipeline.json'
    compiler.Compiler().compile(pipeline_func=pipeline,
                                package_path=pipeline_export_filepath)
    # pipeline_params = {
    #     'gcp_project': PROJECT_ID,
    # }
    # job = aip.PipelineJob(
    #     display_name='test-pipeline',
    #     template_path=pipeline_export_filepath,
    #     pipeline_root=f'gs://{PIPELINE_ROOT}',
    #     project=PROJECT_ID,
    #     parameter_values=pipeline_params,
    # )

    # job.run()

运行管道时,它会在运行批量预测时抛出此异常:
details = "List of found errors: 1.Field: batch_prediction_job.model; Message: Invalid Model resource name. 所以我不确定有什么问题。我尝试将模型加载到笔记本中(组件外部)并且它正确返回。

我遇到的第二个问题是将 GCS URI 引用为从组件到批处理作业输入的输出。

   input_data = get_input_data2()
   gcc_aip.ModelBatchPredictOp(
        project=PROJECT_ID,
        job_display_name=f'test-prediction',
        model=ml_model.output,
        gcs_source_uris=[input_data.output],  # this doesn't work
        # gcs_source_uris=['gs://mybucket/output/'],  # hardcoded gs uri works
        gcs_destination_output_uri_prefix=f'gs://{PIPELINE_ROOT}/prediction_output/'
    )

在编译期间,我得到以下异常 TypeError: Object of type PipelineParam is not JSON serializable,尽管我认为这可能是 ModelBatchPredictOp 组件的问题。

再次感谢任何帮助/建议,我从昨天开始处理这个问题,所以也许我错过了一些明显的东西。

我正在使用的库:

google-cloud-aiplatform==1.8.0  
google-cloud-pipeline-components==0.2.0  
kfp==1.8.10  
kfp-pipeline-spec==0.1.13  
kfp-server-api==1.7.1

更新 在 cmets 之后,一些研究和调整,对于参考模型这工作:

@component
def load_ml_model(project_id: str, model: Output[Artifact]):
    region = 'us-central1'
    model_id = '1234'
    model_uid = f'projects/{project_id}/locations/{region}/models/{model_id}'
    model.uri = model_uid
    model.metadata['resourceName'] = model_uid

然后我可以按预期使用它:

batch_predict_op = gcc_aip.ModelBatchPredictOp(
        project=gcp_project,
        job_display_name=f'batch-prediction-test',
        model=ml_model.outputs['model'],
        gcs_source_uris=[input_batch_gcs_path],
gcs_destination_output_uri_prefix=f'gs://{BUCKET_NAME}/prediction_output/test'
    )

更新 2 关于 GCS 路径,一种解决方法是在组件外部定义路径并将其作为输入参数传递,例如(缩写):

@dsl.pipeline(
    name="my-pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
        gcp_project: str,
        region: str,
        bucket: str
):
    ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    
    gcs_prediction_input_path = f'gs://{BUCKET_NAME}/prediction_input/video_batch_prediction_input_{ts}.jsonl'
    batch_input_data_op = get_input_data(gcs_prediction_input_path)  # this loads input data to GCS path

    batch_predict_op = gcc_aip.ModelBatchPredictOp(
        project=gcp_project,
        model=training_job_run_op.outputs["model"],
        job_display_name='batch-prediction',
        # gcs_source_uris=[batch_input_data_op.output],
        gcs_source_uris=[gcs_prediction_input_path],
        gcs_destination_output_uri_prefix=f'gs://{BUCKET_NAME}/prediction_output/',
    ).after(batch_input_data_op)  # we need to add 'after' so it runs after input data is prepared since get_input_data doesn't returns anything

仍然不确定,为什么当我从 get_input_data 组件返回 GCS 路径时它不起作用/编译

【问题讨论】:

  • 我想了解更多关于您案件的详细信息。关于您的第一个问题,您的代码是否适合此description,检查 id、上/下和区域在您的项目中是否常见。关于第二个问题,您能否发布完整的堆栈跟踪或引发此错误的文件的名称。
  • 感谢您的评论和参考,我使用可参考 ML 模型的解决方案更新了描述
  • 那么,对于第二期,您能否详细说明实际发生的情况?它只是关于input_data.output,对吧?你有没有尝试在上一步调用函数之前打印 input_data.output 并且只打印没有.output的 input_data??
  • 或者你指的是gcs_source_uris还是gcs_destination_output_uri_prefix??
  • 对此有任何更新吗?您能帮忙定义您的存储问题吗?

标签: kubeflow-pipelines google-cloud-vertex-ai


【解决方案1】:

很高兴您解决了大部分主要问题并找到了模型声明的解决方法。

对于您对gcs_source_urisinput.output 观察,其背后的原因是函数/类返回值的方式。如果你深入了解google_cloud_pipeline_components 的类/方法,你会发现它实现了一个结构,允许你从被调用函数的返回值中使用.outputs

如果你去实现管道的一个组件,你会发现它从convert_method_to_component 函数返回一个输出数组。因此,为了在您的自定义类/函数中实现它,您的函数应该返回一个可以作为属性调用的值。下面是它的基本实现。

class CustomClass():
     def __init__(self):
       self.return_val = {'path':'custompath','desc':'a desc'}
      
     @property
     def output(self):
       return self.return_val 

hello = CustomClass()
print(hello.output['path'])

如果您想深入了解它,可以转到以下页面:

【讨论】:

    猜你喜欢
    • 2022-01-25
    • 2021-10-29
    • 1970-01-01
    • 2021-12-23
    • 2021-11-19
    • 1970-01-01
    • 2021-12-09
    • 2022-11-15
    • 2022-01-01
    相关资源
    最近更新 更多