【问题标题】:pyzmq REQ/REP with asyncio await for variable带有异步等待变量的 pyzmq REQ/REP
【发布时间】:2019-12-01 01:36:40
【问题描述】:

我第一次在 python 中使用 asyncio 并尝试将它与 ZMQ 结合起来。

基本上我的问题是我有一个 REP/REQ 系统,在一个 async def 中,我需要等待一个函数。如何不更新值。 下面是一段代码的 sn-p 来说明这一点:

#Declaring the zmq context
context = zmq_asyncio.Context()
REP_server_django = context.socket(zmq.REP)
REP_server_django.bind("tcp://*:5558")

我将这个对象发送给一个类并在这个函数中取回它

async def readsonar(self, trigger_pin, REP_server_django):
        i= 0
        while True:

            ping_from_view = await REP_server_django.recv()  # line.1
            value = await self.board.sonar_read(trigger_pin) # line.2
            print(value)                                     # line.3
            json_data = json.dumps(value)                    # line.4
            #json_data = json.dumps(i)                       # line.4bis
            REP_server_django.send(json_data.encode())       # line.5
            i+=1                                             # line.6
            await asyncio.sleep(1/1000)                      # line.7

sonar_read 正在使用 pymata_express 读取超声波传感器。如果我评论 line.2line.4 我会得到 i 的正确值。如果我评论 line.1line.5print(value) 会从 sonar_read 打印正确的值。但是,当我按此处所示运行它时,value 没有更新。

我错过了什么吗?


编辑:
编辑了关于行 cmets 的类型。我的意思是,如果我只读取声纳并打印值。它工作正常。如果我只有.recv().send(json.dumps(i).encode()),它可以工作。但是,如果我尝试从声纳发送值。它锁定到未更新的给定 value


EDIT2:(对 Alan Yorinks 的回答):这是 MWE,它考虑了您发送的有关 zmq 在课堂上的声明的内容。取自pymata_express 示例concurrent_tasks.py

要重现错误,请在两个不同的终端中运行这两个脚本。您将需要一个安装了Frimata_express 的arduino 板。如果一切顺利, PART A. 应该只在 mve_req.py 结束时吐出相同的值。您可以编辑不同的块(部分 A、B 或 C)以查看行为。

mve_rep.py

#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress


class ConcurrentTasks:

    def __init__(self, board):


        self.loop = board.get_event_loop()
        self.board = board

        self.ctxsync = zmq.Context()
        self.context = zmq.asyncio.Context()
        self.rep = self.context.socket(zmq.REP)
        self.rep.bind("tcp://*:5558")

        self.trigger_pin = 53
        self.echo_pin = 51

        loop.run_until_complete(self.async_init_and_run())

    async def readsonar(self):
        i = 0
        while True:


            #PART. A. WHAT I HOPE COULD WORK
            rep_recv = await self.rep.recv()                       # line.1
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            # json_data = json.dumps(i)                            # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7


            '''
            #PART. B. WORKS FINE IN UPDATING THE SONAR_RAED VALUE AND PRINTING IT
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''

            '''
            #PART. C. WORKS FINE IN SENDING THE i VALUE OVER ZMQ
            rep_recv = await self.rep.recv()                       # line.1
            json_data = json.dumps(i)                              # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''



    async def async_init_and_run(self):

        await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)

        readsonar = asyncio.create_task(self.readsonar())
        await readsonar

        # OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    my_board = PymataExpress()
    try:
        ConcurrentTasks(my_board)
    except (KeyboardInterrupt, RuntimeError):
        loop.run_until_complete(my_board.shutdown())
        print('goodbye')
    finally:
        loop.close()

mve_req.py

import zmq
import time
import json

def start_zmq():
    context = zmq.Context()
    REQ_django  = context.socket(zmq.REQ)
    REQ_django.connect("tcp://localhost:5558")

    return REQ_django, context

def get_sonar(REQ_django):
    REQ_django.send(b"server_django")
    ping_from_server_django = REQ_django.recv()
    return ping_from_server_django.decode()

if __name__ == '__main__':

    data = {"sensors":{}}

    REQ_django, context = start_zmq()
    while REQ_django:

            data['sensors']['sonar'] = get_sonar(REQ_django)
            json_data = json.dumps(data)
            print(data)

            #DO OTHER WORK
            time.sleep(1)

    REQ_django.close()
    context.term()

【问题讨论】:

    标签: python zeromq distributed-computing python-asyncio req


    【解决方案1】:

    完全公开,我是pymata-expresspython-banyan.的作者,OP要求我发布这个解决方案,所以这并不是一个无耻的插件。

    自从在 Python 3 中首次引入 asyncio 以来,我一直在使用它进行开发。当 asyncio 代码工作时,asyncio (恕我直言) 可以简化并发和代码。但是,当出现问题时,调试和了解问题的原因可能会令人沮丧。

    我提前道歉,因为这可能有点冗长,但我需要提供一些背景信息,以免示例看起来像一些随机代码。

    python-banyan 框架的开发是为了提供线程、多处理和异步的替代方案。简而言之,Banyan 应用程序由小型目标可执行文件组成,这些可执行文件使用通过 LAN 共享的协议消息相互通信。它的核心使用 Zeromq。它的设计目的不是让流量通过 WAN 移动,而是将 LAN 用作“软件背板”。在某些方面,Banyan 类似于 MQTT,但在 LAN 中使用时速度要快得多。如果需要,它确实能够连接到 MQTT 网络。

    Banyan 的一部分是一个叫做 OneGPIO 的概念。它是一种协议消息规范,将 GPIO 功能抽象为独立于任何硬件实现。为了实现硬件细节,开发了专门的 Banyan 组件,称为 Banyan 硬件网关。有适用于 Raspberry Pi、Arduino、ESP-8266 和 Adafruit Crickit Hat 的网关。 GPIO 应用程序发布任何或所有网关可以选择接收的通用 OneGPIO 消息。要从一个硬件平台移动到另一个硬件平台,启动硬件关联网关,无需修改,启动控制组件(如下所示的代码)。从一个硬件平台到另一个硬件平台,任何组件都不需要修改代码,控制组件和网关都没有修改。启动控制组件时,可以通过命令行选项指定变量,例如引脚号。对于 Arduino 网关,pymata-express 用于控制 Arduino 的 GPIO。 Pymata-express 是 StandardFirmata 客户端的异步实现。需要注意的是,下面的代码不是 asyncio。 Banyan 框架允许使用适合问题的工具进行开发,同时允许解耦部分解决方案,在这种情况下,应用程序允许将 asyncio 与 non-asyncio 混合使用,而不会遇到通常在做的过程中遇到的任何麻烦。所以。

    在提供的代码中,类定义下面的所有代码都用于提供对命令行配置选项的支持。

    import argparse
    import signal
    import sys
    import threading
    import time
    
    from python_banyan.banyan_base import BanyanBase
    
    
    class HCSR04(BanyanBase, threading.Thread):
        def __init__(self, **kwargs):
            """
            kwargs contains the following parameters
            :param back_plane_ip_address: If none, the local IP address is used
            :param process_name: HCSR04
            :param publisher_port: publishing port
            :param subscriber_port: subscriber port
            :param loop_time: receive loop idle time
            :param trigger_pin: GPIO trigger pin number
            :param echo_pin: GPIO echo pin number
            """
    
            self.back_plane_ip_address = kwargs['back_plane_ip_address'],
            self.process_name = kwargs['process_name']
            self.publisher_port = kwargs['publisher_port']
            self.subscriber_port = kwargs['subscriber_port'],
            self.loop_time = kwargs['loop_time']
            self.trigger_pin = kwargs['trigger_pin']
            self.echo_pin = kwargs['echo_pin']
            self.poll_interval = kwargs['poll_interval']
    
            self.last_distance_value = 0
    
            # initialize the base class
            super(HCSR04, self).__init__(back_plane_ip_address=kwargs['back_plane_ip_address'],
                                         subscriber_port=kwargs['subscriber_port'],
                                         publisher_port=kwargs['publisher_port'],
                                         process_name=kwargs['process_name'],
                                         loop_time=kwargs['loop_time'])
    
            threading.Thread.__init__(self)
            self.daemon = True
    
            self.lock = threading.Lock()
    
            # subscribe to receive messages from arduino gateway
            self.set_subscriber_topic('from_arduino_gateway')
    
            # enable hc-sr04 in arduino gateway
            payload = {'command': 'set_mode_sonar', 'trigger_pin': self.trigger_pin,
                       'echo_pin': self.echo_pin}
            self.publish_payload(payload, 'to_arduino_gateway')
    
            # start the thread
            self.start()
    
            try:
                self.receive_loop()
            except KeyboardInterrupt:
                self.clean_up()
                sys.exit(0)
    
        def incoming_message_processing(self, topic, payload):
            print(topic, payload)
            with self.lock:
                self.last_distance_value = payload['value']
    
        def run(self):
            while True:
                with self.lock:
                    distance = self.last_distance_value
                payload = {'distance': distance}
                topic = 'distance_poll'
                self.publish_payload(payload, topic)
                time.sleep(self.poll_interval)
    
    
    def hcsr04():
        parser = argparse.ArgumentParser()
        # allow user to bypass the IP address auto-discovery.
        # This is necessary if the component resides on a computer
        # other than the computing running the backplane.
        parser.add_argument("-b", dest="back_plane_ip_address", default="None",
                            help="None or IP address used by Back Plane")
        parser.add_argument("-i", dest="poll_interval", default=1.0,
                            help="Distance polling interval")
        parser.add_argument("-n", dest="process_name", default="HC-SRO4 Demo",
                            help="Set process name in banner")
        parser.add_argument("-p", dest="publisher_port", default="43124",
                            help="Publisher IP port")
        parser.add_argument("-s", dest="subscriber_port", default="43125",
                            help="Subscriber IP port")
        parser.add_argument("-t", dest="loop_time", default=".1",
                            help="Event Loop Timer in seconds")
        parser.add_argument("-x", dest="trigger_pin", default="12",
                            help="Trigger GPIO pin number")
        parser.add_argument("-y", dest="echo_pin", default="13",
                            help="Echo GPIO pin number")
    
        args = parser.parse_args()
    
        if args.back_plane_ip_address == 'None':
            args.back_plane_ip_address = None
        kw_options = {'back_plane_ip_address': args.back_plane_ip_address,
                      'publisher_port': args.publisher_port,
                      'subscriber_port': args.subscriber_port,
                      'process_name': args.process_name,
                      'loop_time': float(args.loop_time),
                      'trigger_pin': int(args.trigger_pin),
                      'echo_pin': int(args.echo_pin),
                      'poll_interval': int(args.poll_interval)
                      }
    
        # replace with the name of your class
        HCSR04(**kw_options)
    
    
    # signal handler function called when Control-C occurs
    def signal_handler(sig, frame):
        print('Exiting Through Signal Handler')
        raise KeyboardInterrupt
    
    
    # listen for SIGINT
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    if __name__ == '__main__':
        hcsr04()
    

    【讨论】:

    • 也许我漏掉了什么,但你为什么需要在课堂上加入threading.Thread
    • 我在玩弄它并错误地将其留在了其中,但是,我只是修改了上面的脚本以使用线程来允许您轮询最新的值。您可以使用 -i 选项设置轮询时间。每次达到轮询时间时,都会发布一条新的 Banyan 消息。
    【解决方案2】:

    我让它工作了,虽然我不得不承认,我不明白它为什么工作的原因。基本上我必须创建一个新的async def,它只轮询来自sonar_read 的读数并使用asyncio.wait 来返回值。这是代码:

    #ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
    #https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
    import asyncio
    import zmq
    import json
    import zmq.asyncio as zmq_asyncio
    from pymata_express.pymata_express import PymataExpress
    
    
    class ConcurrentTasks:
    
        def __init__(self, board):
    
    
            self.loop = board.get_event_loop()
            self.board = board
    
            self.ctxsync = zmq.Context()
            self.context = zmq.asyncio.Context()
            self.rep = self.context.socket(zmq.REP)
            self.rep.bind("tcp://*:5558")
    
            self.trigger_pin = 53
            self.echo_pin = 51
    
            loop.run_until_complete(self.async_init_and_run())
    
        ### START:  NEW CODE THAT RESOLVED THE ISSUE
        async def pingsonar(self):
            value = await self.board.sonar_read(self.trigger_pin)
            return value
    
        async def readsonar(self):
            while True:
                rep_recv = await self.rep.recv() 
                value = await asyncio.wait([self.pingsonar()])
                valuesonar = list(value[0])[0].result()
                json_data = json.dumps(valuesonar) 
                await self.rep.send(json_data.encode()) 
                await asyncio.sleep(1 / 1000) #maybe this line isn't necessary
    
        ### END : NEW CODE THAT RESOLVED THE ISSUE
    
        async def async_init_and_run(self):
    
            await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)
    
            readsonar = asyncio.create_task(self.readsonar())
            await readsonar
    
            # OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)
    
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        my_board = PymataExpress()
        try:
            ConcurrentTasks(my_board)
        except (KeyboardInterrupt, RuntimeError):
            loop.run_until_complete(my_board.shutdown())
            print('goodbye')
        finally:
            loop.close()
    

    仍然感谢您的帮助。

    【讨论】:

      【解决方案3】:

      (O/P MCVE 问题的定义更进一步——然而,无论是否优先考虑,{sensors|actors}-control-systems 的协调问题,设计使用的系统越多分布式自治代理在专业上非常复杂,很容易制作有缺陷的“快捷方式”或进入system-wide blocking-state

      最好先阅读至少this 关于ZeroMQ Hierarchy in Less Than 5 secondsthis 关于相互死锁 阻塞

      阅读神话般的 Pieter HINTJENS 的书“连接代码:第 1 卷”对于任何系统设计师来说都具有巨大的价值)

      "...seams 很有趣,因为它已经实现了异步,所以我可以像我一样添加异步 zmq。我错了吗?"

      是的,没有“只需添加异步”快捷方式,控制系统是非常有趣的学科,而是一个复杂的学科。总是。很抱歉不得不直截了当地听到。在教科书示例或琐碎的创客项目中,用户可能会隐藏一些复杂性。然后,通过添加一个或几个更琐碎的功能来尝试扩展它们,锤子就来了。复杂性突然浮出水面,前所未见。


      O/P multi-agent-[A,B,C,D]-系统代码的形式图(原样)

      将正式地图放在全屏编辑器上,以便更全面地了解所有相互冲突的依赖关系和相互竞争的控制循环。延迟是最容易的部分。无法解决的死锁阻塞风险的几个地方是核心之一。 ZeroMQ,因为 v2.x 有避免其中一些的工具,软件设计师有责任适当地减轻所有其他的。控制系统(机器人或其他)必须证明这种鲁棒性和对错误的适应能力,并安全地“生存”所有“外部”事故。

      最好的起点是第 1 行汇编语言指令中所表达的旧黄金法则:

      ;ASSUME NOTHING
      

      并努力精心设计所有其余部分。


      multi-agent-[A,B,C,D]-system coordination
                   | | | |
                   +-|-|-|--------------------- python while   ~ 100 [ms] GIL-lock enforced quota for pure-[SERIAL]-ised code-execution, imposed on all python-threads ( be it voluntarily or involuntarily interruped by the python GIL-lock mechanics, O/S-specific )
                     +-|-|--------------------- hardware ~  64 - 147 [ms] self.board proxy-driven, responding to python code
                       +-|--------------------- python asynchronous, strict sequence of remote/local events dependent ZeroMQ dFSA, distributed among local-code operated REP and remote-code operated REQ-side(s) - enforcing a mutually ordered sequence of distributed behaviour as REQ/REP Scalable Formal Communication Archetype Pattern defines
                         +--------------------- python asyncio.get_event_loop() instantiated another event-loop that may permit to defer an execution(s) of some parts of otherwise imperative python-code to some later time
      
      multi-agent-[A,B,C,D]-system code (as-is)
                   : : : :
                   : : : +---------------------------------------------------------+
                   : : +-----------------------------------------------------------:-------------------+ - - - - - - - - - - - - - - - - -<?network?>- - - - - - - - - - - - - - +
                   : +-------------------------------------------------------------:----------+        :                                                                         :
                   :                                                               :          :        :                                                                         :
                   :                                                               :          :        :                                                                         :
                   !                                                               :          :        :                                                                         :
      ____PYTHON___!                                                               :          :        :                                                                         :
                   !                                                               ?          ?        ?                                                                         ?
                +->!                                                              D?         B?       C?REP-1:{0:N}-remote---------------<?network?>------------------------REQ.C? dFSA-state?dependent
                ^  !                                                              D?         B?       C?REP-1:{0:N}                                                            .C?
                ^ A!: IMPERATIVE LOOP-HEAD: while True:                           D?AWAIT    B?       C?REP-1:{0:N}-distributed-Finite-State-Automaton (dFSA) BEHAVIOUR, local .C? side depends also on EVOLUTION OF A FUZZY, DYNAMIC, MULTIPARTY, network-wide dFSA-STATE(s) inside such ECOSYSTEM
                ^  !                                                              D?         B?       C?                                                                        
                ^  !                                                              D?         B?       C?                    REQ.C?-distributed-Finite-State-Automaton-STATE-REP.C?
                ^  !                                                              D?         B?       C?                       vC?                                             ^C?
                ^  !_______.SET DEFERRED:         P_D?C?_deferred_yield_ping     =D?await ...         C?REP.recv()---<--?---?--vC?-----<--<network>--------<--?remote-REQ-state-C?-( ^C?-dFSA-state && C?.recv()-blocking-mode of REQ/REP .recv()-waiting till a message, if any arrives error-free, blocks till then, just deferred via D?await )
                ^  !                                                              D?         B?                                vC?                                             ^C?
                ^  !_______.SET DEFERRED:         S_D?B?_deferred_yield_sonar    =D?await ...B?.board.sonar_read()-o-<--?-+    vC?                                             ^C?
                ^  !                                                                                               :      |    vC?                                             ^C?
                ^  !_______.GUI OUTPUT:           print( deferred_yield_sonar )  #A!->-----------------------------+->----?->---:?--->[ a last-known (if any) S_D?B?_deferred_yield_sonar value put "now" on GUI-screen ]
                ^  !                                                                                               :      ^    vC?                                             ^C?
                ^  !_______.SET TRANSFORMED:      S_D?B?_dependent_tranformed    =A!json.dumps( S_D?B? )--<--<--<--+      |    vC? <--[ a last-known (if any) S_D?B?_deferred_yield_sonar value transformed and assigned]
                ^  !                                                                                               :      |    vC?                                             ^C?
                ^  !_______.BLOCKING-MODE-SEND()  REP.send( S_D?B?_dependent_transformed.encode() )  #C? .send( S_D?B? )--?---->C?-->----<?network?>-->-------?remote-REQ-state-C?-( +C?-indeterministic and blocking-mode of REQ/REP .recv()-waiting till a message, if any arrives error-free, blocks till then )
                ^  !X:C?                                                                                                  ^    vC?                                             ^C?
                ^  !X:C?___.SET IMPERATIVE:       i += 1                                                                  | REQ.C?-distributed-Finite-State-Automaton-STATE-REP.C?
                ^  !X:C?                                                                                                  ?                                                       
                ^  !X:C?___.NOP/SLEEP() DEFERRED: await sleep( ... )             #D?AWAIT                                 ^                                                      :
                ^  !X:C?D?+0ms                                                                                            |                                                      :
                ^  !X:C?D?_.JUMP/LOOP                                                                                     ?                                                      :
                ^__!X:C?D?+0ms                                                                                            ^                                                      :
                                                                                                                          |                                                      :
                                                                                                                          |                                                      :
                                                                                                                          |                                                      :
      ____SONAR___________________________________________________________________________________________________________B? REQUEST T0: + EXPECT ~64 - ~147 [ms] LATENCY        :
                                                                                                                          B? hardware value acquisition latency can be masked    :
                                                                                                                             via await or other concurrency-trick )              :
                                                                                                                                                                                 :
      ____REQ-side(s)_?{0:N} __________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
      _____REQ-side(s)_?{0:N} _________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
      ______REQ-side(s)_?{0:N} ________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
      _______REQ-side(s)_?{0:N} _______________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
           ...                                                                                                                                                                 ::: ...
      ______...REQ-side(s)_?{0:N} _____________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
      

      正如 O/P 的 EDIT : 2 小时前解释的那样,

      现在问题很明显了。无限的while True:-循环指示硬步通过,逐行,循环-再次“旋转”所有步骤,一个接一个,而任何asyncio @987654331 @-装饰的仿函数存在,异步独立于这个“主要”A:while True:-命令式代码执行的循环块。同样的方式 a B:self.board-设备的外部声纳设备是一个独立的定时设备,在 python 代码之外,具有一些无法管理的硬件/读取/解码延迟,固定的协调-循环 + C: ZeroMQ-REQ/REP-Archetype-behaviour(再次与分散的“外国”REQ-actor(s)/agent(s) 进行外部协调 - 是的,你不知道,其中有多少... - 但都超出了您的控制范围,所有REQ-side(s) 和您本地实例化的REP-side 分布式有限状态机状态完全独立于“框架”-python 循环的意愿来推动步骤并执行下一步,下一步,下一步......)+另一个,这里 D:asyncio.get_event_loop() - 实例化的“第三个”-event_loop,这会影响 await-decorated 仿函数实际上被允许延迟产生结果并在稍后交付它们的方式----- 而且,这就是“交叉面包”-event_loops 的问题。

      如果这个问题的设置是由任何计算机科学教授详细阐述的,那么她/他应该得到起立鼓掌,因为他使这项任务成为分布式系统问题的最佳示例——几乎可以作为对 Margaret HAMILTON 夫人工作的致敬关于阿波罗 AGC 计算机系统的正确设计,她的工作解决了这类问题,从而挽救了机组人员的生命和 50 年前登月的所有自豪感。很棒的演讲,汉密尔顿夫人,很棒的演讲。

      微不足道,但就在现场。

      确实是一项可爱且科学上奇妙的任务:

      为一组独立定时和操作的代理 [A, B, C, D], A 是一种命令式解释 Python 语言,主要具有 GIL-lock prevented zero-concurrency,设计一个稳健、具有故障恢复能力和协调工作的策略,但是一个纯粹的[SERIAL] 过程流,C 是一个模糊的半持久网络分布式 REQ/REP-agents 集合,B 是一个独立操作的硬件设备,与A 有一些有限的 I/O 接口-inspectable self.board-proxy,所有这些都是相互独立的,并且物理分布在给定的软件、硬件和网络生态系统中。

      昨天已经提出了硬件诊断 + 提议的系统架构方法。如果不测试self.board-托管的声纳设备延迟,没有人可以决定下一个最佳步骤,因为现实的(体内基准测试)硬件响应时间(最好也是.board的文档和它的外围传感器device(s) MUX-ed or not?PRIO-driven or MUTEX-locked or static, non-shared peripheral device, register-read-only abstracted, ... ) 是决定可能的[A, B, C, D]-coordination 的关键战略。


      ZeroMQ 部分:

      如果您评论 l.5 - REP_server_django.send(json_data.encode()) # l.5 您将进入最终块,作为 REQ/REP ZeroMQ 可扩展正式通信原型模式的原始严格形式不能再次.recv(),如果在收到第一个.recv() .send() 之后没有回复REQ 端。

      这是一个简单的问题。


      其余的不是可重现的代码。

      您可能想要:

      • 验证self.board.sonar_read( trigger_pin ) 是否收到任何值并测试这样做的延迟:

         import numpy as np
         from zmq import Stopwatch
         aClk = Stopwatch()
      
         def sonarBeep():
             try:
                  a_value   = -1
                  aClk.start()
                  a_value   = self.board.sonar_read( trigger_pin )
                  a_time_us = aClk.stop()
             except:
                  try:
                      aClk.stop()
                  finally:
                      a_time_us = -1
             finally:
                 return( a_value, a_time_us )
      

      并运行一系列 100 次声纳测试,以获得关于延迟时间的 min、Avg、StDev、MAX 读数,所有这些读数都在 [us] 中,因为这些值是基数要知道,如果要根据声纳传感器数据设计一些控制回路。

      [ aFun( [ sonarBeep()[1] for _    in range( 100 ) ]
              )                for aFun in ( np.min, np.mean, np.std, np.max )
        ]
      

      系统架构和子系统协调:

      最后但同样重要的是,可以让读取和存储声纳数据,在一个绝对独立的事件循环中,不与任何其他操作协调,只从这样的存储中读取一个状态变量,设置在一个独立工作的子系统中(如果不是作为独立的系统行为来非常节省电力)

      每当尝试紧密协调独立事件流时(在分布式系统中最糟糕的代理不协调或弱协调)设计必须在对错误和时间错位和错误的鲁棒性方面有所提高-弹力。否则系统可能很快就会死锁/活锁。

      如果有疑问,可以学习 XEROX Palo Alto 研究中心 MVC 分离的原始哲学,其中 MODEL 部分可以(并且大部分时间在 GUI 框架中) ,因为 198x+ 确实 ) 接收许多独立于其他系统组件更新的状态变量,如果他们需要它们并且需要它们,它们只会读取/使用实际状态变量的数据。同样,如果功率预算允许,SONAR 可以连续扫描场景并将读数写入任何本地寄存器,并让其他组件来询问或接受他们对最后一次实际 SONAR 读数的请求。

      ZeroMQ zen-of-zero 也是如此。

      如果这可能有帮助,请检查以这种方式正常工作的本地消息存储的 zmq.CONFLATE 模式。

      一个小提示:有人可能已经注意到,sleep( 1 / 1000 ) 是一个相当昂贵、重复执行的步骤并且很危险,因为它在 py2.x 中实际上不会休眠,因为整数除法。

      【讨论】:

      • 这是一个非常有趣的答案,可能有点超出我的能力范围。尽管如此,我已经运行了你的延迟测试。这是值:min = 64;最大值 = 147; sdtdev = 7.48;平均值 = 74.5。我运行它超过 1000 个。我对这种方法的想法是让 zmq 等待从另一个脚本接收消息以查找声纳值,并且在等待时,它可能正在运行并发任务,例如启用电机等。 .. 这是 pymata express seams 非常有趣,因为它已经实现了异步,所以我可以像我一样添加异步 zmq。我错了吗?
      • 我非常感谢您回答中的细节,但我不得不承认我并不完全了解所有内容。你是在告诉我我想做的事情是不可能的吗?有没有其他方法可以解决这个问题?基本上将声纳读取的值通过管道传输到另一个脚本。
      【解决方案4】:

      我不确定这是否能解决您的问题,但我确实发现了一些潜在问题。

      1. 不清楚如何调用 readsonar。
      2. 上下文的创建有错别字。
      3. REP_server_django.send 没有等待。

      以下是我对代码的返工(未经测试):

      import asyncio
      import zmq
      import json
      
      
      class Play:
          def __init__(self):
              self.context = zmq.asyncio.Context()
              self.REP_server_django = self.context.socket(zmq.REP)
              self.REP_server_django.bind("tcp://*:5558")
              self.event_loop = asyncio.get_event_loop()
              self.event_loop.run_until_complete(self.readsonar(4))
      
          async def readsonar(self, trigger_pin):
              i = 0
              while True:
                  ping_from_view = await self.REP_server_django.recv()  # l.1
                  value = await self.board.sonar_read(trigger_pin)  # l.2
                  print(value)  # l.3
                  json_data = json.dumps(value)  # l.4
                  # json_data = json.dumps(i) # l.4bis
                  await self.REP_server_django.send(json_data.encode())  # l.5
                  i += 1  # l.6
                  await asyncio.sleep(1 / 1000)  # l.6
      

      【讨论】:

      • 与 ZeroMQ 合作多年,如果尝试安全“杂交”{同步(阻塞)| ZeroMQ Context()-instance 的异步 } 模式(一个异步引擎,带有信号/消息传递/队列存储-mgrs/多协议工厂/接口驱动逻辑,都在一个完全独立的线程池中运行)服务与任何其他asyncio(现在包含 py3)或其他事件循环。即使尝试“重用”Tkinter-mainloop,这一直是一个技巧,如何重用不协调的演员/函子的智能框架也很难与 zmq 混合
      • 更新了原始问题以解决这一点。如果 ZMQ 不适合这里的工作。你会如何解决这个问题。基本上,我有“服务器”脚本使用 REQ/REP 请求该脚本的声纳值。我不知道如何管理这些数据
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-01-25
      • 2017-02-23
      • 2017-03-13
      • 2011-10-10
      • 2019-10-29
      • 1970-01-01
      相关资源
      最近更新 更多