【问题标题】:Stream and process data based on timestamp values (Using Kafka and Spark Streaming)根据时间戳值流式传输和处理数据(使用 Kafka 和 Spark Streaming)
【发布时间】:2020-10-10 16:10:41
【问题描述】:

我将尝试简化我要解决的问题。我有一个从 JSON 文件中读取的员工数据流,并且具有以下架构:

StructType([ \
  StructField("timeStamp", TimestampType()),\
  StructField("emp_id", LongType()),\
  StructField("on_duty", LongType()) ])
# on_duty is an int boolean-> 0,1

示例:

{"timeStamp": 1514765160, "emp_id": 12471979, "on_duty": 0}
{"timeStamp": 1514765161, "emp_id": 12472154, "on_duty": 1}

我想每分钟找出 2 件事,在线员工总数和非值班员工总数,并使用结构化 Spark 流处理它

这是每分钟 wrt。时间戳,而不是系统时间。

卡夫卡产品

_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: 
                         json.dumps(x).encode('utf-8'))
    
    # schedule.every(1).minutes.do(_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) ) )

    with open(filepath, 'r', encoding="utf16") as f: 

        for item in json_lines.reader(f):
            dataDict.update({'timeStamp':item['timestamp'],
                    'emp_id':item['emp_id'],
                    'on_duty':item['on_duty']})
            _producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) )
            sleep(1)


# ^ Threading doesn't work BTW

火花流

emp_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "emp_dstream") \
  .option("startingOffsets", "latest") \
  .load() \
  .selectExpr("CAST(value AS STRING)") 

emp_data = emp_stream.select([
  get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
  for c in ["timeStamp", "emp_id", "on_duty"]])

# this query is a filler attempt which is not the end goal of the task 
query = emp_data.groupBy(["on_duty"]).count()

emp_data.writeStream \
  .outputMode("append") \
  .format("console") \
  .start() \
  .awaitTermination()

我很困惑如何继续。我是在 kafka 生产者中进行更改还是在使用 spark 处理流时进行更改?我该怎么做?

将不胜感激任何提示或帮助!


更新 根据@Srinivas 解决方案

....----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+
|[1970-01-18 04:46:00, 1970-01-18 04:47:00]|1970-01-18 04:46:05|1070         |[1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,....

-------------------------------------------
Batch: 40
-------------------------------------------
+------------------------------------------+-------------------+--------------+-----------------+
|window                                    |timestamp          |Online_emp|Available_emp|
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:53|20            |12               |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:44|20            |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:47|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:27|20            |4                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:10|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:25|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:42|12            |4                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:20|4             |0                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:49|4             |0                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:44|12            |8                |
|[2017-12-31 16:02:00, 2017-12-31 16:03:00]|2017-12-31 16:02:19|8             |4                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:15|8             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:08|12            |4                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:50|8             |0                |
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:27|16            |0                |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:38|5             |0                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:13|4             |4                |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:36|8             |4                |
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:59|24            |4                |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:40|10            |0                |
+------------------------------------------+-------------------+--------------+-----------------+
only showing top 20 rows

-------------------------------------------
Batch: 41
-------------------------------------------
+------------------------------------------+-------------------+--------------+-----------------+
|window                                    |timestamp          |Online_emp|Available_emp|
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:53|20            |12               |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:44|20            |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:47|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:27|20            |4                |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:10|4             |0                |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:25|4             |0                |

更新 2

如何获得这样的输出:

Time    Online_Emp  Available_Emp
2019-01-01 00:00:00 52  23
2019-01-01 00:01:00 58  19
2019-01-01 00:02:00 65  28

【问题讨论】:

  • 我自己也在解决类似的问题。文档让我有点困惑。期待找到解决方案
  • 你的意思是我确定吗?是的,这些是时间戳值。我应该将架构中的 longtype 更改为时间戳
  • 现在我明白了,我使用的时间戳以毫秒为单位,但您的时间戳不是以毫秒为单位的。你必须用这个.withColumn("timestamp",F.from_unixtime(F.col("timestamp")))改变.withColumn("timestamp",F.from_unixtime(F.col("timestamp") / 1000))
  • 是的,它可以工作,但所有批次都重复显示相同的信息。它不会与 kafka 生产者一起更新
  • 我刚刚做了。我是否通过逐行发送并使用 sleep(1) 来错误地生成 kafka 流?

标签: python apache-spark pyspark apache-kafka spark-streaming


【解决方案1】:

使用window 函数。

Kafka 中的示例数据

{"timeStamp": 1592669811475, "emp_id": 12471979, "on_duty": 0}
{"timeStamp": 1592669811475, "emp_id": 12472154, "on_duty": 1}
{"timeStamp": 1592669811475, "emp_id": 12471980, "on_duty": 0}
{"timeStamp": 1592669811475, "emp_id": 12472181, "on_duty": 1}
{"timeStamp": 1592669691475, "emp_id": 12471982, "on_duty": 0}
{"timeStamp": 1592669691475, "emp_id": 12472183, "on_duty": 1}
{"timeStamp": 1592669691475, "emp_id": 12471984, "on_duty": 0}
{"timeStamp": 1592669571475, "emp_id": 12472185, "on_duty": 1}
{"timeStamp": 1592669571475, "emp_id": 12472186, "on_duty": 1}
{"timeStamp": 1592669571475, "emp_id": 12472187, "on_duty": 0}
{"timeStamp": 1592669571475, "emp_id": 12472188, "on_duty": 1}
{"timeStamp": 1592669631475, "emp_id": 12472185, "on_duty": 1}
{"timeStamp": 1592669631475, "emp_id": 12472186, "on_duty": 1}
{"timeStamp": 1592669631475, "emp_id": 12472187, "on_duty": 0}
{"timeStamp": 1592669631475, "emp_id": 12472188, "on_duty": 1}
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StructField, StructType, LongType, TimestampType

schema = StructType([ \
    StructField("timeStamp", LongType()), \
    StructField("emp_id", LongType()), \
    StructField("on_duty", LongType())])

df = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe","emp_dstream")\
    .option("startingOffsets", "earliest")\
    .load()\
    .selectExpr("CAST(value AS STRING)")\
    .select(F.from_json(F.col("value"), schema).alias("value"))\
    .select(F.col("value.*"))\
    .withColumn("timestamp",F.from_unixtime(F.col("timestamp") / 1000))\
    .groupBy(F.window(F.col("timestamp"), "1 minutes"), F.col("timestamp"))\
    .agg(F.count(F.col("timeStamp")).alias("total_employees"),F.collect_list(F.col("on_duty")).alias("on_duty"),F.sum(F.when(F.col("on_duty") == 0, F.lit(1)).otherwise(F.lit(0))).alias("not_on_duty"))\
    .writeStream\
    .format("console")\
    .outputMode("complete")\
    .option("truncate", "false")\
    .start()\
    .awaitTermination()

输出

+---------------------------------------------+-------------------+---------------+------------+-----------+
|window                                       |timestamp          |total_employees|on_duty     |not_on_duty|
+---------------------------------------------+-------------------+---------------+------------+-----------+
|[2020-06-20 21:42:00.0,2020-06-20 21:43:00.0]|2020-06-20 21:42:51|4              |[1, 1, 0, 1]|1          |
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|3              |[0, 1, 0]   |2          |
|[2020-06-20 21:46:00.0,2020-06-20 21:47:00.0]|2020-06-20 21:46:51|4              |[0, 1, 0, 1]|2          |
|[2020-06-20 21:43:00.0,2020-06-20 21:44:00.0]|2020-06-20 21:43:51|4              |[1, 1, 0, 1]|1          |
+---------------------------------------------+-------------------+---------------+------------+-----------+

火花批处理

spark \
    .read \
    .schema(schema) \
    .json("/tmp/data/emp_data.json") \
    .select(F.to_json(F.struct("*")).cast("string").alias("value")) \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "emp_data") \
    .save()

Spark 流式传输

spark \
    .readStream \
    .schema(schema) \
    .json("/tmp/data/emp_data.json") \
    .select(F.to_json(F.struct("*")).cast("string").alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "emp_data") \
    .start()

kafka 中的 JSON 数据

/tmp/data> kafka-console-consumer --bootstrap-server localhost:9092 --topic emp_data
{"timeStamp":1592669811475,"emp_id":12471979,"on_duty":0}
{"timeStamp":1592669811475,"emp_id":12472154,"on_duty":1}
{"timeStamp":1592669811475,"emp_id":12471980,"on_duty":0}
{"timeStamp":1592669811475,"emp_id":12472181,"on_duty":1}
{"timeStamp":1592669691475,"emp_id":12471982,"on_duty":0}
{"timeStamp":1592669691475,"emp_id":12472183,"on_duty":1}
{"timeStamp":1592669691475,"emp_id":12471984,"on_duty":0}
{"timeStamp":1592669571475,"emp_id":12472185,"on_duty":1}
{"timeStamp":1592669571475,"emp_id":12472186,"on_duty":1}
{"timeStamp":1592669571475,"emp_id":12472187,"on_duty":0}
{"timeStamp":1592669571475,"emp_id":12472188,"on_duty":1}
{"timeStamp":1592669631475,"emp_id":12472185,"on_duty":1}
{"timeStamp":1592669631475,"emp_id":12472186,"on_duty":1}
{"timeStamp":1592669631475,"emp_id":12472187,"on_duty":0}
{"timeStamp":1592669631475,"emp_id":12472188,"on_duty":1}
^CProcessed a total of 15 messages

【讨论】:

  • 感谢您的回答,但我的控制台输出完全奇怪。我将在问题中发布我的控制台输出的一部分
  • 它与此代码一起使用 datetime.fromtimestamp(item['timestamp']).strftime('%Y-%m-%d %H:%M:%S') ,标签来自 2017-2019
  • 帮助很大:) 谢谢。一个我在其他任何地方都找不到的小问题,你如何分割一个 json 并每隔一段时间将它一点一点地发送到 kafka?一定是在unix命令中使用split的方式吧
  • No.. 可以直接使用dataframe,将字符串类型的数据和列名转换为值,然后使用格式作为Kafka发送消息
  • 更新了 Spark 批处理和流式向 kafka 发送消息的代码检查。
猜你喜欢
  • 2019-06-28
  • 2016-03-04
  • 1970-01-01
  • 1970-01-01
  • 2015-12-12
  • 2016-09-18
  • 2018-07-22
  • 1970-01-01
  • 2023-04-03
相关资源
最近更新 更多