【问题标题】:How to adapt the IReduceInit from next.jdbc to stream JSON using cheshire to a HTTP response using ring如何调整来自 next.jdbc 的 IReduceInit 以使用 cheshire 将 JSON 流式传输到使用 ring 的 HTTP 响应
【发布时间】:2020-01-23 21:43:38
【问题描述】:

tl;博士如何将 IReduceInit 转换为转换值的惰性序列

我有一个数据库查询,它产生了一个相当大的数据集,用于在客户端上进行实时旋转(百万或两行,25 个属性 - 对于现代笔记本电脑来说没问题)。

我的(简化的)堆栈是调用 clojure.jdbc 来获得(我认为是惰性的)结果行序列。我可以通过 ring-json 中间件将其作为主体传递出去,从而将其序列化。 ring-json 在堆上构建响应字符串存在问题,但从 0.5.0 开始可以选择将响应流式传输出去。

通过分析几个失败案例,我发现实际上 clojure.jdbc 是在将整个结果集交还给内存之前实现的。没问题!我决定改用新的 next.jdbc,而不是在那个库中使用 reducible-query

next.jdbc 中的关键操作是 plan,它返回一个 IReduceInit,我可以使用它来运行查询并获取结果集...

(into [] (map :cc_id) (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
["675192"]

但是,这实现了整个结果集,在上述情况下,我会预先提供所有的 id 并在内存中。对一个人来说不是问题,但我通常有很多。

IReduceInit 计划是一个我可以减少的东西,如果我给出一个起始值,所以我可以在减少函数中做输出......(谢谢@amalloy)

(reduce #(println (:cc_id %2)) [] (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
675192
nil

...但理想情况下,我想在对它们应用转换函数后将此 IReduceInit 转换为值的惰性序列,因此我可以将它们与 ring-json 和 cheshire 一起使用。我没有看到任何明显的方法。

【问题讨论】:

  • 您可能想在 Clojure 邮件列表(Google 群组)或 Clojure Slack 频道上提出这个问题。

标签: clojure ring transducer cheshire


【解决方案1】:

我的惰性序列是一个坏主意的原因有很多——即使我保证不抱头,结果流期间的异常问题无疑会使 ResultSet 闲置——序列化将发生在远离可以清理的调用堆栈。

懒惰的需求是由不想在内存中实现整个结果的愿望驱动的,需要 seq 或其他 coll?以便中间件将其序列化...

因此,直接让IReduceInit JSONable,然后绕过中间件。如果在序列化过程中出现异常,控件将从 next.jdbc 通过 IReduceInit,然后可以进行有意义的清理。

;; reuse this body generator from my patch to ring.middleware.json directly, as the coll? check will fail
(defrecord JsonStreamingResponseBody [body options]
  ring-protocols/StreamableResponseBody
  (write-body-to-stream [_ _ output-stream]
    (json/generate-stream body (io/writer output-stream) options)))
 
;; the year long yak is shaved in 8 lines by providing a custom serialiser for IReduceInits…
(extend-type IReduceInit
  cheshire.generate/JSONable
  (to-json [^IReduceInit results ^JsonGenerator jg]
    (.writeStartArray jg)
    (let [rf (fn [_ ^IPersistentMap m]
               (cheshire.generate/encode-map m jg))]
      (reduce rf nil results))
    (.writeEndArray jg)))

;; at this point I can wrap the result from next.jdbc/plan with ->JsonStreamingResponseBody into the :body of the ring response and it will stream

编写这些功能仍然感觉需要做很多工作,适配器代码总是让我担心我错过了一种简单、惯用的方法。

【讨论】:

    【解决方案2】:

    IReduceInit 使 JDBC 资源在 reduce 函数退出时被清盘。 这比 LazySeq 方法更可预测,后者可能永远不会释放 JDBC 资源。

    您使用 BlockingQueue 和未来的任务来填充该队列,如下所示

     (defn lazywalk-reducible
      "walks the reducible in chunks of size n,
      returns an iterable that permits access"
      [n reducible]
      (reify java.lang.Iterable
        (iterator [_]
          (let [bq (java.util.concurrent.ArrayBlockingQueue. n)
                finished? (volatile! false)
                traverser (future (reduce (fn [_ v] (.put bq v)) nil reducible)
                                  (vreset! finished? true))]
            (reify java.util.Iterator
              (hasNext [_] (or (false? @finished?) (false? (.isEmpty bq))))
              (next [_] (.take bq)))))))
    

    如果生成了迭代器但没有遵循它的结论,这当然会造成泄漏。

    我没有彻底测试过,可能还有其他问题;但这种方法应该行得通。

    如果 Java Iterable 对您的用例不够好,您也可以使其具体化 clojure.lang.ISeq;但随后您开始进入 HeadRetention 问题;以及如何处理对Object first() 的呼叫,这将是非常可行的,但我不想过分考虑这一点

    【讨论】:

    • 谢谢本!这让我困惑了很长时间,但认为与你讨论这件事有助于让我将注意力重新集中在真正的问题上。
    【解决方案3】:

    令人沮丧。

    为什么你不能用 JDBC 做到这一点呢?没有任何 Clojure 层?

    (let [resultset (.executeQuery connection "select ...")]
      (loop 
       (when (.next resultset)
         (let [row [(.getString resultset 1)
                    (.getString resultset 2)
                    ...]])
         (json/send row)
         (recur)))
      (json/end))
    

    当然,使用 ResultSetMetaData,您可以自动将行生成到一个可以处理任何返回的函数中。

    【讨论】:

    • 所以我认为我的下一步是改变 Cheshire 以产生一个归约函数......但你是对的,只是改为一个命令式发送循环可能会给我更多的牵引力。只是我想编写惯用的抽象。高效,抽象,二选一。在实践中,我暂时删除了一个维度,这样我得到的结果就少了一个数量级,并且不会阻塞 JVM。
    • 理解结果集性质的东西必须携带在结果集中的位置。您可能也可以使用 sql 端的光标来执行此操作?或者您必须编写一个评估器,将所有功能逻辑折叠为结果集上的一系列操作。
    • 我只是要在我们进行的时候转换结果集,所以最坏的情况是有人拿着转换后的 seq 的头部,最好的情况是他们放开了它,所以整个事情在它之后卷起来。就像,使用惰性序列很容易,但是你把完成逻辑放在哪里。
    【解决方案4】:

    reduce 可以与 IReduceInit 配合使用。 IReduceInit 需要一个初始值,这是您在调用 .reduce 时指定的,但在使用 reduce 函数时不需要;这就解释了为什么你看到一个工作,而另一个却没有。

    但是,这不会给您带来惰性序列。 reduce 的一部分契约是它急切地消耗整个输入(我们将忽略 reduced,它不会改变任何有意义的东西)。您的问题是动态范围更普遍问题的一个具体案例:JDBC 生成的序列仅在某些上下文中“有效”,您需要在此上下文中进行所有处理,因此不能偷懒。相反,你通常把你的程序翻过来:不要将返回值用作序列,而是向查询引擎传递一个函数并说,“请用你的结果调用这个函数”。然后,引擎在调用该函数时确保数据有效,并在函数返回后清理数据。我不知道 jdbc.next,但是对于较旧的 jdbc,您会为此使用类似 db-query-with-resultset 的东西。您可以向它传递一些函数,该函数可以将字节添加到待处理的 HTTP 响应中,并且它会多次调用该函数。

    这有点含糊,因为我不知道您使用的是什么 HTTP 处理程序,也不知道它有什么设施可以非延迟地处理流式响应,但这是您必须遵循的一般想法想要处理动态范围的资源:懒惰不是一种选择。

    【讨论】:

    • 感谢您的回答 - 为了未来读者的利益,我将编辑我对如何减少结果的误解。这会让你的第一个 para 看起来很奇怪。
    猜你喜欢
    • 2018-02-19
    • 2012-04-02
    • 2023-04-01
    • 2013-08-20
    • 2013-09-20
    • 1970-01-01
    • 1970-01-01
    • 2022-01-06
    • 1970-01-01
    相关资源
    最近更新 更多