【发布时间】:2019-04-23 03:40:37
【问题描述】:
我正在使用 Kafka 流式传输 JSON 文件,将每一行作为消息发送。其中一个键是用户的email。
然后我使用 PySpark 计算每个窗口的唯一用户数,并使用他们的电子邮件来识别他们。命令
def print_users_count(count):
print 'The number of unique users is:', count
print_users_count((lambda message: message['email']).distinct().count())
给我下面的错误。我该如何解决这个问题?
AttributeError Traceback (most recent call last)
<ipython-input-19-311ba744b41f> in <module>()
2 print 'The number of unique users is:', count
3
----> 4 print_users_count((lambda message: message['email']).distinct().count())
AttributeError: 'function' object has no attribute 'distinct'
这是我的 PySpark 代码:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
try:
sc.stop()
except:
pass
sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
# Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})
# Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
# Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()
【问题讨论】:
-
你能试试这个吗:
print_users_count(parsed.map(lambda message: message['email']).distinct().count()) -
是的!我得到了
AttributeError: 'TransformedDStream' object has no attribute 'distinct'
标签: json apache-spark pyspark apache-kafka spark-streaming