【Question Title】: ML DecisionTreeClassifier - Continuous Features
【Posted】: 2018-01-04 21:09:19
【Question】:

How can I tell ml.DecisionTreeClassifier to score on continuous features rather than categorical features, without having to use the Bucketizer or QuantileDiscretizer approach?

Below is my code, which passes continuous features into ML's DecisionTreeClassifier without binning them (Bucketizer). Most of the scoring set ends up skipped rather than scored (Spark 2.1 does not support handleInvalid="keep").

from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.types import StringType, DoubleType 
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import udf

# Load the training set that is in parquet format into a data frame
train_df = sqlContext.read.parquet("/data/training_set")

# convert data types to double
# (withColumn returns a new DataFrame, so the result must be reassigned)
train_df = train_df.withColumn("income", train_df["income"].cast(DoubleType()))
train_df = train_df.withColumn("age", train_df["age"].cast(DoubleType()))

# StringIndexer - Target
# First we use a StringIndexer to convert the string target into numeric label indices
indexer1 = StringIndexer(inputCol="target", outputCol="target_numeric", handleInvalid="skip")

############
# StringIndexer/OneHotEncoder - income
# First we use a StringIndexer to index the categorical feature numerically
indexer2 = StringIndexer(inputCol="income", outputCol="income_numeric", handleInvalid='skip')

# Next we one-hot encode the indexed feature into a binary vector
encoder2 = OneHotEncoder(inputCol="income_numeric", outputCol="income_vector")
############

############
# StringIndexer/OneHotEncoder - age
# First we use a StringIndexer to index the categorical feature numerically
indexer3 = StringIndexer(inputCol="age", outputCol="age_numeric", handleInvalid='skip')

# Next we one-hot encode the indexed feature into a binary vector
encoder3 = OneHotEncoder(inputCol="age_numeric", outputCol="age_vector")
############

# Collect the encoded feature columns to be assembled into the feature vector
indexedcols = [
    "income_vector",
    "age_vector"
]

# FEATURES need to be in a single Vector column, which is why we use a VectorAssembler.
# The VectorAssembler takes the encoded columns above as inputCols and produces
# the "features" output column used by the classifier.
va = VectorAssembler(inputCols=indexedcols, outputCol="features")

# Create a DecisionTreeClassifier, setting the label column to the
# indexed label column ("target_numeric") and the features column to the
# "features" column created by the VectorAssembler above.
# The StringIndexers, OneHotEncoders, VectorAssembler, and
# DecisionTreeClassifier are then stored in a list called "steps".
clf = DecisionTreeClassifier(labelCol="target_numeric", impurity="gini", maxBins=32, maxMemoryInMB=1024)

#  Create steps for transform for the ml pipeline
steps = [indexer1, 
        indexer2, encoder2, 
        indexer3, encoder3,
        va, clf]

# Create a ML pipeline named "pl" using the steps list to set the stages parameter
pl = Pipeline(stages=steps)

# Run the fit method of the pipeline on the DataFrame
# model in a new variable called "plmodel"
plmodel = pl.fit(train_df)

######################################################################################
# Scoring Set
######################################################################################

# Now get the data you want to run the model against 
scoring_df = sqlContext.read.parquet("/data/scoring_set")

# convert data types to double
scoring_df = scoring_df.withColumn("income", scoring_df["income"].cast(DoubleType()))
scoring_df = scoring_df.withColumn("age", scoring_df["age"].cast(DoubleType()))

# Run the transform method of the pipeline model created above
# on the "scoring_df" DataFrame to create a new DataFrame called "predictions".
#
# handleInvalid='skip' drops any labels not seen in the training set. Without it,
# errors such as "unseen label: 40" are raised, meaning the scoring set contains
# a value for that feature which never appeared in the training set.
predictions = plmodel.transform(scoring_df)

# UDFs to extract the class probabilities from the probability vector
vector_udf1 = udf(lambda vector: float(vector[1]), DoubleType())
vector_udf0 = udf(lambda vector: float(vector[0]), DoubleType())

# Save dataframe to hdfs
outputDF = predictions.select("age",
                              "income",
                              "prediction",
                              vector_udf0("probability").alias("probability0"),
                              vector_udf1("probability").alias("probability1"))
outputDF.write.format("parquet").mode("overwrite").save("/data/algo_scored")

【Question Comments】:

    Tags: machine-learning pyspark decision-tree


    【Solution 1】:

    For continuous features you do not need a Bucketizer or QuantileDiscretizer. For categorical features you can use StringIndexer and OneHotEncoder and include them in the pipeline, but for continuous features you simply pass the raw columns to the VectorAssembler; the DecisionTreeClassifier will treat them as continuous and choose split thresholds automatically.
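    To see why no bucketing is needed: a decision tree handles a continuous feature by searching for a numeric threshold to split on, not by enumerating categories. The sketch below is plain Python (no Spark required), with made-up data and function names, illustrating the threshold search using Gini impurity:

```python
def gini(labels):
    """Gini impurity of a list of 0/1 labels."""
    if not labels:
        return 0.0
    p = sum(labels) / len(labels)
    return 2.0 * p * (1.0 - p)

def best_threshold(values, labels):
    """Return the split threshold on a continuous feature that minimizes
    the weighted Gini impurity of the two resulting partitions."""
    pairs = sorted(zip(values, labels))
    n = len(pairs)
    best_score, best_thr = float("inf"), None
    for i in range(1, n):
        left = [lab for _, lab in pairs[:i]]
        right = [lab for _, lab in pairs[i:]]
        score = (len(left) * gini(left) + len(right) * gini(right)) / n
        if score < best_score:
            # Candidate threshold: midpoint between adjacent sorted values
            best_score = score
            best_thr = (pairs[i - 1][0] + pairs[i][0]) / 2.0
    return best_thr

ages = [18.0, 22.0, 25.0, 40.0, 52.0, 60.0]
bought = [0, 0, 0, 1, 1, 1]
print(best_threshold(ages, bought))  # -> 32.5, a perfect split between 25 and 40
```

    Spark's DecisionTreeClassifier does essentially this internally for every feature it treats as continuous (evaluating at most maxBins candidate thresholds per feature), which is why raw double columns can go straight into the VectorAssembler.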

    So the code would look like:

    from pyspark.mllib.linalg import Vectors
    from pyspark.ml import Pipeline
    from pyspark.sql import Row, SparkSession, SQLContext
    from pyspark.sql.types import StringType, DoubleType 
    from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import udf
    
    # Load the training set that is in parquet format into a data frame
    train_df = sqlContext.read.parquet("/data/training_set")
    
    # convert data types to double
    # (withColumn returns a new DataFrame, so the result must be reassigned)
    train_df = train_df.withColumn("income", train_df["income"].cast(DoubleType()))
    train_df = train_df.withColumn("age", train_df["age"].cast(DoubleType()))
    
    # StringIndexer - Target
    # First we use a StringIndexer to convert the string target into numeric label indices
    indexer1 = StringIndexer(inputCol="target", outputCol="target_numeric", handleInvalid="skip")
    
    # List the raw continuous feature columns; no StringIndexer or
    # OneHotEncoder is needed for these
    indexedcols = [
        "income",
        "age"
    ]

    # FEATURES need to be in a single Vector column, which is why we use a
    # VectorAssembler. It takes the continuous columns as inputCols and
    # produces the "features" output column used by the classifier.
    va = VectorAssembler(inputCols=indexedcols, outputCol="features")
    
    # Create a DecisionTreeClassifier, setting the label column to the
    # indexed label column ("target_numeric") and the features column to the
    # "features" column created by the VectorAssembler above.
    # The StringIndexer, VectorAssembler, and DecisionTreeClassifier
    # are then stored in a list called "steps".
    clf = DecisionTreeClassifier(labelCol="target_numeric", impurity="gini", maxBins=32, maxMemoryInMB=1024)
    
    #  Create steps for transform for the ml pipeline
    steps = [indexer1, 
            va, clf]
    
    # Create a ML pipeline named "pl" using the steps list to set the stages parameter
    pl = Pipeline(stages=steps)
    
    # Run the fit method of the pipeline on the DataFrame
    # model in a new variable called "plmodel"
    plmodel = pl.fit(train_df)
    
    ######################################################################################
    # Scoring Set
    ######################################################################################
    
    # Now get the data you want to run the model against 
    scoring_df = sqlContext.read.parquet("/data/scoring_set")
    
    # convert data types to double
    scoring_df = scoring_df.withColumn("income", scoring_df["income"].cast(DoubleType()))
    scoring_df = scoring_df.withColumn("age", scoring_df["age"].cast(DoubleType()))
    
    # Run the transform method of the pipeline model created above
    # on the "scoring_df" DataFrame to create a new DataFrame called "predictions".
    #
    # handleInvalid='skip' drops any labels not seen in the training set. Without it,
    # errors such as "unseen label: 40" are raised, meaning the scoring set contains
    # a value for that feature which never appeared in the training set.
    predictions = plmodel.transform(scoring_df)
    
    # UDFs to extract the class probabilities from the probability vector
    vector_udf1 = udf(lambda vector: float(vector[1]), DoubleType())
    vector_udf0 = udf(lambda vector: float(vector[0]), DoubleType())
    
    # Save dataframe to hdfs
    outputDF = predictions.select("age",
                                  "income",
                                  "prediction",
                                  vector_udf0("probability").alias("probability0"),
                                  vector_udf1("probability").alias("probability1"))
    outputDF.write.format("parquet").mode("overwrite").save("/data/algo_scored")
    

    【Discussion】:
