【问题标题】:Run Identical model on multiple GPUs, but send different user data to each GPU在多个 GPU 上运行相同的模型,但向每个 GPU 发送不同的用户数据
【发布时间】:2018-06-17 04:06:21
【问题描述】:

任何人都在高效的数据并行方面取得了成功,您将相同的模型定义发送到多个 GPU,但将不同的用户数据发送到每个 GPU?

dist-keras 看起来很有希望。但我很想听听有关按照这些思路采取的任何方法的反馈。

我们有用户行为数据:10 万用户、200 个字段(单热向量)、每个用户 30,000 条记录。我们在 Tensorflow 上使用 Keras 构建了一个 RNN,以预测仅为 1 个用户采取的下一个动作(20 多个可能的动作)。在 1 个 GPU 上训练大约需要 30 分钟。 (我的盒子有 8 个 GPU)。现在,我们想为所有 10 万用户构建模型。

我们能够使用多 GPU 方法对单用户数据执行数据并行处理。

但是由于该模型每个用户需要 30 分钟,并且有 10 万个用户,我们希望按用户对数据进行分区,并使用集群以分布式方式为每个用户数据运行相同的模型,并为该用户生成模型输出.

我目前正在使用 Keras 2.1.x 和 TensorFlow 1.4。

【问题讨论】:

标签: python tensorflow pyspark keras distributed


【解决方案1】:

这与您所描述的不完全一样,但是,可能可行的方法是获取每个批次的切片,并通过获取模型并构建一个单独的模型来分别在不同的 GPU 上对其进行训练,从而自动执行此操作。

假设我们想让模型并行化,然后在训练期间在硬件之间拆分其批次。

def make_parallel(model, gpu_count):
    """
    make a paralellized model from the input model on the
    given gpu count that splits the input batch amongst the 
    hardware.

    :param model: The model you want to make parallel
    :param gpu_count: The gpu count
    :return: The parellelized model
    """
    def get_slice(data, idx, parts): # take a slice of the batch
        shape = tf.shape(data)
        size = tf.concat([shape[:1] // parts, shape[1:]], axis=0)
        stride = tf.concat([shape[:1] // parts, shape[1:] * 0], axis=0)
        start = stride * idx
        return tf.slice(data, start, size)

    outputs_all = [[] for i in range(len(model.outputs))]

    # Place a copy of the model on each GPU, each getting a slice of the batch
    for i in range(gpu_count):
        with tf.device('/gpu:%d' % i):
            with tf.name_scope('tower_%d' % i) as scope:
                inputs = []
                for x in model.inputs:
                    input_shape = tuple(x.get_shape().as_list())[1:]
                    slice_n = Lambda(get_slice, output_shape=input_shape, arguments={'idx': i, 'parts': gpu_count})(x)
                    inputs.append(slice_n)

                outputs = model(inputs)

                if not isinstance(outputs, list):
                    outputs = [outputs]

                # Save all outputs to be joined at a later date
                for l in range(len(outputs)):
                    outputs_all[l].append(outputs[l])

    # merge outputs on CPU
    with tf.device('/cpu:0'):
        merged = [merge(output, mode='concat', concat_axis=0) for output in outputs_all]
        return Model(input=model.inputs, output=merged)

你能在这个模型上训练时报告速度结果吗?

【讨论】:

    猜你喜欢
    • 2013-11-16
    • 1970-01-01
    • 2018-12-02
    • 1970-01-01
    • 1970-01-01
    • 2020-11-01
    • 2021-04-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多