【发布时间】:2020-10-10 16:10:41
【问题描述】:
我将尝试简化我要解决的问题。我有一个从 JSON 文件中读取的员工数据流,并且具有以下架构:
StructType([ \
StructField("timeStamp", TimestampType()),\
StructField("emp_id", LongType()),\
StructField("on_duty", LongType()) ])
# on_duty is an int boolean-> 0,1
示例:
{"timeStamp": 1514765160, "emp_id": 12471979, "on_duty": 0}
{"timeStamp": 1514765161, "emp_id": 12472154, "on_duty": 1}
我想每分钟找出 2 件事,在线员工总数和非值班员工总数,并使用结构化 Spark 流处理它
这是每分钟 wrt。时间戳,而不是系统时间。
卡夫卡产品
_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
json.dumps(x).encode('utf-8'))
# schedule.every(1).minutes.do(_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) ) )
with open(filepath, 'r', encoding="utf16") as f:
for item in json_lines.reader(f):
dataDict.update({'timeStamp':item['timestamp'],
'emp_id':item['emp_id'],
'on_duty':item['on_duty']})
_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) )
sleep(1)
# ^ Threading doesn't work BTW
火花流
emp_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "emp_dstream") \
.option("startingOffsets", "latest") \
.load() \
.selectExpr("CAST(value AS STRING)")
emp_data = emp_stream.select([
get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
for c in ["timeStamp", "emp_id", "on_duty"]])
# this query is a filler attempt which is not the end goal of the task
query = emp_data.groupBy(["on_duty"]).count()
emp_data.writeStream \
.outputMode("append") \
.format("console") \
.start() \
.awaitTermination()
我很困惑如何继续。我是在 kafka 生产者中进行更改还是在使用 spark 处理流时进行更改?我该怎么做?
将不胜感激任何提示或帮助!
更新 根据@Srinivas 解决方案
....----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+
|[1970-01-18 04:46:00, 1970-01-18 04:47:00]|1970-01-18 04:46:05|1070 |[1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,....
-------------------------------------------
Batch: 40
-------------------------------------------
+------------------------------------------+-------------------+--------------+-----------------+
|window |timestamp |Online_emp|Available_emp|
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:53|20 |12 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:44|20 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:47|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:27|20 |4 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:10|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:25|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:42|12 |4 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:20|4 |0 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:49|4 |0 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:44|12 |8 |
|[2017-12-31 16:02:00, 2017-12-31 16:03:00]|2017-12-31 16:02:19|8 |4 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:15|8 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:08|12 |4 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:50|8 |0 |
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:27|16 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:38|5 |0 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:13|4 |4 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:36|8 |4 |
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:59|24 |4 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:40|10 |0 |
+------------------------------------------+-------------------+--------------+-----------------+
only showing top 20 rows
-------------------------------------------
Batch: 41
-------------------------------------------
+------------------------------------------+-------------------+--------------+-----------------+
|window |timestamp |Online_emp|Available_emp|
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:04:00, 2017-12-31 16:05:00]|2017-12-31 16:04:53|20 |12 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:44|20 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:47|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:27|20 |4 |
|[2017-12-31 16:03:00, 2017-12-31 16:04:00]|2017-12-31 16:03:10|4 |0 |
|[2017-12-31 16:05:00, 2017-12-31 16:06:00]|2017-12-31 16:05:25|4 |0 |
更新 2
如何获得这样的输出:
Time Online_Emp Available_Emp
2019-01-01 00:00:00 52 23
2019-01-01 00:01:00 58 19
2019-01-01 00:02:00 65 28
【问题讨论】:
-
我自己也在解决类似的问题。文档让我有点困惑。期待找到解决方案
-
你的意思是我确定吗?是的,这些是时间戳值。我应该将架构中的 longtype 更改为时间戳
-
现在我明白了,我使用的时间戳以毫秒为单位,但您的时间戳不是以毫秒为单位的。你必须用这个
.withColumn("timestamp",F.from_unixtime(F.col("timestamp")))改变.withColumn("timestamp",F.from_unixtime(F.col("timestamp") / 1000)) -
是的,它可以工作,但所有批次都重复显示相同的信息。它不会与 kafka 生产者一起更新
-
我刚刚做了。我是否通过逐行发送并使用 sleep(1) 来错误地生成 kafka 流?
标签: python apache-spark pyspark apache-kafka spark-streaming