【Question title】: Pass a Queue to handle_read function async tcp server
【Posted】: 2026-01-25 03:55:02
【Question】:

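I'm using multiprocessing to run 2 tasks. Process 1 is an asynchronous TCP server that receives commands and must pass them to process 2 (a while True loop).

Because I'm using multiprocessing, the processes don't share global variables, so I have to use a Queue. However, process 1 is a simple asyncore TCP server, and I don't know how to pass the queue object to the handle_read function.

Does anyone have an idea? Thanks a lot!

The code I'm trying: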

#!/usr/bin/env python3
#import the required modules

import time
import collections
from datetime import datetime
from datetime import timedelta
from threading import Timer
import os
import sys
from smbus import SMBus
from struct import pack, unpack
import threading
from multiprocessing import Process, Queue
import asyncore
import socket


bstatus = 0
lastdata = {}

#checksum byte for confirmation
chksum = 15


#helper functions

def millis():
    dt = datetime.now()-start_time
    ms = (dt.days*24*60*60 + dt.seconds)*1000+dt.microseconds / 1000.0  
    return ms

def getbit(data,index):
    return(data & (1<<index)!=0)

def parseData(data):
    mydata = {}

    if data[8] == 27:
        mydata['Temp1'] = data[0]
        mydata['Temp2'] = data[1]
        mydata['Temp3'] = data[2]
        mydata['Temp4'] = data[3]
        mydata['HotFlow'] = data[4]
        mydata['ColdFlow'] = data[5]
        mydata['PumpSpeed'] = data[6]
        mydata['PumpStatus'] = getbit(data[7],0)
        mydata['HeaterStatus'] = getbit(data[7],1)
        mydata['ArduinoMode'] = getbit(data[7],2)
        mydata['TimeStamp'] = timezone.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]

        #get the arduino mode
        arduino_mode = mydata['ArduinoMode']      
        parseStatus = True
    else:
        parseStatus = False

    return parseStatus, mydata


#classes implementing the asynchronous server

class dataHandler(asyncore.dispatcher_with_send):

    #this function doesn't work
    def __init__(self,sock,queue):
        self.queue = queue
        self.sock = sock

    def handle_read(self):
        data = self.sock.recv(50)

        '''interpret the commands:
        Operation: turn pump on/off, turn heater on/off, change pump speed
        Mode: switch from automatic to remote mode
        Storage: enable or disable data storage for the trend
        '''

        if(data == b'7'):
            operation_mode = 1
            queue.put(data)
            print(data)

        elif(data == b'8'):
            operation_mode = 0
            queue.put(data)
            print(data)          

        try:
            bytescommand = pack('=cb',data,chksum)
            bus.write_block_data(arduinoAddress,ord(data),list(bytescommand))
        except Exception as err:
            print(str(err))
        finally:
            pass
            #print(data)

class Server(asyncore.dispatcher):

    def __init__(self,host,port,queue):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET,socket.SOCK_STREAM)
        self.bind((host,port))
        self.listen(1)
        self.queue = queue 

    def handle_accept(self):
        pair = self.accept()
        if pair is None:
            return
        else:
            sock,addr = pair
            handler = dataHandler(sock,self.queue) #doesn't work


#class to implement the main function

def tcpserver(queue):
    server = Server('localhost',8080,queue)
    asyncore.loop()

def mainloop(stime,ftime,queue):
    prevmillis = stime
    prevmillis2 = ftime
    operation_mode = 1
    while True:
        try:
            currentmillis2 = millis()
            if(queue.empty):
                pass
            else:
                print(queue.get())

            if(currentmillis2 - prevmillis2 > readinterval):
                #do some stuff


#main program

if __name__=='__main__':


    prevmillis= millis()       #counter for requesting data from the arduino
    prevmillis2 = prevmillis   #counter for sending to the database

    #create Queue
    queue = Queue()

    p1 = Process(target=tcpserver,args=(queue,))
    p1.start()
    p2 = Process(target=mainloop,args=(prevmillis,prevmillis2,queue,))
    p2.start()

    strstatus = 'Servidor rodando'

    print(strstatus)

【Comments】:

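  • What does your program do when it reaches the statements you marked as not working? Does it raise an exception? If so, please post the traceback.
  • It doesn't return any error. But queue.get() never executes, so the queue always appears to be empty.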
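
The symptom in the reply above (no error, yet queue.get() never runs) is consistent with an emptiness test that is always true. A minimal sketch, using a hypothetical helper `describe` that is not part of the question's code, contrasts testing the method object with calling it:

```python
from multiprocessing import Queue


def describe(q):
    """Return (truthiness of the method object, result of calling the method)."""
    return bool(q.empty), q.empty()


if __name__ == '__main__':
    q = Queue()
    # Nothing has been put on the queue, so calling empty() reports True.
    # The bound method object q.empty is truthy too, but for another reason:
    # method objects are always truthy, whatever the queue currently holds.
    print(describe(q))
```

The first element of the tuple stays True no matter what is on the queue, which is why an `if(queue.empty):` branch would be taken on every iteration.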

Tags: python event-handling python-multiprocessing tcpserver asyncore


【Solution 1】:

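In mainloop, you are not testing the return value of queue.empty; you are testing the method object itself. A method object always evaluates as True, so the queue always looks empty. Change it to a method call: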

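def mainloop(stime,ftime,queue):
    prevmillis = stime
    prevmillis2 = ftime
    operation_mode = 1
    while True:
        try:
            currentmillis2 = millis()
            if(queue.empty()):   # Added (): call the method instead of testing the method object
                pass
            else:
                print(queue.get())

            if(currentmillis2 - prevmillis2 > readinterval):
                pass  # do some stuff
        except Exception as err:
            # placeholder handler so the try block is syntactically complete;
            # the original snippet was cut off before its except clause
            print(err)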
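
The fix above covers the consumer side. On the producer side, the question's dataHandler.__init__ never calls the base-class initializer, and handle_read refers to queue rather than self.queue; either may matter, although the answer does not confirm it. Since asyncore was removed from the standard library in Python 3.12, the sketch below swaps in asyncio to show one way of handing a queue to a read handler, namely through a closure. The names `make_handler`, `serve` and `poll_command` are hypothetical, and the sketch keeps the question's assumption that each read carries exactly one command. It also polls with `get(timeout=...)` instead of `empty()`, which the multiprocessing documentation describes as unreliable.

```python
import asyncio
import queue as queue_module  # provides the Empty exception raised by get()


def make_handler(command_queue):
    """Return a connection callback that closes over command_queue."""
    async def handle_client(reader, writer):
        while True:
            data = await reader.read(50)      # same 50-byte read as the question
            if not data:                      # b'' means the client disconnected
                break
            if data in (b'7', b'8'):          # the two commands from the question
                command_queue.put(data)       # hand the command to the consumer
        writer.close()
        await writer.wait_closed()
    return handle_client


async def serve(command_queue, host='localhost', port=8080):
    """Accept connections until cancelled, forwarding commands to the queue."""
    server = await asyncio.start_server(make_handler(command_queue), host, port)
    async with server:
        await server.serve_forever()


def tcpserver(command_queue):
    """Process target, playing the role of tcpserver() in the question."""
    asyncio.run(serve(command_queue))


def poll_command(command_queue, timeout=0.1):
    """Consumer side: return the next command, or None if none arrives in time."""
    try:
        return command_queue.get(timeout=timeout)
    except queue_module.Empty:
        return None
```

The parent process would still create the multiprocessing Queue and start the server with Process(target=tcpserver, args=(queue,)), as the question already does; the main loop could then call poll_command(queue) once per iteration.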

【Comments】: