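【Question title】: Batch Prediction Job non-blocking
【Posted】: 2022-02-04 21:22:11
【Question description】:

I am running a Vertex AI batch prediction using the Python API. The function I am using is taken from the Google Cloud documentation: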

import datetime
from typing import Sequence, Union

from google.cloud import aiplatform
from google.oauth2 import service_account


def create_batch_prediction_job_dedicated_resources_sample(
    key_path,
    project: str,
    location: str,
    model_display_name: str,
    job_display_name: str,
    gcs_source: Union[str, Sequence[str]],
    gcs_destination: str,
    machine_type: str = "n1-standard-2",
    sync: bool = True,
):
    credentials = service_account.Credentials.from_service_account_file(key_path)

    # Initialize the aiplatform SDK
    aiplatform.init(project=project, location=location, credentials=credentials)

    # Get a list of Models by Model name
    models = aiplatform.Model.list(filter=f'display_name="{model_display_name}"')
    model_resource_name = models[0].resource_name

    # Get the model
    my_model = aiplatform.Model(model_resource_name)

    batch_prediction_job = my_model.batch_predict(
        job_display_name=job_display_name,
        gcs_source=gcs_source,
        gcs_destination_prefix=gcs_destination,
        machine_type=machine_type,
        sync=sync,
    )

    # batch_prediction_job.wait_for_resource_creation()
    batch_prediction_job.wait()

    print(batch_prediction_job.display_name)
    print(batch_prediction_job.resource_name)
    print(batch_prediction_job.state)
    return batch_prediction_job


datetime_today = datetime.datetime.now()
model_display_name = 'test_model'
key_path = 'vertex_key.json'
project = 'my_project'
location = 'asia-south1'
job_display_name = 'batch_prediction_' + str(datetime_today)
model_name = '1234'
gcs_source = 'gs://my_bucket/Cleaned_Data/user_item_pairs.jsonl'
gcs_destination = 'gs://my_bucket/prediction'

create_batch_prediction_job_dedicated_resources_sample(
    key_path, project, location, model_display_name, job_display_name,
    gcs_source, gcs_destination)

Output:

92 current state:
JobState.JOB_STATE_RUNNING
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/my_project/locations/asia-south1/batchPredictionJobs/37737350127597649

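The output above is printed to the terminal over and over again, every few seconds.

The problem is that the Python program calling this function keeps running until it is forcibly stopped. I tried both batch_prediction_job.wait() and batch_prediction_job.wait_for_resource_creation(), with the same result.

How can I start the batch_prediction_job without waiting for it to finish, and terminate the program as soon as the job has been created?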

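【Comments】:

  • Check the documentation for the batch_predict() method. It says: "Creates a batch prediction job using this Model and outputs prediction results to the provided destination prefix in the specified predictions_format." So, as I understand it, you don't need the wait function. The job has already started once batch_predict() is called. Run it without the wait and check in the terminal whether the job is created.
  • Yes, you are right. The job is created without the wait function call. But the terminal is still blocked, and the three print commands are still executed every few seconds.
  • Are you calling it from another function or script? Based on the code you posted, I don't see why it would loop and repeat the 3 prints.
  • I am just calling this function from the same script with the required arguments. I will edit the question to include the complete script.
  • Could you please post the output that shows the problem?

Tags: google-cloud-platform google-cloud-vertex-ai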


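【Solution 1】:

I gave you the wrong instruction in the comments. Change the parameter to sync=False, and the function should return as soon as it has been executed. The documentation describes the parameter as follows:

Should this function call be synchronous (wait for pipeline run to finish before terminating) or asynchronous (return immediately)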

sync=False

def create_batch_prediction_job_dedicated_resources_sample(
# ...
    sync: bool = False,
):
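
To make the difference between the two modes concrete, here is a minimal toy sketch that uses only the Python standard library. It is not how the Vertex AI SDK is implemented; run_fake_job and start_job are hypothetical names, and the sleep simply stands in for a long-running remote job.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Union

# One background worker that runs the (fake) job.
_executor = ThreadPoolExecutor(max_workers=1)


def run_fake_job(duration_s: float) -> str:
    """Hypothetical stand-in for a long-running remote job."""
    time.sleep(duration_s)
    return "JOB_STATE_SUCCEEDED"


def start_job(duration_s: float, sync: bool = True) -> Union[str, Future]:
    """Start the job; block for its result if sync, else return a Future at once."""
    future = _executor.submit(run_fake_job, duration_s)
    if sync:
        # Synchronous mode: wait here until the job has finished.
        return future.result()
    # Asynchronous mode: the caller gets control back immediately.
    return future


if __name__ == "__main__":
    started = time.monotonic()
    pending = start_job(0.5, sync=False)
    print(f"call returned after {time.monotonic() - started:.2f}s, done={pending.done()}")
    print(pending.result())  # blocks only when the result is actually needed
```

With sync=False the call hands back a Future almost immediately. Note that even in this toy the interpreter still waits for the worker thread before the process exits, so an immediate return from the function does not by itself mean the program ends immediately.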

UPDATE - adding more details:

Check my notebook code here, where I tested it and it works. You have to set sync=False AND remove or comment out the following wait and print lines:

#batch_prediction_job.wait()
#print(batch_prediction_job.display_name)
#print(batch_prediction_job.resource_name)
#print(batch_prediction_job.state)

Your code, edited:

import datetime
from typing import Sequence, Union

from google.cloud import aiplatform
from google.oauth2 import service_account


def create_batch_prediction_job_dedicated_resources_sample(
    key_path,
    project: str,
    location: str,
    model_display_name: str,
    job_display_name: str,
    gcs_source: Union[str, Sequence[str]],
    gcs_destination: str,
    machine_type: str = "n1-standard-2",
    sync: bool = False,
):
    credentials = service_account.Credentials.from_service_account_file(key_path)

    # Initialize the aiplatform SDK
    aiplatform.init(project=project, location=location, credentials=credentials)

    # Get a list of Models by Model name
    models = aiplatform.Model.list(filter=f'display_name="{model_display_name}"')
    model_resource_name = models[0].resource_name

    # Get the model
    my_model = aiplatform.Model(model_resource_name)

    # No wait() and no prints afterwards: just hand the job object back
    batch_prediction_job = my_model.batch_predict(
        job_display_name=job_display_name,
        gcs_source=gcs_source,
        gcs_destination_prefix=gcs_destination,
        machine_type=machine_type,
        sync=sync,
    )

    return batch_prediction_job


datetime_today = datetime.datetime.now()
model_display_name = 'test_model'
key_path = 'vertex_key.json'
project = '<my_project_name>'
location = 'asia-south1'
job_display_name = 'batch_prediction_' + str(datetime_today)
model_name = '1234'
gcs_source = 'gs://<my_bucket_name>/Cleaned_Data/user_item_pairs.jsonl'
gcs_destination = 'gs://<my_bucket_name>/prediction'

create_batch_prediction_job_dedicated_resources_sample(
    key_path,
    project,
    location,
    model_display_name,
    job_display_name,
    gcs_source,
    gcs_destination,
    sync=False,
)

[Screenshots: result with sync=False; result with sync=True]

【Comments】:

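  • Even with the sync parameter set to False, the same statements shown under Output above are printed over and over again.
  • Hi @Tarique, did you remove the print lines? That is required. You have to remove the prints and call the function with sync=False. I have updated the answer based on your code and also included the results of my tests.
  • I am still getting the same behaviour. I am running it from VSCode, and the terminal stays blocked, with a status update every few seconds as shown above.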
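
The last comment reports that the terminal stays busy even with sync=False. One possible explanation, which this thread does not confirm, is that the high-level SDK keeps tracking the job in a background thread until it finishes. A workaround that may help is to create the job through the lower-level JobServiceClient in google.cloud.aiplatform_v1, whose create_batch_prediction_job call returns once the job resource has been created. The sketch below assumes JSONL input and output, a single replica, and a full model resource name of the form projects/PROJECT/locations/LOCATION/models/MODEL_ID; build_batch_prediction_job and submit_without_waiting are hypothetical helper names, not part of any library.

```python
from typing import Any, Dict, Sequence, Union


def build_batch_prediction_job(
    model_resource_name: str,
    job_display_name: str,
    gcs_source: Union[str, Sequence[str]],
    gcs_destination: str,
    machine_type: str = "n1-standard-2",
) -> Dict[str, Any]:
    """Hypothetical helper: build the BatchPredictionJob spec as a plain dict."""
    uris = [gcs_source] if isinstance(gcs_source, str) else list(gcs_source)
    return {
        "display_name": job_display_name,
        "model": model_resource_name,
        # Assumption: newline-delimited JSON in and out.
        "input_config": {
            "instances_format": "jsonl",
            "gcs_source": {"uris": uris},
        },
        "output_config": {
            "predictions_format": "jsonl",
            "gcs_destination": {"output_uri_prefix": gcs_destination},
        },
        # Assumption: one dedicated replica is enough.
        "dedicated_resources": {
            "machine_spec": {"machine_type": machine_type},
            "starting_replica_count": 1,
            "max_replica_count": 1,
        },
    }


def submit_without_waiting(
    key_path: str,
    project: str,
    location: str,
    job_spec: Dict[str, Any],
) -> str:
    """Hypothetical helper: create the job and return its resource name."""
    # Imported here so the spec builder above can be used without the SDK installed.
    from google.cloud import aiplatform_v1
    from google.oauth2 import service_account

    credentials = service_account.Credentials.from_service_account_file(key_path)
    client = aiplatform_v1.JobServiceClient(
        credentials=credentials,
        client_options={"api_endpoint": f"{location}-aiplatform.googleapis.com"},
    )
    job = client.create_batch_prediction_job(
        parent=f"projects/{project}/locations/{location}",
        batch_prediction_job=job_spec,
    )
    # No polling happens here; the job keeps running on the service side.
    return job.name
```

The returned resource name can be stored and used later to look the job up and check its state, for example from the Cloud console or a separate script.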