【Posted】: 2018-11-15 21:13:26
【Problem Description】:
I am trying to run a mutual-friends example using Python 3 and PySpark:
from pyspark import SparkContext, SparkConf

file = "D:\\jsonFIle\\Live.txt"
out_file = "D:\\jsonFIle\\friends.txt"

conf = SparkConf().setAppName("Common Friends").setMaster("local").set("spark.driver.memory", "5g")
#sc = SparkContext(conf=conf)
sc = SparkContext.getOrCreate()

f = sc.textFile(file)

def check(line):
    if len(line.split()) > 1:
        return len((line.split())[1].split(',')) > 1
    return False

adlists = f.filter(check).map(lambda line: line.split()).persist()
adlists = adlists.map(lambda l: (l[0], l[1].split(','))).persist()

def form_pairs(tup):
    l = []
    for elem1 in tup[1]:
        for elem2 in tup[1]:
            if elem1 < elem2:
                l.append(((elem1, elem2), 1))
    return l

pairs = adlists.flatMap(form_pairs)
cf = pairs.reduceByKey(lambda x, y: x + y)
sim = sorted(cf.collect())

out = open(out_file, "w")
for tuple in sim:
    s = str(tuple[0][0]) + "," + str(tuple[0][1]) + "\t" + str(tuple[1]) + "\n"
    out.write(s)
out.close()
print("done")
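The pair-generation and counting logic above can be checked without Spark at all. This is a minimal pure-Python sketch of the same algorithm, run on a made-up toy adjacency list (the data is hypothetical, for illustration only):

```python
from collections import Counter

def form_pairs(tup):
    # Emit ((a, b), 1) for every unordered pair of friends of one user.
    l = []
    for elem1 in tup[1]:
        for elem2 in tup[1]:
            if elem1 < elem2:
                l.append(((elem1, elem2), 1))
    return l

# Toy data: (user, list of friends) -- invented for this sketch.
adlists = [("0", ["1", "2", "3"]), ("4", ["1", "2"])]

counts = Counter()
for tup in adlists:
    for pair, one in form_pairs(tup):
        counts[pair] += one  # plays the role of reduceByKey(lambda x, y: x + y)

print(sorted(counts.items()))
# [(('1', '2'), 2), (('1', '3'), 1), (('2', '3'), 1)]
```

Here ("1", "2") appears in both users' friend lists, so its count is 2; if the Spark pipeline is wired correctly it should produce the same counts for the same input.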
This is a worked example, so I know it should work. I installed PySpark following this online guide: https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c
If I run `pyspark` from the Anaconda prompt, Jupyter starts directly and I have no problem importing pyspark.
In the Anaconda prompt I see this log:
[IPKernelApp] WARNING | Unknown error in handling PYTHONSTARTUP file C:\opt\spark\spark-2.2.2-bin-hadoop2.7\python\pyspark\shell.py:
[I 21:47:47.447 NotebookApp] Adapting to protocol v5.1 for kernel 6d4c3d50-648f-4d2d-858b-5df642386e14
[Stage 0:=========================> (15 + 8) / 33][I 21:49:28.441 NotebookApp] Saving file at /Downloads/ProvaSpark.ipynb
[I 21:51:28.434 NotebookApp] Saving file at /Downloads/ProvaSpark.ipynb
[Stage 2:===> (2 + 8) / 33][I 22:03:28.929 NotebookApp] Saving file at /Downloads/ProvaSpark.ipynb
[I 22:05:28.916 NotebookApp] Saving file at /Downloads/ProvaSpark.ipynb
[I 22:09:28.917 NotebookApp] Saving file at /Downloads/ProvaSpark.ipynb
During execution I can see stages 0 and 2 progressing, and they stop when the computation finishes. Yet my output file is blank, and I don't understand why.
A friend tried running it on his machine; sometimes it errors out and he has to restart the Jupyter kernel and re-run it several times before the computation comes out right.
Any suggestions?
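One thing worth ruling out is the check() filter silently dropping every line. Assuming each line of Live.txt looks like "user\tfriend1,friend2,..." (my assumption about the input format; adjust to the real layout), the filter can be exercised locally on sample lines:

```python
# Same check() as in the Spark job: keep a line only if it has a second
# whitespace-separated field containing at least two comma-separated friends.
def check(line):
    if len(line.split()) > 1:
        return len(line.split()[1].split(',')) > 1
    return False

print(check("1\t2,3,4"))  # True: two or more friends, line is kept
print(check("1\t2"))      # False: only one friend, line is dropped
print(check(""))          # False: empty line is dropped
```

If check() returns False for every real line (for example because the file uses a different separator), then pairs and cf are empty RDDs and the output file ends up blank.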
【Discussion】:
Tags: python apache-spark pyspark