【Question Title】: multiprocessing hangs xgboost scoring
【Posted】: 2021-10-14 07:03:57
【Description】:

I am trying to score multiple datasets concurrently using multiprocessing. The following code hangs when run, but when I call score on base_model outside the pool, it executes immediately.

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

titanic = pd.read_csv('titanic.csv')
titanic['Age'].fillna(titanic['Age'].mean(), inplace=True) #fill all missing values with the average
#create indicator for cabin because there is a reason it was missing
titanic['Cabin_ind'] = np.where(titanic['Cabin'].isnull(), 0, 1)
#convert sex to numeric
gender_num = {'male': 0, 'female': 1}
titanic['Sex'] = titanic['Sex'].map(gender_num)
#drop unnecessary variables
titanic.drop(columns=['Cabin', 'Embarked', 'Name', 'Ticket'], inplace=True)
features = titanic.drop(columns=['Survived'])
labels = titanic['Survived']
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=.2, random_state=42)

from xgboost import XGBClassifier

base_model = XGBClassifier(objective='binary:logistic', nthread=4, seed=27, scoring='auc')
base_model.fit(X_train, y_train)

import multiprocessing as mp

def my_func(X_test, y_test, base_model):
    val = base_model.score(X_test, y_test)
    print(val)

def main():
    to_pass = [(X_test, y_test, base_model)]
    pool = mp.Pool(1)
    pool.starmap(my_func, to_pass)


if __name__ == "__main__":
    main()

【Discussion】:

    Tags: python python-3.x scikit-learn multiprocessing xgboost


    【Solution 1】:

    The problem with XGBoost and multiprocessing is that when you hand an XGBoost model to a worker process, you don't know what it is doing behind the scenes. This is common not only with XGBoost but with other libraries as well; I have found that many of them hang when you use separate processes to speed up the work.

    I tend to save the model, then create a new XGBClassifier inside the worker process and load the model there. It is safer, and I can be more confident nothing strange is happening.

    Here is an example:

    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    from xgboost import XGBClassifier
    from concurrent.futures import ProcessPoolExecutor, as_completed
    from sklearn.metrics import accuracy_score
    
    
    def get_data():
        titanic = pd.read_csv('titanic.csv', usecols=['Age', 'Cabin', 'Sex', 'Survived'])
        # fill all missing values with the average
        titanic['Age'] = titanic['Age'].fillna(titanic['Age'].mean())
        # create indicator for cabin because there is a reason it was missing
        titanic['Cabin_ind'] = np.where(titanic['Cabin'].isna(), 0, 1)
    
        # convert sex to numeric
        titanic['Sex'] = np.where(titanic['Sex'] == 'male', 0, 1)
    
        # convert Survived to numeric
        titanic['Survived'] = np.where(titanic['Survived'] == 'yes', 1, 0)
    
        features = titanic[['Age', 'Sex', 'Cabin_ind']]
        labels = titanic['Survived']
        X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)
        return X_train, X_test, y_train, y_test
    
    
    def train_model(X_train, y_train):
        print('Training model')
        base_model = XGBClassifier(objective='binary:logistic', nthread=4, seed=27, scoring='auc', use_label_encoder=False)
        base_model.fit(X_train, y_train)
        base_model.save_model('model.json')
    
    
    def score_model(i, X_test, y_test):
        print('{}: Scoring model'.format(i))
        base_model = XGBClassifier(objective='binary:logistic', nthread=4, seed=27, scoring='auc', use_label_encoder=False)
        base_model.load_model('model.json')
        score = base_model.score(X_test, y_test)
        y_pred = base_model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        return i, score, accuracy
    
    
    def main():
        X_train, X_test, y_train, y_test = get_data()
        with ProcessPoolExecutor(max_workers=1) as executor:
            task = executor.submit(train_model, X_train, y_train)
            task.result()
    
        tasks = []
        with ProcessPoolExecutor(max_workers=1) as executor:
            # You can split your test here, I am passing it as a whole
            for i in range(10):
                # I also use i in function to identify result later in case of out of order completion
                task = executor.submit(score_model, i, X_test, y_test)
                tasks.append(task)
    
        for task in as_completed(tasks):
            i, score, accuracy = task.result()
            print('Task {} completed: score={}, accuracy={}'.format(i, score, accuracy))
    
    
    if __name__ == '__main__':
        main()
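The "split your test here" comment above can be sketched as follows. This is a minimal illustration with made-up stand-in data (the chunk count of 3 is arbitrary); each (X_chunk, y_chunk) pair would then be submitted as its own score_model task:

```python
import numpy as np
import pandas as pd

# Made-up stand-ins for the real X_test / y_test.
X_test = pd.DataFrame({'Age': range(10), 'Sex': [0, 1] * 5})
y_test = pd.Series([0, 1] * 5)

# Split the test data into roughly equal chunks; each
# (X_chunk, y_chunk) pair could be one score_model submission.
n_chunks = 3
size = int(np.ceil(len(X_test) / n_chunks))
X_chunks = [X_test.iloc[i * size:(i + 1) * size] for i in range(n_chunks)]
y_chunks = [y_test.iloc[i * size:(i + 1) * size] for i in range(n_chunks)]

print([len(c) for c in X_chunks])  # chunk sizes
```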
    
    

    Edit:

    1. As discussed here, it is recommended to initialize xgboost after forking, so I train XGBoost in a separate process to avoid the lock-up.
    2. When you use XGBoost, it is best to force use_label_encoder=False. The reason is that XGBoost, at least on my machine, complains:
    UserWarning: The use of label encoder in XGBClassifier is
    deprecated and will be removed in a future release.
    
    To remove this warning, do the following:
    1) Pass option use_label_encoder=False when constructing XGBClassifier object; and
    2) Encode your labels (y) as integers starting with 0, i.e. 0, 1, 2, ..., [num_class - 1].
    
    3. Try to avoid inplace in pandas, as discussed here.
    4. Use max_workers=some_cpu_number_that_makes_sense to use multiple CPUs.
    5. I train a lot of things with XGBoost, and in my experience you don't really need multiprocessing; it is quite fast even with a 2 GB dataset, unless you have a really large dataset...
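To illustrate the inplace point: assigning the result back returns a new object and avoids pandas' chained-assignment pitfalls. A minimal sketch with made-up data:

```python
import numpy as np
import pandas as pd

# Made-up miniature of the titanic data.
df = pd.DataFrame({'Age': [19.0, None, 25.0], 'Cabin': [None, '1', None]})

# Instead of df['Age'].fillna(..., inplace=True), assign the result back.
df['Age'] = df['Age'].fillna(df['Age'].mean())
df['Cabin_ind'] = np.where(df['Cabin'].isna(), 0, 1)
```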

    The csv I used:

    Age,Cabin,Sex,Survived,Embarked,Name,Ticket
    ,,male,yes,,,
    19,,male,no,,,
    20,1,female,yes,,,
    25,2,male,no,,,
    ,3,female,yes,,,
    40,3,male,yes,,,
    ,3,female,yes,,,
    ,,female,yes,,,
    ,3,male,no,,,
    15,3,female,yes,,,
    

    【Comments】:

    • I get a strange error - AttributeError: 'XGBClassifier' object has no attribute '_le' when trying to score the test data. I haven't seen much about this online; any ideas? Thanks for your help!
    • Could you provide some sample csv (as small as possible) and the xgboost version you are using (and which OS)?
    • I updated my answer; I tested it on my machine and it works. Make sure you don't use the label encoder. The issue comes from label-encoding the labels.