【问题标题】:How to create a custom Transformer from a UDF?如何从 UDF 创建自定义 Transformer?
【发布时间】:2016-05-12 20:31:36
【问题描述】:

我正在尝试使用自定义阶段创建和保存 Pipeline。我需要使用UDFcolumn 添加到我的DataFrame。因此,我想知道是否可以将UDF 或类似操作转换为Transformer

我的自定义UDF 看起来像这样,我想学习如何使用UDF 作为自定义Transformer

def getFeatures(n: String) = {
    val NUMBER_FEATURES = 4  
    val name = n.split(" +")(0).toLowerCase
    ((1 to NUMBER_FEATURES)
         .filter(size => size <= name.length)
         .map(size => name.substring(name.length - size)))
} 

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))

【问题讨论】:

    标签: scala apache-spark apache-spark-sql user-defined-functions apache-spark-ml


    【解决方案1】:

    这不是一个功能齐全的解决方案,但您可以从以下内容开始:

    import org.apache.spark.ml.{UnaryTransformer}
    import org.apache.spark.ml.util.Identifiable
    import org.apache.spark.sql.types.{ArrayType, DataType, StringType}
    
    class NGramTokenizer(override val uid: String)
      extends UnaryTransformer[String, Seq[String], NGramTokenizer]  {
    
      def this() = this(Identifiable.randomUID("ngramtokenizer"))
    
      override protected def createTransformFunc: String => Seq[String] = {
        getFeatures _
      }
    
      override protected def validateInputType(inputType: DataType): Unit = {
        require(inputType == StringType)
      }
    
      override protected def outputDataType: DataType = {
        new ArrayType(StringType, true)
      }
    }
    

    快速检查:

    val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
    val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")
    
    transformer.transform(df).show
    // +---+------+------------------+
    // |  k|     v|                vs|
    // +---+------+------------------+
    // |  1|abcdef|[f, ef, def, cdef]|
    // |  2|foobar|[r, ar, bar, obar]|
    // +---+------+------------------+
    

    您甚至可以尝试将其概括为以下内容:

    import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
    import scala.reflect.runtime.universe._
    
    class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
      override val uid: String,
      f: T => U
    ) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]]  {
    
      override protected def createTransformFunc: T => U = f
    
      override protected def validateInputType(inputType: DataType): Unit = 
        require(inputType == schemaFor[T].dataType)
    
      override protected def outputDataType: DataType = schemaFor[U].dataType
    }
    
    val transformer = new UnaryUDFTransformer("featurize", getFeatures)
      .setInputCol("v")
      .setOutputCol("vs")
    

    如果您想使用 UDF 而不是包装函数,则必须直接扩展 Transformer 并覆盖 transform 方法。不幸的是,大多数有用的类都是私有的,所以它可能相当棘手。

    您也可以注册 UDF:

    spark.udf.register("getFeatures", getFeatures _)
    

    并使用SQLTransformer

    import org.apache.spark.ml.feature.SQLTransformer
    
    val transformer = new SQLTransformer()
      .setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")
    
    transformer.transform(df).show
    // +---+------+------------------+
    // |  k|     v|                vs|
    // +---+------+------------------+
    // |  1|abcdef|[f, ef, def, cdef]|
    // |  2|foobar|[r, ar, bar, obar]|
    // +---+------+------------------+
    

    【讨论】:

    • 我试图保存我的模型,但它说Message: Pipeline write will fail on this Pipeline because it contains a stage which does not implement Writable. Non-Writable stage: ngramtokenizer_f784079e2124 of type class我必须实现 Writable 接口吗?
    • 这是我之前提到的不好的部分。据我所知,最好的方法是实现 DefaultParamsWritableDefaultParamsReadable 但如果不将至少一部分代码放入 ML 包中,这是不可能的。不过你可以试试MLWritable / MLReadable
    【解决方案2】:

    我最初尝试扩展 TransformerUnaryTransformer 抽象,但遇到了我的应用程序无法访问 DefaultParamsWriteable 的问题。作为可能与您的问题相关的示例,我创建了一个简单的术语规范化器作为UDF 跟随 this example。我的目标是将术语与模式和集合进行匹配,以用通用术语替换它们。例如:

    "\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b".r -> "emailaddr"
    

    这是课

    import scala.util.matching.Regex
    
    class TermNormalizer(normMap: Map[Any, String]) {
      val normalizationMap = normMap
    
      def normalizeTerms(terms: Seq[String]): Seq[String] = {
        var termsUpdated = terms
        for ((term, idx) <- termsUpdated.view.zipWithIndex) {
          for (normalizer <- normalizationMap.keys: Iterable[Any]) {
            normalizer match {
              case (regex: Regex) =>
                if (!regex.findFirstIn(term).isEmpty) termsUpdated = 
                  termsUpdated.updated(idx, normalizationMap(regex))
              case (set: Set[String]) =>
                if (set.contains(term)) termsUpdated = 
                  termsUpdated.updated(idx, normalizationMap(set))
            }
          }
        }
        termsUpdated
      }
    }
    

    我是这样使用的:

    val testMap: Map[Any, String] = Map("hadoop".r -> "elephant",
      "spark".r -> "sparky", "cool".r -> "neat", 
      Set("123", "456") -> "set1",
      Set("789", "10") -> "set2")
    
    val testTermNormalizer = new TermNormalizer(testMap)
    val termNormalizerUdf = udf(testTermNormalizer.normalizeTerms(_: Seq[String]))
    
    val trainingTest = sqlContext.createDataFrame(Seq(
      (0L, "spark is cool 123", 1.0),
      (1L, "adsjkfadfk akjdsfhad 456", 0.0),
      (2L, "spark rocks my socks 789 10", 1.0),
      (3L, "hadoop is cool 10", 0.0)
    )).toDF("id", "text", "label")
    
    val testTokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    
    val tokenizedTrainingTest = testTokenizer.transform(trainingTest)
    println(tokenizedTrainingTest
      .select($"id", $"text", $"words", termNormalizerUdf($"words"), $"label").show(false))
    

    现在我仔细阅读了这个问题,听起来你在问如何避免这样做,哈哈。无论如何,我仍然会发布它,以防将来有人正在寻找一种简单的方法来应用类似转换器的功能

    【讨论】:

      【解决方案3】:

      如果您希望使转换器也可写,那么您可以在您选择的公共包中的 sharedParams 库中重新实现 HasInputCol 等特征,然后将它们与 DefaultParamsWritable 特征一起使用以使转换器具有持久性。

      通过这种方式,您还可以避免将部分代码放在 spark core ml 包中,但您需要在自己的包中维护一组并行参数。这不是一个真正的问题,因为它们几乎不会改变。

      但请务必在他们的 JIRA 板 here 中跟踪错误,该错误要求将一些常见的 sharedParams 公开而不是对 ml 私有,以便人们可以直接使用来自外部课程的那些。

      【讨论】:

        猜你喜欢
        • 2015-11-26
        • 2017-11-09
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-05-09
        • 1970-01-01
        • 2018-04-09
        • 1970-01-01
        相关资源
        最近更新 更多