【问题标题】:Redis Pub/Sub causes web socket connection to hangRedis Pub/Sub 导致 Web 套接字连接挂起
【发布时间】:2015-08-10 06:28:17
【问题描述】:

我正在构建一个通过 Web 套接字连接到服务器的 Web 应用程序。服务器组件是一个基于sinatraredisfaye-websocket 的小型Ruby 应用程序。服务器在 Phusion Passenger 上运行。一个单独的更新程序守护进程不断从各种来源拉取更新并将它们发布到 redis(使用 redis gem 和 Redis::publish)。

为了将更新推送给客户端,我在我的 Sinatra 应用中尝试了以下操作:

get '/' do
  if Faye::WebSocket.websocket?(request.env)
    store = Redis.new
    ws = Faye::WebSocket.new(request.env)

    ws.on(:open) do |event|
      store.incr('connection_count')
      puts 'Client connected (connection count: %s)' % store.get('connection_count')
    end

    ws.on(:close) do |event|
      store.decr('connection_count')
      puts 'Client disconnected (connection count: %s)' % store.get('connection_count')
    end

    ws.rack_response

    store.subscribe(:updates) do |on|
      on.message do |ch, payload|
        puts "Got update"
        ws.send(payload) if payload
      end
    end
  end
end

这仅部分有效。客户端可以成功连接并接收更新,但 store.incrstore.decr 调用不起作用。此外,连接似乎没有正确关闭——当我启动多个客户端时,我注意到连接堆积起来,乘客服务器最终停止工作。

日志输出:

devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update

当我注释掉以下块时,跟踪连接突然起作用了:

store.subscribe(:updates) do |on|
  on.message do |ch, payload|
    puts "Got update"
    ws.send(payload) if payload
  end
end

日志输出:

devserver_1 | App 1028 stdout: Client connected (connection count: 1)
devserver_1 | App 1039 stdout: Client connected (connection count: 2)
devserver_1 | App 1039 stdout: Client disconnected (connection count: 1)
devserver_1 | App 1028 stdout: Client disconnected (connection count: 0)

所以使用Redis::subscribe 似乎会以某种方式干扰网络套接字连接。

我该如何解决这个问题?

  • Phusion 乘客版本 4.0.58
  • ruby 2.2.0p0(2014-12-25 修订版 49005)[x86_64-linux-gnu]
  • 西纳特拉 (1.4.6)
  • faye-websocket (0.9.2)

【问题讨论】:

    标签: ruby websocket redis sinatra


    【解决方案1】:

    我认为这里的问题是 Faye 使用 EventMachine,这意味着您的线程上有一个反应器正在处理事件,并调用您的回调 ws.on(:open)ws.on(:close)

    现在当你击球时

    store.subscribe(:updates) do |on|
      on.message do |ch, payload|
        puts "Got update"
        ws.send(payload) if payload
      end
    end
    

    这是一个阻塞操作——它完全阻塞了当前线程。如果你当前的线程被阻塞,reactor 不能监听事件然后调用你的回调。

    对此的一种解决方案是在不同的线程上运行您的store.subscribe,这样它是否会阻塞该线程并不重要。

    但我认为更好的解决方案是使用non-blocking version of the Redis library:

    来自文档:

    redis = EM::Hiredis.connect
    pubsub = redis.pubsub
    
    pubsub.subscribe(:updates).callback do
        puts "Got update"
        ws.send(payload) if payload
    end
    

    这两个(Redis + Faye)都应该在 EventMachine 反应器循环中注册,以便将事件分派给两者。

    【讨论】:

    • 我应该指出,Redis 指南规定您不应将相同的连接用于发布和订阅。所以store应该分成store用于发布和数据,store_sub用于订阅。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-09-03
    • 1970-01-01
    • 2023-03-23
    • 1970-01-01
    • 1970-01-01
    • 2019-10-14
    • 1970-01-01
    相关资源
    最近更新 更多