【问题标题】:Count number of users per window using PySpark使用 PySpark 计算每个窗口的用户数
【发布时间】:2019-04-23 03:40:37
【问题描述】:

我正在使用 Kafka 流式传输 JSON 文件,将每一行作为消息发送。其中一个键是用户的email

然后我使用 PySpark 计算每个窗口的唯一用户数,并使用他们的电子邮件来识别他们。命令

def print_users_count(count):
    print 'The number of unique users is:', count

print_users_count((lambda message: message['email']).distinct().count())

给我下面的错误。我该如何解决这个问题?

AttributeError                            Traceback (most recent call last)
<ipython-input-19-311ba744b41f> in <module>()
      2     print 'The number of unique users is:', count
      3 
----> 4 print_users_count((lambda message: message['email']).distinct().count())

AttributeError: 'function' object has no attribute 'distinct'

这是我的 PySpark 代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

try:
    sc.stop()
except:
    pass  

sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 60)

# Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})

# Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

# Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()

【问题讨论】:

  • 你能试试这个吗:print_users_count(parsed.map(lambda message: message['email']).distinct().count())
  • 是的!我得到了AttributeError: 'TransformedDStream' object has no attribute 'distinct'

标签: json apache-spark pyspark apache-kafka spark-streaming


【解决方案1】:

您没有将 lambda 函数应用于任何事物。 message 引用的是什么?不是 lambda 函数就是这样,一个函数。这就是为什么你得到AttributeError: 'function' object has no attribute 'distinct'。它没有应用于任何数据,因此它没有返回任何数据。您需要引用键 email 所在的数据框。

请参阅 pyspark.sql.functions.countDistinct(col, *cols)pyspark.sql.functions.approx_count_distinct pyspark docs 的 pyspark 文档。这应该是获得唯一计数的更简单的解决方案。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-08-27
    • 2020-04-03
    • 2019-08-16
    • 2021-04-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-01
    相关资源
    最近更新 更多