【问题标题】:Threaded posting of many requests to same url多个请求的线程发布到相同的 url
【发布时间】:2017-01-13 06:44:06
【问题描述】:

我已经构建了一个这样的 json 块:

data = {
    "url" : "http://www.example.com/post",
    "postdata" : [
        { "key1" : "value1" },
        { "key2" : "value2" },
        { "key3" : "value3" },
        { "key4" : "value4" },
        { "key5" : "value5" },
        { "key6" : "value6" }
    ]
}

我正在尝试将“postdata”中的每个块并行(例如 2 个池)发布到相同的“url”。我曾尝试使用 Faraday、Typhoeus 和 Parallel,但在这样做的过程中我没有遇到一个正常运行的用例。

理想情况下,我想使用 Typhoeus::Hydra 或 Faraday,将 'data' 对象传递给它,并使用 data['url'] 作为端点汇集 data['postdata'],但我已经空手而来。大多数情况下,我遇到了需要一个数据数组的情况,例如:

[
    { "url" : "http://...",
      "data" : { "key1" : "value1" },
    { "url" : "http://...",
      "data" : { "key2" : "value2" },
    { "url" : "http://...",
      "data" : { "key3" : "value3" }
]

但我显然希望避免这种重复。

最终目标:将潜在的 100 个 json 主体并行发布到同一个 url,一次由有限的(比如 10 个)池化。任何人都可以帮助引导我走上正确的道路吗?

免责声明:这是一个内部端点,所以没有恶意。

基于 tadman 的解决方案:

class BatchWrapper
  def initialize(data, offset)
    @data = data
    @offset = offset
  end

  def as_json
    @data['postdata'][@offset]
  end
end

q = Queue.new

data['postdata'].each_index do |i|
  q << BatchWrapper.new(data, i)
end

t = Thread.new do
  n = q.size
  n.times do |i|
    value = q.pop
    res = http.post(data['url'], value.as_json)
    puts res
    puts "consumed #{value.as_json}"
  end
end

t.join

【问题讨论】:

    标签: ruby multithreading parallel-processing faraday typhoeus


    【解决方案1】:

    通常的策略是将源数据重新映射为更易于消化的数据,然后将其拆分到多个工作线程或通过某种异步事件驱动的方法(例如您在 EventMachine 中使用)。

    线程更容易理解,然后你可以使用Queue结构来批量处理每个线程消耗的工作。

    将您的作业分解为一系列对象,将这些对象塞入队列,然后启动 N 个线程来处理这些队列。

    由于您使用的是线程,因此可以使用共享数据。例如,您可以有一个瘦包装对象,给定该格式的结构,捕获您发送的偏移量:

    class BatchWrapper
      def initialize(data, offset)
        @data = data
        @offset = offset
      end
    
      def as_json
        @data['postdata'][@offset]
      end
    end
    

    然后,只需为您打算发出的每个请求填充这些对象之一:

    q = Queue.new
    
    data['postdata'].each_index do |i|
      q << BatchWrapper.new(data, i)
    end
    

    然后您可以在工作人员中旋转队列:

    Thread.new do
      while (wrapper = q.pop)
        # Send it...
        make_request(wrapper.to_json)
      end
    end
    

    包装器方法允许您准确组合从该主对象共享的数据以及特定于请求的数据,并对数据本身进行任何必要的重组。 as_json 方法返回 to_json 方法最终编码的内容,因此您可以完全控制。

    【讨论】:

    • 谢谢。
    • 详细说明这一点,我最终得到了“t = Thread.new do”的内部,如下: n = q.size ; n.times 做 |我| ;值 = q.pop ; res = req(data['url'], value.as_json) ; puts res 似乎已经工作了,因为它为每次迭代(因为它正在弹出)和每个 json 块(value.as_json)输出索引 1。
    • 在一般代码中,队列大小可能会随时间而变化,因此最好不要依赖于任何特定长度。一个简单的方法可以在所有工作人员完成后将其杀死,即在其中注入 N 个表示“停止处理”的作业并将线程从循环中中断。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-09
    • 1970-01-01
    • 1970-01-01
    • 2014-02-27
    相关资源
    最近更新 更多