【问题标题】:How create batch process in requests elixir phoenix如何在请求 elixir phoenix 中创建批处理
【发布时间】:2018-09-30 12:20:46
【问题描述】:

我必须创建一个性能出色的 api,我想用 Elixir 创建它 我有一个进程(慢),我必须在一些请求后运行。我想做这个流程

在每个请求中,将接收到的数据保存在内存中 在 x 个请求后,发送到另一个 api(或 x 秒后)

在节点中我可以这样做:

let batchData = []
const handlerRequest = (req, res) => {
  batchData.push(req. body.data)
  if (batchData > 1000) {
    // Process to send to another api
    batchData = []
  }
  res.json({ success: true })
}

或者

let batchData = []
setInterval(() => {
  if (batchData > 1000) {
    // Process to send to another api
    batchData = []
  }
}, 10000)

const handlerRequest = (req, res) => {
  batchData.push(req. body.data)
  res.json({ success: true })
}

我如何在 Elixir Phoenix 中做这样的事情?

谢谢你

【问题讨论】:

    标签: process python-requests elixir phoenix


    【解决方案1】:

    这是一种使用 GenServer 的方法。我想你想在收到第一个项目时启动计时器。

    defmodule RequestHandler do
      use GenServer
    
      @name __MODULE__
      @timeout 5_000
      @size 5
    
      def start_link(args \\ []) do
        GenServer.start_link(__MODULE__, args, name: @name)
      end
    
      def request(req) do
        GenServer.cast(@name, {:request, req})
      end
    
      def init(_) do
        {:ok, %{timer_ref: nil, requests: []}}
      end
    
      def handle_cast({:request, req}, state) do
        {:noreply, state |> update_in([:requests], & [req | &1]) |> handle_request()}
      end
    
      def handle_info(:timeout, state) do
        # sent to another API
        send_api(state.requests)
        {:noreply, reset_requests(state)}
      end
    
      defp handle_request(%{requests: requests} = state) when length(requests) == 1 do
        start_timer(state)
      end
    
      defp handle_request(%{requests: requests} = state) when length(requests) > @size do
        # sent to another API
        send_api(requests)
        reset_requests(state)
      end
    
      defp handle_request(state) do
        state
      end
    
      defp reset_requests(state) do
        state
        |> Map.put(:requests, [])
        |> cancel_timer()
      end
    
      defp start_timer(state) do
        timer_ref = Process.send_after(self(), :timeout, @timeout)
        state
        |> cancel_timer()
        |> Map.put(:timer_ref, timer_ref)
      end
    
      defp cancel_timer(%{timer_ref: nil} = state) do
        state
      end
    
      defp cancel_timer(%{timer_ref: timer_ref} = state) do
        Process.cancel_timer(timer_ref)
        Map.put(state, :timer_ref, nil)
      end
    
      defp send_api(requests) do
        IO.puts "sending #{length requests} requests"
      end
    end
    

    这里有一些测试

    iex(5)> RequestHandler.start_link
    {:ok, #PID<0.119.0>}
    iex(6)> for i <- 1..6, do: Request
    [Request, Request, Request, Request, Request, Request]
    iex(7)> for i <- 1..6, do: RequestHandler.request(i)
    sending 6 requests
    [:ok, :ok, :ok, :ok, :ok, :ok]
    iex(8)> for i <- 1..7, do: RequestHandler.request(i)
    sending 6 requests
    [:ok, :ok, :ok, :ok, :ok, :ok, :ok]
    sending 1 requests
    iex(9)> for i <- 1..3, do: RequestHandler.request(i)
    [:ok, :ok, :ok]
    sending 3 requests
    iex(10)>
    

    【讨论】:

      【解决方案2】:

      您可以使用GenServerAgent

      GenServer

      一般的想法是有一个 GenServer 进程来保存要处理的数据并处理后台处理。使用GenServer.cast/2,我们可以异步向进程发送消息。因此,每当控制器收到请求时,我们都会向队列中添加一个新项目,并检查是否达到批量大小并进行处理。

      # In Controller (page_controller.ex) module
      def index(conn, params) do
         App.BatchProcessor.add_item(params)
         conn|>json(%{success: true})
      end
      

      为 GenServer 添加模块。您可以添加一个新文件lib/batch_processor.ex

      defmodule App.BatchProcessor do
         use GenServer
         
         @batch_size 10 #whenever queue reaches this size we'll start processing
         
         def init(_) do
           initial_queue = []
           {:ok, initial_queue}
         end
         
         def start_link()do
            GenServer.start_link(__MODULE__, [], [name: __MODULE__])
         end
         
         #api function to add item to the 
         def add_item(data)do
            GenServer.cast({:add, data}, __MODULE__)
         end
      
         # implement GenServer behavior function to handle cast messages for adding item to the queue
         def handle_cast({:add, data}, queue) do
             update_queue = [data | queue] #addpend new item to front of queue
             
             #check if batch size is reached and process current batch
             if Enum.count(updated_queue) >= @batch_size do
                #send async message to current process to process batch
                GenServer.cast(__MODULE__, :process_batch)
             end
             {:noreply, updated_queue}
         end
      
         #implement GenServer behavior function to handle cast messages for processing batch
         def handle_cast(:process_queue, queue)do
           spawn(fn ->
              Enum.each(queue, fn data -> 
                 IO.inspect(data)
              end)
           end)
           {:noreply, []} # reset queue to empty
         end
      end
      

      Phoenix 应用启动时启动 BatchProcessor 进程

      #application.ex
      
      children = [
        # Start the endpoint when the application starts
        supervisor(App.Web.Endpoint, []),
        # Start your own worker by calling: App.Web.Worker.start_link(arg1, arg2, arg3)
        worker(App.BatchProcessor, []),
      ]
      

      阅读更多关于GenServer 希望这会有所帮助

      【讨论】:

        猜你喜欢
        • 2015-09-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-05-28
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-07-06
        相关资源
        最近更新 更多