【问题标题】:Asyncronous Threading Python OCPP异步线程 Python OCPP
【发布时间】:2022-01-16 00:14:48
【问题描述】:

我正在尝试在 python 中实现 ocpp 库。有两个函数在 while 循环中连续运行,cp.start() 用于记录日志,cp.heartbeat 作为协议实习生心跳。当我想在我的例程中正常实现它们时,while 循环会阻塞事件循环,所以我希望它们作为线程。但图书馆似乎有问题。

async def main():
    async with websockets.connect(
        'ws://localhost:9000/CP_3',
        subprotocols=['ocpp1.6']
    ) as ws:

        cp = ChargePoint('CP_3', ws)

        def start_logging(loop):
            asyncio.set_event_loop(loop)
            loop.create_task(cp.start())
            loop.run_forever()

        loop = asyncio.get_event_loop()
        t = threading.Thread(target=start_logging, args=(loop,))
        t.start()
   
        await asyncio.gather(cp.send_heartbeat())


if __name__ == '__main__':
    asyncio.run(main())

错误:

ConnectionResetError: [WinError 995] Der E/A-Vorgang wurde wegen eines Threadendes oder einer Anwendungsanforderung abgebrochen

AssertionError
ERROR:asyncio:Error on reading from the event loop self pipe
loop: <ProactorEventLoop running=True closed=False debug=False>

AssertionError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-5' coro=<ChargePoint.start() done, defined at C:\Users\sasko\AppData\Local\Programs\Python\Python39\lib\site-packages\ocpp\charge_point.py:121> exception=ConnectionClosedOK('code = 1000 (OK), no reason')>

即使我将线程设置为守护进程,心跳也会起作用,但我不能再关闭程序了。 最终的目标是让 cp.start() 和 heartbeat 在一个线程中运行,这样我就可以在另一个逻辑中控制电动汽车的充电过程。

【问题讨论】:

    标签: python asynchronous async-await python-asyncio


    【解决方案1】:

    看github上的代码库,所有你想调用的函数都是协程。它们可能包含无限循环,但其中包含await 语句,这使它们将控制权交还给事件循环。因此,据我所知,任何事情都不需要使用线程。摘自 1.6 版的示例:

    async def main():
        async with websockets.connect(
            'ws://localhost:9000/CP_1',
            subprotocols=['ocpp1.6']
        ) as ws:
    
            cp = ChargePoint('CP_1', ws)
    
            await asyncio.gather(cp.start(), cp.send_boot_notification())
    

    我想这应该让你开始。

    编辑:

    好的,以上仍然有效。我回答了你的问题,但你真正需要的是了解这个 API 应该如何工作。我告诉你他们的例子有些令人困惑,我认为你不会绕过阅读他们的文档。但是我从代码中了解到的要点是,您需要将中心类 ChargePoint 子类化,这在示例中并不清楚,因为它们将子类命名为与基类​​相同。我会尽量让他们的例子更清楚。我希望我理解正确...:

    # simplified and commented version of the v1.6 example
    import asyncio
    import logging
    import websockets
    
    from ocpp.routing import on
    from ocpp.v16 import call
    from ocpp.v16 import ChargePoint as cp # this is the baseclass renamed to cp
    from ocpp.v16.enums import Action, RegistrationStatus
    
    logging.basicConfig(level=logging.INFO)
    
    
    class ChargePoint(cp): # inheriting from cp, now called ChargePoint (again)
    
        @on(Action.SomeMessage) # this decorator adds your function to a mapping of hooks for that message/event
        def on_some_message(*args, **kwargs):
            pass # do something which probably got something to do with charging something
            asyncio.create_task(self.some_coro()) # create async task from sync code
    
        # add more decorated functions to implement your logic
    
        async def some_coro(self):
            pass # do something with I/O
    
        async def send_boot_notification(self):
            request = call.BootNotificationPayload(
                charge_point_model="Optimus",
                charge_point_vendor="The Mobility House"
            )
    
            response = await self.call(request)
    
            if response.status == RegistrationStatus.accepted:
                print("Connected to central system.")
    
    
    async def main():
        async with websockets.connect(
            'ws://localhost:9000/CP_1',
            subprotocols=['ocpp1.6']
        ) as ws:
    
            cp = ChargePoint('CP_1', ws) # going full circle, naming the instance the same as the rebound baseclass :-/
    
            # this seems initializing, maybe not do it concurrently
            await cp.send_boot_notification()
    
            # this starts the infinite loop which receives and relays
            # messages to their respective hooks
            # (you get the concurrency you wanted out of threads by registering
            # your own hooks (pieces of code)
            await cp.start() # main() stays here until you somehow shut it down
    
    if __name__ == '__main__':
        asyncio.run(main())
    
    

    显然我无法对此进行测试,也无法向您保证这是他们的意图,但我希望它有所帮助。

    【讨论】:

    • 是的,这是“文档”中的代码。与此相关的问题是,当您想要添加新事件时,例如可能来自协议的 StartTransaction.req,因为 cp.start() 包含无限循环,这件事永远不会完成。从技术上讲,它可以等待,但最后一行下面的所有内容都不会被执行。所以我的问题是,除了消息循环之外,如何实际运行一些充电逻辑。我对此的想法是创建一个单独的线程,然后在主循环中返回消息。这可能吗?
    • 异步代码是并发的,没有线程。我想我看到了你的问题。查看asyncio.create_task 将协程包装在任务中,该任务将很快执行,但对create_task 的调用将立即返回。也许我可以稍后为您的代码编写一个示例。
    • 使用来自 github repo 的扩展示例进行编辑。
    • 非常感谢,我想我现在明白了
    猜你喜欢
    • 2020-10-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-14
    • 2018-03-02
    • 1970-01-01
    相关资源
    最近更新 更多