【问题标题】:What is the equivalent to scala.util.Try in pyspark?pyspark 中的 scala.util.Try 等价于什么?
【发布时间】:2016-01-27 18:25:10
【问题描述】:

我有一个糟糕的 HTTPD access_log,只想跳过“糟糕”的行。

在 scala 中这很简单:

import scala.util.Try

val log = sc.textFile("access_log")

log.map(_.split(' ')).map(a => Try(a(8))).filter(_.isSuccess).map(_.get).map(code => (code,1)).reduceByKey(_ + _).collect()

对于 python,我通过使用“lambda”表示法显式定义一个函数来获得以下解决方案:

log = sc.textFile("access_log")

def wrapException(a):
    try:
        return a[8]
    except:
        return 'error'

log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect()

在 pyspark 中是否有更好的方法(例如在 Scala 中)?

非常感谢!

【问题讨论】:

    标签: python scala apache-spark pyspark


    【解决方案1】:

    更好是一个主观术语,但您可以尝试几种方法。

    • 在这种特殊情况下,您可以做的最简单的事情就是避免任何异常。您只需要flatMap 和一些切片:

      log.flatMap(lambda s : s.split(' ')[8:9])
      

      如您所见,这意味着不需要异常处理或后续的filter

    • 之前的想法可以用一个简单的包装器来扩展

      def seq_try(f, *args, **kwargs):
          try:
              return [f(*args, **kwargs)]
          except:
              return []
      

      和示例用法

      from operator import div # FYI operator provides getitem as well.
      
      rdd = sc.parallelize([1, 2, 0, 3, 0, 5, "foo"])
      
      rdd.flatMap(lambda x: seq_try(div, 1., x)).collect()
      ## [1.0, 0.5, 0.3333333333333333, 0.2]
      
    • 终于有了更多面向对象的方法:

      import inspect as _inspect
      
      class _Try(object): pass    
      
      class Failure(_Try):
          def __init__(self, e):
              if Exception not in _inspect.getmro(e.__class__):
                  msg = "Invalid type for Failure: {0}"
                  raise TypeError(msg.format(e.__class__))
              self._e = e
              self.isSuccess =  False
              self.isFailure = True
      
          def get(self): raise self._e
      
          def __repr__(self):
              return "Failure({0})".format(repr(self._e))
      
      class Success(_Try):
          def __init__(self, v):
              self._v = v
              self.isSuccess = True
              self.isFailure = False
      
          def get(self): return self._v
      
          def __repr__(self):
              return "Success({0})".format(repr(self._v))
      
      def Try(f, *args, **kwargs):
          try:
              return Success(f(*args, **kwargs))
          except Exception as e:
              return Failure(e)
      

      和示例用法:

      tries = rdd.map(lambda x: Try(div, 1.0, x))
      tries.collect()
      ## [Success(1.0),
      ##  Success(0.5),
      ##  Failure(ZeroDivisionError('float division by zero',)),
      ##  Success(0.3333333333333333),
      ##  Failure(ZeroDivisionError('float division by zero',)),
      ##  Success(0.2),
      ##  Failure(TypeError("unsupported operand type(s) for /: 'float' and 'str'",))]
      
      tries.filter(lambda x: x.isSuccess).map(lambda x: x.get()).collect()
      ## [1.0, 0.5, 0.3333333333333333, 0.2]
      

      你甚至可以使用multipledispatch的模式匹配

      from multipledispatch import dispatch
      from operator import getitem
      
      @dispatch(Success)
      def check(x): return "Another great success"
      
      @dispatch(Failure)
      def check(x): return "What a failure"
      
      a_list = [1, 2, 3]
      
      check(Try(getitem, a_list, 1))
      ## 'Another great success'
      
      check(Try(getitem, a_list, 10)) 
      ## 'What a failure'
      

      如果您喜欢这种方法,我已将更完整的实现推送到 GitHubpypi

    【讨论】:

      【解决方案2】:

      首先,让我生成一些随机数据以开始使用。

      import random
      number_of_rows = int(1e6)
      line_error = "error line"
      text = []
      for i in range(number_of_rows):
          choice = random.choice([1,2,3,4])
          if choice == 1:
              line = line_error
          elif choice == 2:
              line = "1 2 3 4 5 6 7 8 9_1"
          elif choice == 3:
              line = "1 2 3 4 5 6 7 8 9_2"
          elif choice == 4:
              line = "1 2 3 4 5 6 7 8 9_3"
          text.append(line)
      

      现在我有一个字符串text 看起来像

        1 2 3 4 5 6 7 8 9_2
        error line
        1 2 3 4 5 6 7 8 9_3
        1 2 3 4 5 6 7 8 9_2
        1 2 3 4 5 6 7 8 9_3
        1 2 3 4 5 6 7 8 9_1
        error line
        1 2 3 4 5 6 7 8 9_2
        ....
      

      你的解决方案:

      def wrapException(a):
          try:
              return a[8]
          except:
              return 'error'
      
      log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect()
      
      #[('9_3', 250885), ('9_1', 249307), ('9_2', 249772)]
      

      这是我的解决方案:

      from operator import add
      def myfunction(l):
          try:
              return (l.split(' ')[8],1)
          except: 
              return ('MYERROR', 1) 
      log.map(myfunction).reduceByKey(add).collect()
      #[('9_3', 250885), ('9_1', 249307), ('MYERROR', 250036), ('9_2', 249772)]
      

      评论:

      (1) 我强烈建议也计算带有“错误”的行,因为它不会增加太多开销,并且还可以用于完整性检查,例如,所有计数加起来应该是总数日志中的行,如果您过滤掉这些行,您不知道这些行是真正的坏行,或者您的编码逻辑出现问题。

      (2) 我会尽量将所有行级操作打包到一个函数中,避免mapfilter函数的链接,这样可读性更好。

      (3) 从性能的角度来看,我生成了一个 1M 记录的样本,我的代码在 3 秒内完成,你的代码在 2 秒内完成,这不是一个公平的比较,因为数据太小而且我的集群非常强大,我会建议您生成一个更大的文件(1e12?)并对您的文件进行基准测试。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-05-12
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多