【问题标题】:How to create channel with socket in Python如何在 Python 中使用套接字创建通道
【发布时间】:2014-09-23 09:05:56
【问题描述】:

我之前已经启动过几次 Python,现在我正在创建一个套接字服务器。我已经让服务器与具有多个客户端的多个线程一起运行(万岁!)但我正在寻找我无法调用的功能(我什至不知道它是否存在)我想创建一种客户端通道可以发送不同类型的消息。

我创建了一个通道 INFO 的示例,如果服务器接收到这种类型的套接字,它只会进行打印

我创建了另一个通道 DEBUG,我可以在其中发送服务器将执行的自定义命令

在非编程语言中它会这样做:

def socketDebug(command):
     run command

def socketInfo(input):
     print input

if socket == socketDebug:
     socketDebug(socket.rcv)
else:
   if socket == socketInfo:
     socketInfo(socket.rcv)

我希望我很清楚。

【问题讨论】:

    标签: python sockets websocket publish-subscribe channel


    【解决方案1】:

    这是一个非常简单的 Channel 类实现。它创建一个套接字,接受 来自客户端的连接并发送消息。它本身也是一个客户, 从其他 Channel 实例接收消息(例如在单独的进程中)。

    通信是在两个线程中完成的,这很糟糕(我会使用异步 io)。什么时候 收到一条消息,它会调用接收线程中注册的函数 可能会导致一些线程问题。

    每个 Channel 实例都创建自己的套接字,但它的可扩展性要高得多 具有由单个实例多路复用的频道“主题”。

    一些现有的库提供“通道”功能,例如nanomsg

    这里的代码用于教育目的,如果它可以帮助...

    import socket
    import threading
    
    class ChannelThread(threading.Thread):
      def __init__(self):
        threading.Thread.__init__(self)
    
        self.clients = []
        self.chan_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.chan_sock.bind(('',0))  
        _, self.port = self.chan_sock.getsockname()
        self.chan_sock.listen(5)
        self.daemon=True
        self.start()
    
      def run(self):
        while True:
          new_client = self.chan_sock.accept()
          if not new_client:
            break
          self.clients.append(new_client)
    
      def sendall(self, msg):
        for client in self.clients:
          client[0].sendall(msg)
    
    class Channel(threading.Thread):
      def __init__(self):
        threading.Thread.__init__(self)
    
        self.daemon = True
        self.channel_thread = ChannelThread()
    
      def public_address(self):
        return "tcp://%s:%d" % (socket.gethostname(), self.channel_thread.port)
    
      def register(self, channel_address, update_callback):
        host, s_port = channel_address.split("//")[-1].split(":")
        port = int(s_port)
        self.peer_chan_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)   
        self.peer_chan_sock.connect((host, port))
        self._callback = update_callback
        self.start()
    
      def deal_with_message(self, msg):
        self._callback(msg)
    
      def run(self):
        data = ""
        while True:
          new_data = self.peer_chan_sock.recv(1024)
          if not new_data:
            # connection reset by peer
            break
          data += new_data
          msgs = data.split("\n\n")
          if msgs[-1]:
            data = msgs.pop()
          for msg in msgs:
            self.deal_with_message(msg)
    
      def send_value(self, channel_value):
        self.channel_thread.sendall("%s\n\n" % channel_value)
    

    用法:

    在进程A中:

    c = Channel()
    c.public_address()
    

    在进程B中:

    def msg_received(msg):
      print "received:", msg
    
    c = Channel()
    c.register("public_address_string_returned_in_process_A", msg_received)
    

    在进程A中:

    c.send_value("HELLO")
    

    在进程B中:

    received: HELLO
    

    【讨论】:

    • 感谢您的回答。我会去的。我也在探索 Redis Pub/Sub 系统。
    • 这只是一个示例,用于教育目的。如果有帮助,我很高兴。 Redis 发布/订阅系统很棒(Redis 很棒!)。
    猜你喜欢
    • 2016-03-18
    • 1970-01-01
    • 1970-01-01
    • 2018-08-11
    • 2021-12-18
    • 1970-01-01
    • 2015-02-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多