【发布时间】:2018-10-17 07:38:43
【问题描述】:
上下文:我正在 CQRS 中实现一个应用程序,并且我正在尝试优化命令的处理(基本上是聚合 Id 的 1 个流)...
问题:我希望有一个接收所有命令并通过它们在不同线程上的聚合 Id 分派这些命令的第一个流:
1) 聚合中的命令以序列化方式处理
2) 聚合独立(并行)处理它们的命令。
解决方案:我正在尝试通过聚合 Id 对流执行 groupBy 基本上...为了提供帮助,我将示例简化如下:
module Sandbox where
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
import Control.Monad.IO.Class (MonadIO(..))
main :: IO ()
main = do
runStream $ parallely $ S.fromList getAggregateIds |& S.mapM (\x -> do
threadId <- myThreadId
liftIO $ putStrLn $ (show threadId) ++ " value " ++ (show x))
getAggregateIds :: [Integer]
getAggregateIds = [1..3] <> [1..3]
所以这个脚本显示以下结果:
ThreadId 17 value 1
ThreadId 15 value 2
ThreadId 19 value 3
ThreadId 13 value 1
ThreadId 16 value 3
ThreadId 18 value 2
我期待的是这样的(没有特殊顺序,只是 x 总是在同一个线程 x1 上处理):
ThreadId X1 value X
ThreadId Y1 value Y
ThreadId Z1 value Z
ThreadId X1 value X
ThreadId Y1 value Y
ThreadId Z1 value Z
谢谢!!
【问题讨论】:
-
我不明白——你认为线程 ID 是
1,2,3吗?还是您希望这些值按1,2,3的顺序显示?回想一下,并行执行通常不提供排序保证(否则它将是顺序执行)。 -
我已经更新了问题,没有特殊顺序只是 x 总是在同一个线程 x1 上处理,告诉我是否更清楚
标签: haskell frp conduit haskell-pipes streamly