【发布时间】:2019-12-01 01:36:40
【问题描述】:
我第一次在 python 中使用 asyncio 并尝试将它与 ZMQ 结合起来。
基本上我的问题是我有一个 REP/REQ 系统,在一个 async def 中,我需要等待一个函数。如何不更新值。
下面是一段代码的 sn-p 来说明这一点:
#Declaring the zmq context
context = zmq_asyncio.Context()
REP_server_django = context.socket(zmq.REP)
REP_server_django.bind("tcp://*:5558")
我将这个对象发送给一个类并在这个函数中取回它
async def readsonar(self, trigger_pin, REP_server_django):
i= 0
while True:
ping_from_view = await REP_server_django.recv() # line.1
value = await self.board.sonar_read(trigger_pin) # line.2
print(value) # line.3
json_data = json.dumps(value) # line.4
#json_data = json.dumps(i) # line.4bis
REP_server_django.send(json_data.encode()) # line.5
i+=1 # line.6
await asyncio.sleep(1/1000) # line.7
sonar_read 正在使用 pymata_express 读取超声波传感器。如果我评论 line.2 和 line.4 我会得到 i 的正确值。如果我评论 line.1 和 line.5,print(value) 会从 sonar_read 打印正确的值。但是,当我按此处所示运行它时,value 没有更新。
我错过了什么吗?
编辑:
编辑了关于行 cmets 的类型。我的意思是,如果我只读取声纳并打印值。它工作正常。如果我只有.recv() 和.send(json.dumps(i).encode()),它可以工作。但是,如果我尝试从声纳发送值。它锁定到未更新的给定 value
EDIT2:(对 Alan Yorinks 的回答):这是 MWE,它考虑了您发送的有关 zmq 在课堂上的声明的内容。取自pymata_express 示例concurrent_tasks.py
要重现错误,请在两个不同的终端中运行这两个脚本。您将需要一个安装了Frimata_express 的arduino 板。如果一切顺利,
PART A. 应该只在 mve_req.py 结束时吐出相同的值。您可以编辑不同的块(部分 A、B 或 C)以查看行为。
mve_rep.py
#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress
class ConcurrentTasks:
def __init__(self, board):
self.loop = board.get_event_loop()
self.board = board
self.ctxsync = zmq.Context()
self.context = zmq.asyncio.Context()
self.rep = self.context.socket(zmq.REP)
self.rep.bind("tcp://*:5558")
self.trigger_pin = 53
self.echo_pin = 51
loop.run_until_complete(self.async_init_and_run())
async def readsonar(self):
i = 0
while True:
#PART. A. WHAT I HOPE COULD WORK
rep_recv = await self.rep.recv() # line.1
value = await self.board.sonar_read(self.trigger_pin) # line.2
print(value) # line.3
json_data = json.dumps(value) # line.4
# json_data = json.dumps(i) # line.4bis
await self.rep.send(json_data.encode()) # line.5
i += 1 # line.6
await asyncio.sleep(1 / 1000) # line.7
'''
#PART. B. WORKS FINE IN UPDATING THE SONAR_RAED VALUE AND PRINTING IT
value = await self.board.sonar_read(self.trigger_pin) # line.2
print(value) # line.3
json_data = json.dumps(value) # line.4
i += 1 # line.6
await asyncio.sleep(1 / 1000) # line.7
'''
'''
#PART. C. WORKS FINE IN SENDING THE i VALUE OVER ZMQ
rep_recv = await self.rep.recv() # line.1
json_data = json.dumps(i) # line.4bis
await self.rep.send(json_data.encode()) # line.5
i += 1 # line.6
await asyncio.sleep(1 / 1000) # line.7
'''
async def async_init_and_run(self):
await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)
readsonar = asyncio.create_task(self.readsonar())
await readsonar
# OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
my_board = PymataExpress()
try:
ConcurrentTasks(my_board)
except (KeyboardInterrupt, RuntimeError):
loop.run_until_complete(my_board.shutdown())
print('goodbye')
finally:
loop.close()
mve_req.py
import zmq
import time
import json
def start_zmq():
context = zmq.Context()
REQ_django = context.socket(zmq.REQ)
REQ_django.connect("tcp://localhost:5558")
return REQ_django, context
def get_sonar(REQ_django):
REQ_django.send(b"server_django")
ping_from_server_django = REQ_django.recv()
return ping_from_server_django.decode()
if __name__ == '__main__':
data = {"sensors":{}}
REQ_django, context = start_zmq()
while REQ_django:
data['sensors']['sonar'] = get_sonar(REQ_django)
json_data = json.dumps(data)
print(data)
#DO OTHER WORK
time.sleep(1)
REQ_django.close()
context.term()
【问题讨论】:
标签: python zeromq distributed-computing python-asyncio req