【问题标题】:Client connection to VerneMQ cluster客户端连接到 VerneMQ 集群
【发布时间】:2018-09-03 15:45:23
【问题描述】:

我们建立了一个 VerneMQ 3 节点集群 (https://vernemq.com/docs/clustering/)。

问题是我没有找到任何关于如何将客户端连接到整个集群的示例

例如对于单个 VerneMQ 实例,客户端连接 (in python) 是:

client.connect("xxx.xxx.xxx.xxx", 1883, 60)

其中“xxx.xxx.xxx.xxx”是单个实例的地址。
如果是集群,有没有办法指定整个集群的地址,而不是指向单个节点?

【问题讨论】:

    标签: python cluster-computing paho mqtt-vernemq


    【解决方案1】:

    我的客户端使用Paho MQTT client library 编写,我最终使用了客户端类的following 扩展:

    import random
    from paho.mqtt.client import *
    
    
    class ClusterClient(Client):
        """A subclass of paho.mqtt.Client that supports connecting to a cluster of
           mqtt brokers. connect() and connect_async() additionally accept a list
           of hostnames or host/port tuples:
    
               connect("host1")
    
               connect(["host1", "host2", "host3"]) # use default port 1883
    
               connect(["host1", ("host2", 8883), ("host3", 8883)])
    
           Hosts to connect to are chosen randomly. If a host disappears the client
           automatically connects to another host from the list.
        """
    
        def __init__(self, client_id="", clean_session=True, userdata=None,
                protocol=MQTTv311, transport="tcp"):
            super().__init__(client_id, clean_session, userdata, protocol, transport)
            self._hosts = []
    
        def connect_async(self, host, port=1883, keepalive=60, bind_address=""):
            if isinstance(host, (list, tuple)):
                self._hosts = [(t, 1883) if isinstance(t, str) else t for t in host]
            else:
                self._hosts = [(host, port)]
    
            for host, port in self._hosts:
                if host is None or len(host) == 0:
                    raise ValueError('Invalid host.')
                if port <= 0:
                    raise ValueError('Invalid port number.')
    
            host, port = random.choice(self._hosts)
    
            super().connect_async(host, port, keepalive, bind_address)
    
        def reconnect(self):
            hosts = self._hosts[:]
            random.shuffle(hosts)
            while True:
                self._host, self._port = hosts.pop(0)
                try:
                    return super().reconnect()
                except socket.error:
                    if not hosts:
                        raise 
    

    【讨论】:

      【解决方案2】:

      没有“整个集群的具体地址”这样的东西。 (适用于 VerneMQ 和任何其他集群服务器软件)。

      如果您需要单点联系,则需要在集群前面安装一个负载平衡器。然后,您可以使用不同的负载平衡策略。

      例如,VerneMQ 可以很好地与 HAProxy 配合使用,并且支持代理协议。

      希望这会有所帮助。

      【讨论】:

      • “对于 any 其他集群服务器软件”不正确,a.i. MongoDb 集群支持 (docs.mongodb.com/manual/reference/connection-string)。
      • 谢谢,不知道。那么这是使 MongoDB 驱动程序打开与多个副本的连接的语法糖吗?对于 MQTT,这无论如何都会有所不同:允许 1 个 ClientID 1 个 TCP 连接。
      猜你喜欢
      • 1970-01-01
      • 2017-11-15
      • 1970-01-01
      • 2015-03-13
      • 2022-11-08
      • 1970-01-01
      • 2020-08-28
      • 2017-09-26
      • 2013-05-21
      相关资源
      最近更新 更多