【问题标题】:Google Cloud PubSub not being called properly in Python在 Python 中未正确调用 Google Cloud PubSub
【发布时间】:2019-02-06 01:35:52
【问题描述】:

我绞尽脑汁,但我不明白为什么会发生这个问题,我也无法找出原因。我正在尝试读取图像并将其传递给 pubsub。一旦通过 pubsub 发送消息,它就会被重定向到 AutoML 模型以识别或预测给定的图像。下面是代码sn-p

global val1   
@app.route("/", methods=['GET', 'POST'])
doc_type=request.form.get('submit_button')
        file_name = secure_filename(file.filename)
        blob=file.read()
        flash('File upload successful', 'info')
        # Initializing PubSub
        publisher,subscriber,topic,subscription=pubsub_init(doc_type) 
        blob=blob+bytes(doc_type,'utf-8')
        subscriber.subscribe(subscription,callback)
        publisher.publish(topic,blob)
        flash("the uploaded file is "+val1,'info')

初始化函数:

def pubsub_init(doctype):
    publisher=pubsub.PublisherClient()
    subscriber=pubsub.SubscriberClient()
    if doctype=="License":
        subscription=<<sub name>>
    elif doctype=="Credit":
        subscription=<<subname>>
    elif doctype=="Passport":
        subscription=<<subname>>
    else:
        print("invalid choice"
  topic=<<topic>>
print(subscription)
return (publisher,subscriber,topic,subscription)

我的回调:

def callback(message):
    #print("hello",flush=True)
     print("making global")
     project_id=<<proj id>>
     val1,val2=predict_value(new_message,model_id,project_id,compute_region)
     message.ack()

但我收到了类似 val1 未定义的错误。能否请您对此提出建议?

【问题讨论】:

    标签: python google-app-engine flask google-cloud-platform google-cloud-pubsub


    【解决方案1】:

    如果你要调用一个全局变量,你必须在函数中这样声明:

    def callback(message):
        global val1
        global val2
        ...
        val1, val2 = predict_value(new_message,model_id,project_id,compute_region)
    

    【讨论】:

    • 您好,感谢您的回复。即使那样我也面临同样的问题。 :( val1 未定义
    • 您是否检查过 val1 实际上被分配了一个值为 'val1,val2=predict_value(new_message,model_id,project_id,compute_region)' 的值?
    【解决方案2】:

    这里的问题是subscriber.subscribe(subscription, callback) 正在设置对callback 的异步调用。

    这意味着当您发布一个新主题时,您实际上是在对 flash(...) 的调用将首先执行还是回调之间设置竞争条件。由于回调可能需要一些时间才能完成,flash 行获胜,但尚未创建 val1,因此您的错误。

    There are ways to control the concurrency 这可能会阻止订阅者的未来,从而使您尝试做的事情成为可能。

    但是在尝试之前,我想先问一下您为什么要在这里尝试使用 pub/sub。似乎您只是在设置发布者和订阅者以发布单个消息,然后尝试对该消息的结果执行某些操作。为什么不直接内联呢?

    @app.route("/", methods=['GET', 'POST'])
    def your_function(request):
        doc_type=request.form.get('submit_button')
        file_name = secure_filename(file.filename)
        blob=file.read()
        flash('File upload successful', 'info')
        blob=blob+bytes(doc_type,'utf-8')
        # turn a blob into new_message, get model_id from somewhere?
        project_id=<<proj id>>
        val1,val2=predict_value(new_message,model_id,project_id,compute_region)
        flash("the uploaded file is "+val1,'info')
    

    【讨论】:

    • 嗨达斯汀..谢谢你的 cmets。这只是我想准备的演示,我认为会有很多人使用它。我已经尝试过内联,它很有效并且有效。但我正在尝试发送到 pub/sub,这样我就不会错过任何要尝试的概念。
    • 所以我需要在索引函数中一个接一个地添加future.result()和future.cancel()? blob=blob+bytes(doc_type,'utf-8') subscriber.subscribe(subscription,callback) publisher.publish(topic,blob) future.result() future,cancel() ?
    • google.api_core.exceptions.PermissionDenied: 403 Permission 'automl.models.predict' 在资源 'xxxx/models/' 上被拒绝(或者它可能不存在)
    猜你喜欢
    • 2018-06-22
    • 2019-02-10
    • 2022-01-23
    • 2019-07-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多