【发布时间】:2021-12-16 13:14:58
【问题描述】:
我正在尝试动态更改 mqtt 客户端处理程序和证书,当订阅者和发布者连接时,这会导致订阅者 EOF
这就是我想要做的,
1] 我正在初始化订阅者/发布者(使用 firstPubHandler、firstConnectHandler 和默认证书)
2] 使用发布者在服务器上发送注册消息以获取新证书详细信息
3] 服务器将返回证书详细信息,该响应将由主题 .../id/Certificate 上的 firstConnectHandler 处理以下载证书。
4] firstPubHandler 将处理服务器的响应并重新初始化发布者/订阅者(使用 messagePubHandler、connectHandler 和新下载的证书),connectHandler 将侦听所有主题 /id/+
一切正常,除了当我重新初始化订阅者/发布者时,订阅者不断断开连接并出现错误“EOF”
我在这里做错了吗?或者有没有更好的方法来做到这一点? 任何帮助表示赞赏
-- 主函数
var opt Params
var publisher mqtt.Client
var subscriber mqtt.Client
func main() {
InitializeBroker(firstPubHandler, firstConnectHandler)
//Ultimately it will trigger message on ".../id/Certificate" topic which will be handled byfirstConnectHandler
PublishRegistrationMessage(publisher)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGTERM)
go func() {
for {
}
}()
<-done
<-c
DisconnectBrocker()
}
-- 处理程序
// First handlers
var firstPubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
DownloadCertificates(msg.Payload())
InitializeBroker(messagePubHandler, connectHandler)
}
var firstConnectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
if token := c.Subscribe(opt.SubClientId+"/id/Certificate", 0, firstPubHandler); token.Wait() && token.Error() != nil {
log.Error(token.Error())
}
}
// Second handlers
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
ProcessMessage(msg.Payload())
}
var connectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
if token := c.Subscribe(opt.SubClientId+"/id/+", 0, messagePubHandler); token.Wait() && token.Error() != nil {
log.Error(token.Error())
}
}
// Common handler
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
log.Info(err)
}
-- Mqtt 代理初始化
func InitializeBroker(lMessageHandler mqtt.MessageHandler, lConnectHandler mqtt.OnConnectHandler) {
statusPublishTopic := opt.PubClientId/id
nodeSubscribeTopic := opt.SubClientId/id
// Build the options for the publish client
publisherOptions := mqtt.NewClientOptions()
publisherOptions.AddBroker(opt.Broker)
publisherOptions.SetClientID(statusPublishTopic)
publisherOptions.SetDefaultPublishHandler(lMessageHandler)
publisherOptions.OnConnectionLost = connectLostHandler
// Build the options for the subscribe client
subscriberOptions := mqtt.NewClientOptions()
subscriberOptions.AddBroker(opt.Broker)
subscriberOptions.SetClientID(nodeSubscribeTopic)
subscriberOptions.SetDefaultPublishHandler(lMessageHandler)
subscriberOptions.OnConnectionLost = connectLostHandler
subscriberOptions.OnConnect = lConnectHandler
if !opt.NoTLS {
tlsconfig, err := NewTLSConfig()
if err != nil {
log.Fatalf(err)
}
subscriberOptions.SetTLSConfig(tlsconfig)
publisherOptions.SetTLSConfig(tlsconfig)
}
publisher = mqtt.NewClient(publisherOptions)
if token := publisher.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf(token.Error())
}
subscriber = mqtt.NewClient(subscriberOptions)
if token := subscriber.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf(token.Error())
}
}
func NewTLSConfig() (config *tls.Config, err error) {
certpool := x509.NewCertPool()
pemCerts, err := ioutil.ReadFile(rootCert)
if err != nil {
return nil, err
}
certpool.AppendCertsFromPEM(pemCerts)
cert, err := tls.LoadX509KeyPair(nodeCertFilePath, pvtKeyFilePath)
if err != nil {
return nil, err
}
config = &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
Certificates: []tls.Certificate{cert},
}
return config, nil
}
【问题讨论】:
-
您确定这是异常行为吗?优雅地结束现有连接将是应用新重新初始化的配置的明显方法
-
@daniel-farrell 我认为重新初始化时断开一次是很正常的,但是重新初始化后它一直显示EOF,不确定是否正常。