【问题标题】:handling timeouts in eventmachine http server在 eventmachine http 服务器中处理超时
【发布时间】:2012-06-20 02:32:17
【问题描述】:

如何在基于 eventmachine 的 http 服务器中处理超时?我基本上将http请求信息放在处理它的队列中,然后处理可能会调用回调函数,也可能不会。我可以设置一个超时时间,但是我还没有弄清楚如何添加超时处理程序或超时回调。

我浏览了文档,但没有设法从中收集到任何有用的信息。将逻辑放入 unbind 方法显然不起作用,因为在调用 unbind 时请求已完成,并且在回调创建代码旁边添加 EM::error_handler 也不起作用。

我想捕捉超时事件并在超时事件上返回特定的 json。

这是我的代码 - HTTP 请求处理程序

class HTTPRequestHandler  < EventMachine::Connection

  def initialize(s,q,h)
    @tcpserver = s
    @queue = q
    @callback_hash = h
    self.comm_inactivity_timeout = API_REQUEST_TIMEOUT

  end

  def post_init
      @parser = RequestParser.new
  end

  def receive_data(data)
    handle_http_request if @parser.parse(data)
  end

  def parse_query_parms(query_str)
    begin
      rethash = {}
      query_arr = query_str.split(/&/)
      query_arr.each { |element|
        e_arr = element.split(/\=/)
        rethash[e_arr[0]] = e_arr[1]
      }
      return rethash
    rescue
      return nil
    end
  end

  def handle_http_request
    result = parse_query_parms(@parser.env["QUERY_STRING"]) # hash
    if result
      if result.has_key?('id') and result.has_key?('rid') and result.has_key?('json')
        puts result

        # Callback to handle this
         cb = EM.Callback{ |rid,rtime,msg|
           data = "{\"rid\":\"#{rid}\",\"rtime\":\"#{rtime}\",\"msg\":#{msg}}"
           send_data("HTTP/1.1 200 OK\r\n")
           send_data("Content-Type: application/json\r\n")
           send_data("Content-Length: #{data.bytesize}\r\n")
           send_data("\r\n")
           send_data(data)
           close_connection_after_writing
         }

         # Add callback to hash
         @callback_hash[result['rid']] = cb

         # Unencode jsonin url
         json_from_api = result['json']
         json_from_api = URI.decode(json_from_api)

         # Push request onto queue
         qreq=QueuedRequest.new(result['id'],json_from_api)
         @queue.push(qreq)

      else
        data = "{\"success\":\"false\",\"response\":\"request needs id, rid, json parameters\"}"
        send_data("HTTP/1.1 200 OK\r\n")
        send_data("Content-Type: application/json\r\n")
        send_data("Content-Length: #{data.bytesize}\r\n")
        send_data("\r\n")
        send_data("#{data}")
        close_connection_after_writing
      end
    else
      data = "{\"success\":\"false\",\"response\":\"unable to parse parameters\"}"
      send_data("HTTP/1.1 200 OK\r\n")
      send_data("Content-Type: application/json\r\n")
      send_data("Content-Length: #{data.bytesize}\r\n")
      send_data("\r\n")
      send_data("#{data}")
      close_connection_after_writing
    end
  end

end

我们初始化所有内容并处理队列的主循环:

EM.synchrony do
  h = {} # map of rids -> callbacks for requests

  # Intialize TCP and HTTP Servers
  q = EM::Queue.new # Queue of messages from HTTP Server
  s = TCPProxyServer.new(h)
  EM.start_server(LISTEN_HOST_CLIENT, LISTEN_PORT_API, HTTPRequestHandler, s, q, h)
  s.start
  puts "Server starting (http and tcp)"

  # process queue of messages coming in from API (recursive)
  process_queue = Proc.new do |qreq| 
    # Our functions
    @operation = lambda do
      puts qreq
      begin
        # Send data to channel
        if s.connections_plug[qreq.id]
          s.connections_plug[qreq.id].send_data(qreq.json)
        else
          return "unable to find id:#{qreq.id} in connection"
        end
      rescue Exception=>e
        puts "Unable to send process queued request! #{e}"
      end     
    end
    @callback = lambda { |result| }

    EM::defer(@operation,@callback) 
    EM.next_tick{ q.pop(&process_queue) }
  end
  q.pop(&process_queue)
end

【问题讨论】:

  • 设置连接超时是否有帮助:set_comm_inactivity_timeout (eventmachine.rubyforge.org/EventMachine/Connection.html#M000306)?
  • 设置超时时间,但我还没有弄清楚如何生成响应。现在我只是在超时时得到一个空响应,并且在连接关闭后发生解除绑定,所以没有用在那里放置任何响应的东西。

标签: ruby eventmachine


【解决方案1】:

根据EventMachine group 的建议,我在我的 http 请求中附加了一个计时器:

class HTTPRequestHandler  < EventMachine::Connection
  def initialize(s,q,h)
    @tcpserver = s
    @queue = q
    @callback_hash = h
    #self.comm_inactivity_timeout = API_REQUEST_TIMEOUT # handled using one off timer
  end

  def post_init
    @parser = RequestParser.new

    # Use timer to handle timeout
    @timer = EventMachine::Timer.new API_REQUEST_TIMEOUT, proc {
      data = {:err => "timeout"}
      data = data.to_json
      send_data("HTTP/1.1 200 OK\r\n")
      send_data("Content-Type: application/json\r\n")
      send_data("Content-Length: #{data.bytesize}\r\n")
      send_data("\r\n")
      send_data("#{data}")
      close_connection_after_writing
    }
  end

  def unbind
    @timer.cancel()
  end

  def receive_data(data)
  end

end

【讨论】:

    猜你喜欢
    • 2014-06-26
    • 1970-01-01
    • 2022-07-29
    • 2020-11-20
    • 2016-09-14
    • 2010-09-21
    • 1970-01-01
    • 2013-07-15
    • 1970-01-01
    相关资源
    最近更新 更多