【问题标题】:Spark Structured Streaming dynamic lookup with Redis使用 Redis 进行 Spark Structured Streaming 动态查找
【发布时间】:2021-02-12 00:09:30
【问题描述】:

我是新来的火花。 我们目前正在构建管道:

  1. 从 Kafka 主题中读取事件
  2. 借助 Redis-Lookup 丰富这些数据
  3. 将事件写入新的 Kafka 主题

所以,我的问题是,当我想使用 spark-redis 库时,它的性能非常好,但数据在我的流式传输作业中保持静态。

虽然数据在 Redis 被刷新,但它并没有反映到我的数据框中。 Spark 首先读取数据,然后从不更新它。 另外我首先从 REDIS 数据中读取,关于 1mio key-val 字符串的总数据。

我可以做什么样的方法/方法,我想使用 Redis 作为内存中的动态查找。 查找表几乎改变了 1 小时。

谢谢。

使用的库: spark-redis-2.4.1.jar commons-pool2-2.0.jar jedis-3.2.0.jar

这里是代码部分:

import com.intertech.hortonworks.spark.registry.functions._
val config = Map[String, Object]("schema.registry.url" -> "http://aa.bbb.ccc.yyy:xxxx/api/v1")
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
var rawEventSchema = sparkSchema("my_raw_json_events") 


val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("subscribe", "my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema, Map.empty[String, String])
        .alias("C"))


import com.redislabs.provider.redis._
val sc = spark.sparkContext
val stringRdd = sc.fromRedisKV("PARAMETERS:*") 
val lookup_map = stringRdd.collect().toMap
val lookup = udf((key: String) => lookup_map.getOrElse(key,"") )



val curated_df = my_raw_events_df 
.select(

     ...
     $"C.SystemEntryDate".alias("RecordCreateDate")
    ,$"C.Profile".alias("ProfileCode")     
    ,**lookup(expr("'PARAMETERS:PROFILE||'||NVL(C.Profile,'')")).alias("ProfileName")**
    ,$"C.IdentityType"     
    ,lookup(expr("'PARAMETERS:IdentityType||'||NVL(C.IdentityType,'')")).alias("IdentityTypeName")     
     ...

).as("C")



import org.apache.spark.sql.streaming.Trigger

val query = curated_df
   .select(to_sr(struct($"*"), "curated_event_sch").alias("value"))
   .writeStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
   .option("topic", "curated-event")
   .option("checkpointLocation","/user/spark/checkPointLocation/xyz")
   .trigger(Trigger.ProcessingTime("30 seconds"))
   .start()

   query.awaitTermination()

【问题讨论】:

  • 您是否考虑过只使用 Redis Streams 而不是 Kafka?
  • 嗨 Korland,原始事件以 json 格式传入 KAfka,并且必须在流式传输时丰富它们。
  • 能否请您分享一些您采用哪种方法的详细信息。

标签: scala apache-spark redis streaming lookup


【解决方案1】:

另一种解决方案是进行流静态连接 (spark docs):

不要将 redis rdd 收集到驱动程序,而是使用 redis 数据帧 (spark-redis docs) 作为静态数据帧与您的流连接,所以它会像:

val redisStaticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(redisStaticDf, ...)   

由于 spark 微批处理执行引擎评估每个触发器的查询执行,redis 数据帧将获取每个触发器的数据,为您提供最新数据(如果您将缓存数据帧,它不会)

【讨论】:

  • 嗨@talgo10,听起来很有趣。会试试这个。但是 Redis 数据主要关注的是键值。我的意思是有些人将数据作为 key-val 放入 redis。所以我也必须考虑如何管理这部分。
【解决方案2】:

一种选择是不使用 spark-redis,而是直接在 Redis 中查找。这可以通过df.mapPartitions 函数来实现。您可以在此处https://blog.codecentric.de/en/2017/07/lookup-additional-data-in-spark-streaming/ 找到一些 Spark DStreams 示例。 Structural Streaming 的想法是类似的。注意正确处理 Redis 连接。

【讨论】:

猜你喜欢
  • 2020-03-19
  • 1970-01-01
  • 2023-01-20
  • 2019-03-01
  • 1970-01-01
  • 1970-01-01
  • 2020-09-08
  • 2019-10-21
  • 2020-09-12
相关资源
最近更新 更多