【问题标题】:How to process events in batches with elixir flow如何使用 Elixir Flow 批量处理事件
【发布时间】:2018-01-10 23:25:57
【问题描述】:

我有一个 csv_file,其中 a.) 首先,每行需要转换为 xml 和 b.) 其次,转换后的 xml 将被发送到 rails 端进行一些数据库写入操作。

下面是我的 Flow 代码。

flow = csv_rows
 |> Flow.from_enumerable()
 |> Flow.partition
 |> Flow.map(&(CSV.generate_xml(&1)))
 |> Flow.map(&(CSV.save_to_rails_databse(&1)))
 |> Flow.run

对于小的 csv 文件,Everyting 工作正常,但是当 csv_file 非常大(假设 20,000 条)记录时,执行第二个操作(即在 rails 端写入数据库)试图同时插入两条多条记录时间,由于elixir同时向rails端发送了太多请求,因此数据库达到了峰值。

处理 50 个批次中的事件是否会很好,min_demandmax_demand 在这种情况下是否有用。

【问题讨论】:

    标签: elixir


    【解决方案1】:

    您可以使用Flow.map_state/2 接收特定状态的整个状态(在您的情况下,由于您正在映射,状态将是该批次中的事件)。

    你会想在这里使用三个参数,都给 from_enumerable:

    • min_demand:这实际上是批量大小
    • max_demand:阶段之间变化的最大行数
    • stages:处理数据的并发阶段数。在您的情况下,同时处理多少批次

    其他一些注意事项:

    • 您不需要分区,因为您没有进行任何分组
    • 考虑使用NimbleCSV,它允许将 CSV 作为流使用 - 如果 CSV 太大,这有助于内存使用
    • 在这个例子中你可能根本不需要 Flow,Task.asycn_stream/3 就足够了

    当我们在 Flow 上工作时,我们能够获得一些 Flow 课程并将其应用到 Elixir。其中一课产生了Task.async_stream/3,当您想要在没有reduce 阶段的情况下映射集合时,这很有用,这正是您所拥有的:

    batch_size = 100
    
    # 8 tasks running at the same time and we don't care about the results order
    async_options = [max_concurrency: 8, ordered: false]
    
    csv_rows
    |> Stream.chunk(batch_size)
    |> Task.async_stream(fn batch -> 
      batch
      |> Enum.map(&CSV.generate_xml/1)
      |> CSV.save_to_batch_rails_database()
    end, async_options)
    |> Stream.run()
    

    我没有测试过代码,但它应该可以提供足够的指导。它应该和 Flow 一样快,但没有额外的依赖。

    【讨论】:

    • 感谢async_stream 的提及,这也正是我目前所需要的
    猜你喜欢
    • 2023-03-15
    • 2019-02-20
    • 1970-01-01
    • 2021-09-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多