【问题标题】:postgres LISTEN/NOTIFY railspostgres LISTEN/NOTIFY rails
【发布时间】:2013-04-30 14:36:00
【问题描述】:

Ryan Bates 在 this episode 中讨论推送通知时提到了 Postgres 的 LISTEN/NOTIFY 功能,但我无法找到任何关于如何在我的 rails 应用程序中实现 LISTEN/NOTIFY 的提示。

这里是 pg 适配器内的 wait_for_notify 函数的文档,但我无法弄清楚它到底是做什么/设计的。

我们是否需要直接利用pg 适配器的connection 变量?

【问题讨论】:

标签: ruby-on-rails postgresql asynchronous push-notification


【解决方案1】:

您正在使用 wait_for_notify 方法寻找正确的位置,但由于 ActiveRecord 显然没有提供使用它的 API,您需要获取底层 PG::Connection 对象(或其中之一它们,如果您正在运行多线程设置)ActiveRecord 用于与 Postgres 对话。

一旦建立连接,只需执行您需要的任何LISTEN 语句,然后将一个块(和一个可选的超时期限)传递给wait_for_notify。请注意,这将阻塞当前线程并独占 Postgres 连接,直到达到超时或发生NOTIFY(例如,您不希望在 Web 请求中执行此操作)。当另一个进程在您正在收听的一个频道上发出 NOTIFY 时,将使用三个参数调用该块 - 通知的频道、触发 NOTIFY 的 Postgres 后端的 pid 和有效负载NOTIFY(如果有的话)。

我已经有一段时间没有使用 ActiveRecord 了,所以可能有一种更简洁的方法可以做到这一点,但这在 4.0.0.beta1 中似乎可以正常工作:

# Be sure to check out a connection, so we stay thread-safe.
ActiveRecord::Base.connection_pool.with_connection do |connection|
  # connection is the ActiveRecord::ConnectionAdapters::PostgreSQLAdapter object
  conn = connection.instance_variable_get(:@connection)
  # conn is the underlying PG::Connection object, and exposes #wait_for_notify

  begin
    conn.async_exec "LISTEN channel1"
    conn.async_exec "LISTEN channel2"

    # This will block until a NOTIFY is issued on one of these two channels.
    conn.wait_for_notify do |channel, pid, payload|
      puts "Received a NOTIFY on channel #{channel}"
      puts "from PG backend #{pid}"
      puts "saying #{payload}"
    end

    # Note that you'll need to call wait_for_notify again if you want to pick
    # up further notifications. This time, bail out if we don't get a
    # notification within half a second.
    conn.wait_for_notify(0.5) do |channel, pid, payload|
      puts "Received a second NOTIFY on channel #{channel}"
      puts "from PG backend #{pid}"
      puts "saying #{payload}"
    end
  ensure
    # Don't want the connection to still be listening once we return
    # it to the pool - could result in weird behavior for the next
    # thread to check it out.
    conn.async_exec "UNLISTEN *"
  end
end

有关更一般用法的示例,请参阅Sequel's implementation

编辑以添加:这是对正在发生的事情的另一种描述。这可能不是幕后的确切实现,但它似乎很好地描述了这种行为。

Postgres 为每个连接保留一个通知列表。当您使用连接执行LISTEN channel_name 时,您是在告诉 Postgres 该通道上的任何通知都应推送到此连接的列表(多个连接可以侦听同一个通道,因此单个通知最终可能会被推送到多个列表)。一个连接可以同时LISTEN 到多个频道,并且任何一个频道的通知都会被推送到同一个列表中。

wait_for_notify 所做的是将最旧的通知从连接列表中弹出并将其信息传递给块 - 或者,如果列表为空,则休眠直到通知可用并为此执行相同操作(或直到达到超时,在这种情况下它只返回 nil)。由于wait_for_notify 只处理一个通知,如果你想处理多个通知,你将不得不重复调用它。

当您UNLISTEN channel_nameUNLISTEN * 时,Postgres 将停止将这些通知推送到您的连接列表,但已经推送到该列表的通知将保留在那里,并且 wait_for_notify 在下次调用时仍会返回它们.这可能会导致在wait_for_notify 之后但UNLISTEN 之前累积的通知仍然存在并且在另一个线程检查该连接时仍然存在的问题。在这种情况下,在 UNLISTEN 之后,您可能希望以短暂的超时调用 wait_for_notify,直到它返回 nil。但是,除非您出于许多不同的目的大量使用 LISTENNOTIFY,否则可能不值得担心。

我在上面添加了指向 Sequel 实现的更好链接,我建议您查看它。这很简单。

【讨论】:

  • 只是为了澄清:如果它没有在 1/2 秒内收到响应,它会停止监听吗?
  • 如果在您指定的超时时间内没有收到响应,wait_for_notify 将返回 nil 并且不会调用该块,但通道 1 和通道 2 的 LISTEN 将一直有效,直到您 UNLISTEN。所以你的代码可以做任何其他事情,当你再次等待_for_notify时,你仍然会收听频道1和频道2。我相信如果在您没有阻止 wait_for_notify 时发生 NOTIFY,当您再次 wait_for_notify 时您会立即将其拾起,但我不确定。
  • 所以我们需要轮询wait_for_notify 才能真正得到消息?
  • 感谢您深入研究它并用外行的术语来表达。现在更有意义了。
  • @PreciousBodilyFluids 感谢您的回答。一个问题:如果我想连续听,我应该把 wait_for_notify 放在一个无限循环中。在那种情况下,我可以继续收听这些频道吗?
猜你喜欢
  • 2018-01-07
  • 2015-12-31
  • 2014-11-07
  • 2016-11-26
  • 2023-04-01
  • 2018-07-18
  • 2016-10-21
  • 2014-02-04
  • 1970-01-01
相关资源
最近更新 更多