【问题标题】:GroupBy of stream by agreggateId (Haskell / concurrency streaming)通过 agreggateId 对流进行 GroupBy(Haskell / 并发流)
【发布时间】:2018-10-17 07:38:43
【问题描述】:

上下文:我正在 CQRS 中实现一个应用程序,并且我正在尝试优化命令的处理(基本上是聚合 Id 的 1 个流)...

问题:我希望有一个接收所有命令并通过它们在不同线程上的聚合 Id 分派这些命令的第一个流:

1) 聚合中的命令以序列化方式处理
2) 聚合独立(并行)处理它们的命令。

解决方案:我正在尝试通过聚合 Id 对流执行 groupBy 基本上...为了提供帮助,我将示例简化如下:

module Sandbox where

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
import Control.Monad.IO.Class (MonadIO(..))

main :: IO ()
main = do
         runStream $ parallely $ S.fromList getAggregateIds |& S.mapM (\x -> do
            threadId <- myThreadId
            liftIO $ putStrLn $ (show threadId) ++ "  value " ++ (show x))


getAggregateIds :: [Integer]
getAggregateIds = [1..3] <> [1..3]

所以这个脚本显示以下结果:

ThreadId 17  value 1
ThreadId 15  value 2
ThreadId 19  value 3
ThreadId 13  value 1
ThreadId 16  value 3
ThreadId 18  value 2

我期待的是这样的(没有特殊顺序,只是 x 总是在同一个线程 x1 上处理):

ThreadId X1  value X
ThreadId Y1  value Y
ThreadId Z1  value Z
ThreadId X1  value X
ThreadId Y1  value Y
ThreadId Z1  value Z

谢谢!!

【问题讨论】:

  • 我不明白——你认为线程 ID 是 1,2,3 吗?还是您希望这些值按1,2,3 的顺序显示?回想一下,并行执行通常不提供排序保证(否则它将是顺序执行)。
  • 我已经更新了问题,没有特殊顺序只是 x 总是在同一个线程 x1 上处理,告诉我是否更清楚

标签: haskell frp conduit haskell-pipes streamly


【解决方案1】:

在上面的代码中,parallely 决定为列表getAggregateIds 中的每个元素创建一个Haskell 线程,即[1,2,3,1,2,3]parallely 不关心列表中有一些重复的元素:它只是为每个元素启动一个线程。

原则上,parallely 只能分配少量的 Haskell 线程并在以后重用它们(可能用于相同的重复 ID 或另一个 ID),但这样做不会提高性能。实际上,这里的关键部分是分配了 Haskell 线程,而不是 OS 线程,

Haskell 线程非常轻量级,它们使用的内存非常少,因此创建和处理它们非常便宜。尝试重用它们可能会导致性能下降。

此外,Haskell 运行时可以在单个 OS 线程中执行多个 Haskell 线程。通常,运行时会保留一小部分 OS 线程,并将 Haskell 线程映射到这些线程。由于操作系统线程不像轻量级操作系统线程那样确实在 Haskell 线程之间重用。

最后,请注意ThreadId 是 Haskell 线程的名称,而不是操作系统的名称,因此看到这些 ID 没有被重用是正常的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-06
    • 1970-01-01
    • 2019-04-21
    相关资源
    最近更新 更多