【问题标题】:Apache Flink stream scalingApache Flink 流缩放
【发布时间】:2017-02-19 02:30:54
【问题描述】:
我是 Apache Flink 的新手,我正在尝试了解一些关于与 Kafka 一起扩展 Flink 流作业的最佳实践。我无法找到合适答案的一些问题包括:
- 您可以/应该运行多少个流式作业?运行太多流是否存在可扩展性问题?多少算太多?
- 如果我们运行 2000 个流来满足业务需求,那么管理这些流的最佳方法是什么?
- 从一个流读取流数据到另一个流的首选方法是什么?我们可以加入流、执行连续查询等吗?
提前感谢您的支持,如果这些问题看起来有些基本,我们深表歉意,但我正在努力更好地处理这项技术。我已经阅读了大部分文档,但由于我在这方面缺乏经验,我承认可能不会将一些概念放在一起。感谢您的帮助!
【问题讨论】:
标签:
streaming
apache-kafka
apache-flink
【解决方案1】:
流的数量没有限制,flink 将根据 Job Manager/Task Manager 的内存/CPU、正在使用的并行化和插槽数进行扩展。我使用 YARN 来管理资源。如果连接的流的数量很高,那么我们需要小心一点,不是所有/大部分处理都发生在某些任务管理器上,因为这会减慢进程。 kafka 流本身可能存在延迟,或者由于某些任务管理器负载过重而导致的内部延迟肯定会出现,因此需要进行预防性检查。
连续查询支持已作为最新 flink 版本的一部分构建,您可以查看 flink 文档。
-
如果通过将一个数据流读取到另一个数据流,您的意思是在 flink 术语中连接两个流,那么我们可以将它们连接到一个公共键上并保持一个值状态。请注意,值状态在任务管理器中维护,并且不跨任务管理器共享。否则,如果您暗示合并两个或多个流,我们可以构建平面图函数,使来自这些流的数据采用标准格式。
联合示例:
val stream1: DataStream[UserBookingEvent] = BookingClosure.getSource(runmode).getSource(env)
.map(new ClosureMapFunction)
val stream2: DataStream[UserBookingEvent] = BookingCancel.getSource(runmode).getSource(env)
.map(new CancelMapFunction)
val unionStream: DataStream[UserBookingEvent] = stream1.union(stream2)
---
import org.apache.flink.api.common.functions.MapFunction
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _}
import org.json4s.native.JsonMethods._
import org.slf4j.{Logger, LoggerFactory}
class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] {
override def map(in: String): Option[UserBookingEvent] = {
val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction])
try {
implicit lazy val formats = org.json4s.DefaultFormats
val json = parse(in)
..............
} catch {
case e: Exception => {
LOG.error("Could not parse Cancel Event= " + in + e.getMessage)
None
}
}
}
}