【问题标题】:Grouping event with fs2.Stream使用 fs2.Stream 对事件进行分组
【发布时间】:2019-03-08 21:28:42
【问题描述】:

我的事件流如下:

sealed trait Event

val eventStream: fs2.Stream[IO, Event] = //...

我想对在一分钟内收到的这些事件进行分组(即从每分钟的 0 秒到 59 秒)。 fs2 听起来很简单

val groupedEventsStream = eventStream groupAdjacentBy {event => 
    TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}

问题在于分组功能不纯。它使用currentTimeMillis。我可以按如下方式解决这个问题:

stream.evalMap(t => IO(System.currentTimeMillis(), t))
  .groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))

问题是我想避免使用带有元组的笨拙样板。还有其他解决方案吗?

或者在这种情况下使用不纯函数可能不是那么糟糕?

【问题讨论】:

    标签: scala functional-programming stream fs2


    【解决方案1】:

    您可以使用cats.effect.Clock 删除一些样板:

    def groupedEventsStream[A](stream: fs2.Stream[IO, A])
                              (implicit clock: Clock[IO], eq: Eq[Long]): fs2.Stream[IO, (Long, Chunk[(Long, A)])] =
      stream.evalMap(t => clock.realTime(TimeUnit.MINUTES).map((_, t)))
        .groupAdjacentBy(_._1)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-08-09
      • 1970-01-01
      • 1970-01-01
      • 2015-01-25
      • 2021-10-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多