【Title】: Using partitionBy() and persist() in pyspark
【Posted】: 2018-03-06 14:04:03
【Question】:

I want to optimize my script in Spark 2.1 by partitioning the data by key and using persist(), but when I run the code I get an error that I don't understand.

The code is:

rdd = sc.textFile("path").map(lambda l: l.split(";"))

rdd_pair = rdd.map(lambda a: (a[0], a)).partitionBy(920).persist()

rdd_pair = rdd_pair.combineByKey(lambda v: [v], lambda x, y: x + [y], lambda x, y: x + y)

def fn(input_bag):
    output=[]
    loc0 = input_bag[0]
    for loc in input_bag[1:]:
        output.append((loc0[2],loc[2]))
        loc0 = loc
    return output

data = rdd_pair.map(lambda k: (k[0], fn(k[1]))).persist()

data = data.flatMap(lambda x: map(lambda e: (e, x[0]), x[1])).map(lambda x: (x[0], 1)).reduceByKey(lambda p, q: p + q)

data.map(lambda x: ",".join(map(str, x))).saveAsTextFile(Path)
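
For reference, the pattern being attempted is: partition once by key, then persist so the shuffled layout is reused by every later key-based step. A minimal sketch of that pattern follows (the local context, partition count, and storage level are assumptions for illustration, not taken from the question):

from pyspark import SparkContext, StorageLevel

sc = SparkContext("local[2]", "persist-demo")  # assumed local context
rdd = sc.textFile("path").map(lambda l: l.split(";"))
# Partition by key once, then cache. persist() is lazy: the cached,
# partitioned RDD is only materialized by the first action.
pairs = rdd.map(lambda a: (a[0], a)).partitionBy(8).persist(StorageLevel.MEMORY_ONLY)
pairs.count()  # first action; later jobs reuse the cached partitions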

The error is:

Traceback (most recent call last):
  File "/home/wrjx9579/test1.py", line 12, in <module>
    sc = SparkContext(conf=conf)
  File "/opt/application/Spark2/current/python/lib/pyspark.zip/pyspark/context.py", line 118, in __init__
  File "/opt/application/Spark2/current/python/lib/pyspark.zip/pyspark/context.py", line 179, in _do_init
  File "/opt/application/Spark2/current/python/lib/pyspark.zip/pyspark/context.py", line 246, in _initialize_context
  File "/opt/application/Spark2/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1401, in __call__
  File "/opt/application/Spark2/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)];
Host Details : local host is: "..."; destination host is: "...":8032;
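
Note that this traceback is unrelated to partitionBy() or persist(): the SparkContext never initializes, because the GSSException ("Failed to find any Kerberos tgt") means no valid Kerberos ticket is available when connecting to the YARN resource manager on port 8032. On a Kerberized cluster this is typically resolved by obtaining a ticket (e.g. running `kinit`) or configuring a keytab before submitting the job.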

【Discussion】:

    Tags: python apache-spark pyspark


    【Solution 1】:

    I couldn't follow the logic behind the fn function, so I couldn't verify my output. Spark uses HashPartitioning by default. See this.
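
    To see the hash placement concretely, here is a minimal sketch (with an assumed local SparkContext; none of these names come from the answer) that uses glom() to show which partition each key lands in after partitionBy():

        from pyspark import SparkContext

        sc = SparkContext("local[2]", "hash-partition-demo")  # assumed local context
        pairs = sc.parallelize([(str(i), i) for i in range(1, 7)])
        # partitionBy hashes each key (portable_hash by default) modulo the
        # number of partitions to decide where it goes
        partitioned = pairs.partitionBy(2)
        print(partitioned.glom().collect())  # one inner list per partition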

    Please find below the code that produces the output shown for the following input.

    Input:

    1;1
    2;1
    3;1
    4;2
    5;2
    6;2
    

    Output:

    >>> sc = spark.sparkContext
    >>> 
    >>> rdd = sc.textFile("/Users/pavithranrao/Desktop/test.csv").map(lambda l:l.split(";"))
    >>> 
    >>> rdd_pair=rdd.map(lambda a: (a[0], a)).partitionBy(2).persist()
    >>> rdd_pair.count()
    6
    >>> from __future__ import print_function
    >>> rdd_pair.foreach(lambda x : print(x))
    (u'2', [u'2', u'1'])
    (u'4', [u'4', u'2'])
    (u'6', [u'6', u'2'])
    (u'1', [u'1', u'1'])
    (u'3', [u'3', u'1'])
    (u'5', [u'5', u'2'])
    >>> 
    >>> rdd_pair_comb = rdd_pair.combineByKey(lambda v:[v], lambda x, y : x+[y], lambda x ,y : x+y)
    >>> rdd_pair_comb.foreach(lambda x : print(x))
    (u'2', [[u'2', u'1']])
    (u'4', [[u'4', u'2']])
    (u'6', [[u'6', u'2']])
    (u'1', [[u'1', u'1']])
    (u'3', [[u'3', u'1']])
    (u'5', [[u'5', u'2']])
    >>> 
    >>> def fn(input_bag):
    ...     output=[]
    ...     loc0 = input_bag[0]
    ...     for loc in input_bag[1:]:
    ...         output.append((loc0[2],loc[2]))
    ...         loc0 = loc
    ...     return output
    ... 
    >>> data = rdd_pair_comb.map(lambda k: (k[0], fn(k[1]))).persist()
    >>> data.count()
    6
    >>> 
    >>> from __future__ import print_function
    >>> data.foreach(lambda x : print(x))
    (u'4', [])
    (u'6', [])
    (u'1', [])
    (u'3', [])
    (u'5', [])
    

    The empty lists in the final output are expected for this input: every key occurs exactly once, so combineByKey yields a single-element list per key, and fn's loop over input_bag[1:] therefore never runs.

    Hope this helps!

    【Comments】:
