【问题标题】:Merging an async channel buffer合并异步通道缓冲区
【发布时间】:2016-11-15 06:10:08
【问题描述】:

我有一个对亚马逊进行搜索的 ISBN 列表。我已经按顺序解决了这个问题,所以现在的任务是实现并发。我使用core.async 进行了尝试。我遇到的问题是在搜索完成后,我需要能够将所有书籍合并到一个集合中,以便我可以按书籍排名对它们进行排序。由于我使用的是一个缓冲区大小为 10 的通道,我不知道该怎么做。我的方法可能完全错误。感谢您的帮助。

这里是并发函数

(def book_channel (chan 10))

(defn concurrency_test [list_of_isbns]
      ([doseq [isbn list_of_isbns]
        (go(>! book_channel(get_title_and_rank_for_one_isbn(amazon_search isbn))))])
    )
)

获取标题:

(defn get_title_and_rank_for_one_isbn [amazon_report]
  (def book_title (get-in amazon_report [:items 0 :item-atributes :title]))
  (def sales_rank(get-in amazon_report [:items 0 :SalesRank]))
  (def book_isbn(get-in amazon_report [:items 0 :asin]))
  (reduce into [[book_title] [book_isbn] [sales_rank]]))

和电话:

(def list_of_isbns (split_isbns "src/clj_amazon/isbn_list.txt"))
(concurrency_test list_of_isbns)

【问题讨论】:

    标签: asynchronous concurrency clojure


    【解决方案1】:

    你应该可以使用

    (async/reduce conj '() book-chan)
    

    创建通道中所有项目的集合,但记得关闭通道,因为在通道关闭之前reduce不会返回结果。

    【讨论】:

      【解决方案2】:

      如果您的目标是重叠所有 I/O 绑定任务(例如 amazon-search)并并行化所有 CPU 绑定任务(例如解析报告等),那么您应该研究管道功能。下面是一些伪代码说明它们的用法:

      (let [isbn>   (chan)
            report> (chan)
            out>    (chan)]
      
        ;; pipeline-async will take isbn from isbn> channel, invoke
        ;; amazon-search-async and pipe the result to report> channel.
      
        (pipeline-async 10  ;; up to 10 I/O bound requests
                        report>
                        amazon-search-asyn
                        isbn>)
      
        ;; pipeline will take report from report> channel and feed it
        ;; to the transducer (map get-title-and-rank-etc) for processing,
        ;; the processed report will be pushed to the out> channel.
      
        (pipeline (.. Runtime getRuntime availableProcessors)
                  out>
                  (map get-title-and-rank-etc)
                  report>)
      
        ;; read isbn from file and push it to isbn> channel
        (->> "isbn_list.txt"
             io/reader
             line-seq
             (onto-chan isbn>))
      
        ;; take all report from out> channel and sort it by rank & title
        (sort-by (juxt :rank :title) (<!! (async/into [] out>))))
      

      【讨论】:

        猜你喜欢
        • 2019-10-26
        • 2016-03-18
        • 2017-04-20
        • 1970-01-01
        • 2020-11-11
        • 2016-06-05
        • 2012-02-04
        • 2014-01-30
        • 2016-05-16
        相关资源
        最近更新 更多