【问题标题】:Asynchronous job queue for web service in ClojureClojure 中 Web 服务的异步作业队列
【发布时间】:2013-01-18 08:20:59
【问题描述】:

目前,我正在尝试使用 RESTful API 构建一个 Web 服务,以处理一些长时间运行的任务(作业)。

这个想法是用户通过执行 POST 提交作业,该 POST 返回一些用于检查作业状态的 URL,其中还包含结果的 URL。一旦作业完成(即某些值被写入数据库),结果 URL 将返回适当的信息(而不是没有结果),并且作业 url 将指示完成状态。

不幸的是,计算非常密集,因此一次只能运行一个,因此需要对作业进行排队。

在伪中需要这样的东西

(def job-queue (atom queue)) ;; some queue 
(def jobs (atom {}))

(defn schedule-job [params] 
  ;; schedules the job into the queue and 
  ;; adds the job to a jobs map for checking status via GET
  ;; note that the job should not  be evaluated until popped from the queue
)

(POST "/analyze" [{params :params}] 
 (schedulde-job params))

(GET "job/:id" [:d] 
 (get @jobs id))

;; Some function that pops the next item from the queue 
;; and evaluates it when the previous item is complete
;; Note: should not terminate when queue is empty! 

我研究了Lamina,它允许异步处理,但它似乎不适合我的需要。

我的问题是如何使作业队列出队并在前一个完成后执行其任务,而不是在队列为空时终止,即永久处理传入的作业。

【问题讨论】:

  • 我不太擅长clojure,所以没有帮助,只是一个旁注:为什么返回204?我觉得返回一条实际消息(仅 200 条)可能会更好,其中的正文说它尚未完成或类似的内容?
  • 你是对的,这实际上可能会更好,因为我错过了“如果客户端是用户代理,它不应该改变导致发送请求的文档视图” HTTP 规范,否则我猜“无内容”会更合适。
  • 我总是喜欢 REST API 不尝试在 http 上播放。例如,如果我请求一些数据(例如某人的帐户但 ID 错误),我更喜欢在正文中获得 200 并提及没有这样的人,而不是例如 404。这感觉有点相同。但这是题外话,对此感到抱歉;)

标签: web-services clojure queue jobs lamina-clojure


【解决方案1】:

java.util.concurrent.ExecutorService 可能是您想要的。这允许您提交作业以供以后执行,并返回一个 Future,您可以查询以发现它是否已完成。

(import '[java.util.concurrent Callable Executors])

(def job-executor
  (Executors/newSingleThreadExecutor))

(def jobs (atom {}))

(defn submit-job [func]
  (let [job-id   (str (java.util.UUID/randomUUID))
        callable (reify Callable (call [_] (func))]
    (swap! jobs assoc job-id (.submit job-executor callable))
    job-id))

(use 'compojure.core)

(defroutes app
  (POST "/jobs" [& params]
    (let [id (submit-job #(analyze params))]
      {:status 201 :headers {"Location" (str "/jobs/" id)}}))
  (GET "/jobs/:id" [id]
    (let [job-future (@jobs id)]
      (if (.isDone job-future)
        (.get job-future)
        {:status 404}))))

【讨论】:

  • 不能将ExecutorService 替换为内置的future
【解决方案2】:

这似乎符合我的预期,但似乎相当不习惯。有人对如何改善这一点有想法吗?

;; Create a unique identifier
(defn uuid [] (str (java.util.UUID/randomUUID)))

;; Create a job-queue and a map for keeping track of the status
(def job-queue (ref clojure.lang.PersistentQueue/EMPTY))
(def jobs (atom {}))

(defn dequeue! [queue-ref]
  ;; Pops the first element off the queue-ref
  (dosync 
    (let [item (peek @queue-ref)]
      (alter queue-ref pop)
      item)))

(defn schedule-job! [task] 
  ;; Schedule a task to be executed, expects a function (task) to be evaluated
  (let [uuid (uuid)
        job (delay task)]
    (dosync 
      (swap! jobs assoc uuid job) 
      (alter job-queue conj job))))

(defn run-jobs []
  ;; Runs the jobs 
  (while true
    (Thread/sleep 10)
    (let [curr (dequeue! job-queue)] 
      (if-not (nil? curr) (@curr)))))

(.start (Thread. run-jobs))

【讨论】:

    【解决方案3】:

    您的描述似乎是一个多生产者和单一消费者的场景。 下面是一个示例代码(您可以将其与 REST 内容以及可能的一些 异常处理,以免代理死亡)

    (def worker (agent {}))                                                                                                                              
    
    (defn do-task [name func]                                                                                                                            
      (send worker                                                                                                                                       
            (fn [results]                                                                                                                                 
              (let [r (func)]                                                                                                                            
                (assoc results name r)))))
    
    ;submit tasks                                                                                                               
    (do-task "uuid1" #(print 10))                                                                                                                        
    (do-task "uuid2" #(+ 1 1))
    
    ;get all results
    (print @worker) 
    

    【讨论】:

      猜你喜欢
      • 2019-11-04
      • 1970-01-01
      • 2019-07-17
      • 2012-08-16
      • 1970-01-01
      • 2020-11-25
      • 1970-01-01
      • 2015-12-15
      • 1970-01-01
      相关资源
      最近更新 更多