Structured Streaming
- 在spark sql 引擎基础上建立的可扩展,容错的流处理引擎。
- 可以以处理一小批静态数据的方式处理流计算
- 当流数据到来时,spark sql 引擎会连续的增量的更新最终结果,spark sql 的所有优化都用使用到
- 通过checkpointing and Write Ahead Logs 保证端到端的 exactly-once 容错处理
- fast, scalable, fault-tolerant, end-to-end exactly-once
- spark 2.3 以前 使用 micro-batch processing engine,端对端的延迟在100 ms,端对段只有一次语义(exactly-once)
- spark 2.3 以后 使用 Continuous Processing ,延迟在 1ms 端对的最少一次语义(at-least-once)
程序模型
- 最基本的设计是将实时数据流看成一张不断添加数据的无边界的表,与批处理方式相似
基本处理

- 每个一定时间【1s】会触发一次数据操作,新的数据会作为行添加到input Table
- 然后触发 input 上的查询操作生成 结果result table
- Output Modes : 结果内容最终可以以不同模式被输出到外部存储
- Complete Mode :
- 全部更新后的result table被写入到外部存储,有外部存储接收器负责决定如何处理整个结果表
- Append Mode
- 只处理新添加到result table的数据,适用于数据不会被更新的场景,只要处理增量就可以
- select, where, map, flatMap, filter, join
- Update Mode -Available since Spark 2.1.1
- Output Sinks -- 保存目的源
- File sink , 保存到目录
- Kafka sink , 保存到kafka
- Foreach sink , 遍历处理,自己实现保存逻辑
- Console sink,测试用,打印输出
- Memory sink,测试用
- Triggers -- 批处理任务触发时机定义
- unspecified(默认)
- 不指定的情况, micro-batch 模式,每个 micro-batch 处理完成后开启一个新的 micro-batch
- Fixed interval micro-batches -- 指定间隔的 micro-batches
- 处理时间快的情况,在指定间隔内完成,等待到间隔时间开启新的micro-batches
- 处理时间慢的情况,超过指定间隔时间了,处理完成立即开启新的micro-batches
- 没有新数据的情况不会开启micro-batches
- One-time micro-batch
- Continuous with fixed checkpoint interval -- 低延迟持续处理(2.3.1 实验)
- 1 ms 端对端延迟,最少一次容错语义 ,相比微批处理 100ms延迟,exactly-once容错处理
- 每一种模式都有各自适用的查询操作
参数
- spark.sql.streaming.minBatchesToRetain
- 保存最多批次数,用于恢复,默认100
- 超出的批次数据会被删除
- 影响对象
- StreamExecution#offsetLog -- write-ahead-log,未处理批次数据
- StreamExecution#commitLog -- 完成批次记录
基于事件发生时间的窗口处理
- 时间点说明:
- 事件发生时间Event-time,如数据产生的时间点
- 数据接收时间点 receive-time, spark 流处理接收到数据的时间点
- 场景及问题点:
- 需要知道每5分钟用户的点击量
- 之前的批处理方式的spark streaming,按照批处理数据的时间点为单位,也就是spark数据接收的时间点处理
- 当由于网络原因,事件发生时间和数据接收时间差异比较大的时候,如:10:01分发生数据在10:08分才收到,那么按照原来spark streaming在10:01-10:05这个批处理就统计不到这条数据
-
- spark2.1 开始可以使用watermarking来处理数据在一定时间范围内的延后数据做聚合操作
- 操作代码,withWatermark("timestamp", "10 minutes")十分钟之内的延时收到的数据都能处理
- Structured Streaming可以按照事件发生时间Event-time为窗口处理,相当于在一个表里面统计发生时间这列的数据
- 对于超时特别久的数据可以做特殊处理
- withWatermark支持延迟数据处理
- 如果开发人员需要开发基于事件时间的有状态的流计算,则必须使用watermark机制
- Append mode
- 会保持中间状态,直到超过watermark之后,把最终结果写入Result Table,删除中间状态,所以结果出来有个延迟时间
- 指定withWatermark使用的字段,聚合操作groupby时需要使用相同字段
- df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode
- 先设定在做其他操作
- df.groupBy("time").count().withWatermark("time", "1 min") is invalid in Append output mode
- Update mode
- 每次处理都写入Result Table,在watermark之内的数据会更新旧的数据,在watermark之外的数据丢弃
- Complete mode
end-to-end exactly-once 容错
- sources -- offsets 跟踪读取的位置
- File source
- 将某个目录的文件作为数据流,支持 text, csv, json, parquet
- Kafka source
- Socket source
- sinks -- 幂等操作
- File sink,存储到文件目录
- Console sink,到控制台,测试用
- Memory sink,到内存,测试用
- execution engine -- checkpointing and write ahead logs to record the offset range of the data being processed in each trigger
问题
- 保存数据量,complete模式全量数据保存?update模式保存多少数据
相关文章:
-
2021-09-20
-
2021-10-05
-
2022-01-05
-
2021-07-20
-
2022-12-23
-
2022-12-23
-
2021-07-10
猜你喜欢
-
2022-01-14
-
2021-12-09
-
2021-10-12
-
2022-12-23
-
2021-06-07
-
2021-08-14
-
2021-05-18
相关资源
-
下载
2022-12-01
-
下载
2021-11-03
-
下载
2023-01-25
-
下载
2021-10-04
-
下载
2023-03-09