【发布时间】:2018-11-29 06:59:10
【问题描述】:
我有一个将 Pyspark 流数据转换为数据帧的代码。我需要将此数据帧存储到 Hbase 中。帮我另外写代码。
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.config(conf=sparkConf)\
.getOrCreate()
return globals()['sparkSessionSingletonInstance']
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: sql_network_wordcount.py <hostname> <port> ",
file=sys.stderr)
exit(-1)
host, port = sys.argv[1:]
sc = SparkContext(appName="PythonSqlNetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(host, int(port))
def process(time, rdd):
print("========= %s =========" % str(time))
try:
words = rdd.map(lambda line :line.split(" ")).collect()
spark = getSparkSessionInstance(rdd.context.getConf())
linesDataFrame = spark.createDataFrame(words,schema=["lat","lon"])
linesDataFrame.show()
except :
pass
lines.foreachRDD(process)
ssc.start()
ssc.awaitTermination()
【问题讨论】:
标签: dataframe pyspark apache-spark-sql hbase bigdata