【Question Title】: Create a custom Transformer in PySpark ML
【Posted】: 2015-11-26 16:53:53
【Question】:

I am new to Spark SQL DataFrames and ML on them (PySpark). How can I create a custom tokenizer, which for example removes stop words and uses some libraries from nltk? Can I extend the default one?

【Question Comments】:

    Tags: nltk python apache-spark pyspark apache-spark-ml


    【Solution 1】:

    Can I extend the default one?

    Not really. The default Tokenizer is a subclass of pyspark.ml.wrapper.JavaTransformer and, like the other transformers and estimators in pyspark.ml.feature, delegates the actual processing to its Scala counterpart. Since you want to use Python, you have to extend pyspark.ml.pipeline.Transformer directly.
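
    You can verify the delegation from a PySpark shell:

    from pyspark.ml.feature import Tokenizer
    from pyspark.ml.wrapper import JavaTransformer

    # The built-in Tokenizer wraps a JVM object, so subclassing it from
    # Python would not let you override the actual tokenization logic.
    print(issubclass(Tokenizer, JavaTransformer))  # True

    A pure Python implementation looks like this: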

    import nltk
    
    from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
    from pyspark.ml import Transformer
    from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
    # Available in PySpark >= 2.3.0 
    from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, StringType
    
    class NLTKWordPunctTokenizer(
            Transformer, HasInputCol, HasOutputCol,
            # Credits https://stackoverflow.com/a/52467470
            # by https://stackoverflow.com/users/234944/benjamin-manns
            DefaultParamsReadable, DefaultParamsWritable):
    
        stopwords = Param(Params._dummy(), "stopwords", "stopwords",
                          typeConverter=TypeConverters.toListString)
    
    
        @keyword_only
        def __init__(self, inputCol=None, outputCol=None, stopwords=None):
            super(NLTKWordPunctTokenizer, self).__init__()
            # super().__init__() copies the class-level Params (with their
            # type converters) onto the instance, so there is no need to
            # re-create self.stopwords here.
            self._setDefault(stopwords=[])
            # @keyword_only captures the passed keyword arguments in
            # self._input_kwargs
            kwargs = self._input_kwargs
            self.setParams(**kwargs)
    
        @keyword_only
        def setParams(self, inputCol=None, outputCol=None, stopwords=None):
            kwargs = self._input_kwargs
            return self._set(**kwargs)
    
        def setStopwords(self, value):
            return self._set(stopwords=list(value))
    
        def getStopwords(self):
            return self.getOrDefault(self.stopwords)
    
        # Required in Spark >= 3.0
        def setInputCol(self, value):
            """
            Sets the value of :py:attr:`inputCol`.
            """
            return self._set(inputCol=value)
    
        # Required in Spark >= 3.0
        def setOutputCol(self, value):
            """
            Sets the value of :py:attr:`outputCol`.
            """
            return self._set(outputCol=value)
    
        def _transform(self, dataset):
            stopwords = set(self.getStopwords())
    
            def f(s):
                tokens = nltk.tokenize.wordpunct_tokenize(s)
                return [t for t in tokens if t.lower() not in stopwords]
    
            t = ArrayType(StringType())
            out_col = self.getOutputCol()
            in_col = dataset[self.getInputCol()]
            return dataset.withColumn(out_col, udf(f, t)(in_col))
    

    Example usage (data taken from ML - Features):

    # Assumes an active SparkSession bound to `spark`
    sentenceDataFrame = spark.createDataFrame([
      (0, "Hi I heard about Spark"),
      (0, "I wish Java could use case classes"),
      (1, "Logistic regression models are neat")
    ], ["label", "sentence"])
    
    # The NLTK stopword corpus must be available locally
    # (run nltk.download('stopwords') once beforehand).
    tokenizer = NLTKWordPunctTokenizer(
        inputCol="sentence", outputCol="words",
        stopwords=nltk.corpus.stopwords.words('english'))
    
    tokenizer.transform(sentenceDataFrame).show()
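
    Thanks to the DefaultParamsReadable / DefaultParamsWritable mixins (PySpark >= 2.3.0), the transformer can also be persisted like a built-in stage. A minimal sketch, using a placeholder path; note that the class has to be importable under the same module path when it is loaded back:

    path = "/tmp/nltk_tokenizer"  # placeholder path for illustration
    tokenizer.write().overwrite().save(path)
    restored = NLTKWordPunctTokenizer.load(path)
    restored.transform(sentenceDataFrame).show()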
    

    For a custom Python Estimator, see How to Roll a Custom Estimator in PySpark mllib.

    ⚠ This answer depends on internal API and is compatible with Spark 2.0.3, 2.1.1, 2.2.0 or later (SPARK-19348). For code compatible with earlier Spark versions, see revision 8.

    【Comments】:

    • @zero323 I think I already did that when answering this SO question: stackoverflow.com/questions/41399399/…
    • Has the API been updated? It seems cumbersome to have to do all the ._set stuff rather than just writing init and _transform?