【发布时间】:2016-11-28 00:54:39
【问题描述】:
我是 Spark 的新手,我绝对需要一些帮助来对来自 Kafka Stream 的推文进行分类。下面我将解释到目前为止我已经完成的步骤过程以及我被卡住的地方。
我希望你们中的一些人可以帮助我解决这个问题。
提前致谢。
上下文如下:
我有一个简单的 Kafka Producer,它模拟推文的流(从文件中读取)和一个 TweetAnalyzer Consumer,它应该在 上处理和分类推文Spark Streaming Context,一旦收到它们。
为了对收到的推文进行分类,我之前在磁盘上构建并存储了 TF-IDF 和 朴素贝叶斯 模型,它们在 Spark Streaming Context 启动。
对于每条处理的推文(词干、标点符号等),我应该计算其 TF-IDF 向量(特征向量),并分别利用之前加载的 IDF 和朴素贝叶斯模型对其进行分类。
直截了当,当我必须将推文的 词频向量 (TF) 转换为其 TF-IDF 向量 时,我的问题就出现了。
这是代码:
卡夫卡制作人
text_file = list(
csv.reader(
open('/twitterDataset/twitter/test_data.txt', 'rU')
)
)
for row in text_file:
time.sleep(1)
jd = json.dumps(row).encode('ascii')
producer.send(kafka_topic,jd)
推文分析器
#setting configuration
...
#reading configuration
...
#setting Kafka configuration
...
# Create Spark context
sc = SparkContext(
appName = app_name,
master = spark_master
)
# Create Streaming context
ssc = StreamingContext(
sc,
int(spark_batch_duration)
)
# Loading TF MODEL and compute TF-IDF
....
kafkaParams = {'metadata.broker.list"': kafka_brokers}
# Create direct kafka stream with brokers and topics
kvs = KafkaUtils.createDirectStream(
ssc,
[kafka_topic],
{"metadata.broker.list": kafka_brokers}
)
obj1 = TweetPreProcessing()
lines = kvs.map(lambda x: x[1])
tweet = lines.flatMap(obj1.TweetBuilder)
hashingTF = HashingTF()
#computing TF for each tweet
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
.map(lambda x: IDF().fit(x))
.pprint()
ssc.start()
ssc.awaitTermination()
在最后几行代码中,我无法在 x 上应用 IDF().fit(x) 函数,因为 Spark 需要“词频向量的 RDD”,而在这一点上,由于 Streaming Spark 上下文,我有一个“Transformed DStream”。
我尝试使用 transform() 或 foreachRDD() 函数代替 map(),但我没有不知道如何在转换后正确返回一个新的 DStream。
例如:
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
.transform(classify_tweet)
.pprint()
def classify_tweet(tf):
#compute TF-IDF of the tweet
idf = IDF().fit(tf)
tf_idf = idf.transform(tf)
#print(tf_idf.collect())
return idf
如果我使用转换函数运行代码,Spark 会触发(在回溯顶部)此错误:
文件 “/workspace_spark/spark-1.6.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py”,第 67 行,调用中 return r._jrdd
AttributeError: 'IDFModel'目的 没有属性'_jrdd'
但是,如果我省略 return 语句并简单地打印 tf_idf 向量,它会给我正确的输出,如下所示:
[SparseVector(1048576, {164998: 0.0, 364601: 0.0, 924192: 0.0, 963449: 0.0})]
[SparseVector(1048576, {251465: 0.0, 821055: 0.0, 963449: 0.0})]
[SparseVector(1048576, {234762: 0.0, 280973: 0.0, 403903: 2:0.0, 71273) 0.0, 861562: 0.0, 1040690: 0.0})] ...
如果我做对了,我认为问题是当它需要 DStream 时,我不能返回 SparseVector。
不管怎样,这个问题有解决办法吗?
如果有人能帮我解决这个问题,我将非常感激,我很悲惨地被卡住了。
谢谢
【问题讨论】:
标签: apache-spark pyspark spark-streaming apache-spark-mllib sentiment-analysis