【问题标题】:Clojure concurrency : Automating SQL QueriesClojure 并发:自动化 SQL 查询
【发布时间】:2013-11-11 11:03:49
【问题描述】:

我有一个小程序,它应该一个一个地读取 SQL 查询/命令并针对数据库执行它们。

如果一个查询成功执行,则执行下一个查询。 如果执行一个查询出错,程序应该一起停止执行。

我有代码,只是查询在出现异常时仍会继续执行。

(defn main
   []
   (loop [queries (get-all-queries)
          querycount 1]
     (let [q (first queries)]
        (println (format "currently processing query %s", querycount))
        (cond (nil? q) (println "All Queries ran successfully.")
              :else (do
                      (cond (= (:status (process-query q querycount)) "OK") 
                               (recur (rest queries) (+querycount 1)))
                      :else (println "An error occured while running queries")))))))


 (defn process-query
     [query query-count]
     (let [{query-body :query-body, is-query-running? :is-query-running?} query
           my-agent (agent 
                       {:error false, :query-count query-count} 
                       :error-handler handler-fn)]
        (send my-agent (fn[_]
                          (execute-query! db query-body)))))
        (loop [is-query-running? (is-query-running?)
               error? (:error @my-agent)]
           (cond error? (do (println "Error") 
                            {:status "ERROR" :error-msg (:error-msg @my-agent)})
           (and (not is-query-running?) (not error?)) (do (println "Success") 
                                                          {:status "OK"})
           (:else (do
                    (Thread/sleep 2000)
                    (recur (is-query-running?) (:error @my-agent)))))))


(defn handler-fn
  [agent exception]
  (println (format "an exception occured : %s" exception))
  (if (instance? java.sql.BatchUpdateException exception)
      (println (.getNextException exception)))
  (send agent (? [_] {:error true, :error-message exception}))
  (throw exception))

我使用代理的原因是我有一些查询需要 4 小时才能运行。 当这种情况发生时,数据库不会通知程序查询已经完成。相反,程序卡住了。所以,相反,我不断轮询以检查查询是否已经完成。

  • 这是完成我想做的事情的最佳方式吗?
  • 我应该使用任何其他并发原语吗?
  • 我什至需要并发原语吗?
  • 这个问题我想了很久。

【问题讨论】:

  • is-query-running? 是如何工作的。从您的代码中不清楚,因为它作为每个查询映射中的值传入,并且可能对问题至关重要,因为如果 is-query-running? 在代理的 :error 键之前变为 false,process-query 将返回 "OK"成为真的。
  • 查询正在运行吗?运行查询并检索所有主动执行的查询,然后检查当前查询是否在运行的查询中。如果是,则返回 true。
  • 如果它返回true 而不是它本身,您应该在recur 调用process-query 中收到一条错误消息。 Clojure 是一个 Lisp-1,因此返回值 true 会影响 is-query-running?loop 内的函数定义。
  • 老兄,请将您的源代码格式化为最多 80 列。
  • @Igrapenthin,对不起伙计。代码格式化。

标签: sql concurrency clojure functional-programming


【解决方案1】:

我认为您需要使用 core.async 来解决这种工作流程 看看http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html
这个库将让您检查您的条件与所涉及的相关异步任务

一些可能对您有帮助的资源
http://www.infoq.com/news/2013/07/core-async
https://www.youtube.com/watch?v=AhxcGGeh5ho

【讨论】:

    【解决方案2】:

    主要问题似乎是:一方面,您写道长查询永远不会返回,即它们甚至不会抛出异常。另一方面,代理的错误检测机制是基于捕获异常。

    我认为您需要做的不是(主要)检查是否捕获到异常,而是当is-query-running? 返回 false 时,execute-query 是否实际上返回了有效结果。

    关于正确的并发原语,我建议使用未来而不是代理。它们比代理更简单,因为它们只能返回一个值,而不是多次更改其状态,并且它们的错误处理方式是简单地返回异常而不是常规返回值。

    然后您可以遵循这个实现思路:在循环中,在将来执行一个带有超时的deref。如果deref 的返回值是execute-query! 定期返回的值,则返回"OK"(分别在future 正文中添加第二个表达式作为可清晰识别的返回值,例如关键字:ok)。否则,如果deref 的返回值是异常,则像现在一样从异常中返回"ERROR":error-msg。最后,如果返回值是你给deref 的超时值,调用is-query-running?。如果为真,则再次循环,如果为假,则返回 ERROR 并带有特殊的 :error-msg,它表示您的查询已结束,既不返回也不抛出异常。 (并且可能会调用future-cancel,这样您就不会泄漏永无止境的execute-query! 调用线程。)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-05-17
      • 1970-01-01
      • 1970-01-01
      • 2018-12-20
      • 2021-11-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多