【问题标题】:How to direct stream(kafka) a JSON file in spark and convert it into RDD?如何在火花中直接流(kafka)JSON文件并将其转换为RDD?
【发布时间】:2019-08-15 20:25:28
【问题描述】:

写了一个代码,当文件被给定(在生产者中)时直接流(kafka)字数统计

代码:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

需要使用 Dstream 将输入的 json 文件转换为 spark Dataframe。

【问题讨论】:

    标签: apache-spark pyspark apache-kafka apache-spark-sql


    【解决方案1】:

    这应该可行:

    一旦你的变量包含 TransformedDStream kvs,你就可以创建一个 DStream 映射并将数据传递给一个处理函数,如下所示:

    data = kvs.map( lambda tuple: tuple[1] )
    data.foreachRDD( lambda yourRdd: readMyRddsFromKafkaStream( yourRdd ) )
    

    您应该定义应该使用您的 JSON 数据创建数据帧的处理函数:

    def readMyRddsFromKafkaStream( readRdd ):
      # Put RDD into a Dataframe
      df = spark.read.json( readRdd )
      df.registerTempTable( "temporary_table" )
      df = spark.sql( """
        SELECT
          *
        FROM
          temporary_table
      """ )
      df.show()
    

    希望对我的朋友有帮助:)

    【讨论】:

      猜你喜欢
      • 2019-04-24
      • 2015-03-25
      • 2018-02-22
      • 1970-01-01
      • 1970-01-01
      • 2016-05-12
      • 2016-05-26
      • 2017-04-27
      • 2017-02-28
      相关资源
      最近更新 更多