【问题标题】:Tensorflow Federated : Why my iterative Process unable to train roundsTensorflow Federated:为什么我的迭代过程无法训练回合
【发布时间】:2020-02-10 15:34:03
【问题描述】:

我用我自己的数据集中的 TFF 编写了一个代码,所有代码都可以正常运行,除了 这一行

在 train_data 中,我制作了 4 个数据集,加载了 tf.data.Dataset,它们的类型为“DatasetV1Adapter”

def client_data(n):
  ds = source.create_tf_dataset_for_client(source.client_ids[n])
  return ds.repeat(10).map(map_fn).shuffle(500).batch(20)

federated_train_data = [client_data(n) for n in range(4)]

batch = tf.nest.map_structure(lambda x: x.numpy(), iter(train_data[0]).next())

def model_fn():
  model = tf.keras.models.Sequential([
    .........
  return tff.learning.from_compiled_keras_model(model, batch)   

所有这些都运行正常,我得到了教练和状态:

trainer = tff.learning.build_federated_averaging_process(model_fn)

除了,当我想用​​这段代码开始训练和循环时:

state, metrics = iterative_process.next(state, federated_train_data) 
print('round  1, metrics={}'.format(metrics))

我不能。错误来了!那么,错误可能来自哪里?从数据集的类型?还是我让数据联合的方式?

【问题讨论】:

  • 能否将问题扩展到包含所看到的确切错误消息?从上面的代码来看,变量名称似乎不同:创建了一个 train_data,然后请求了一个 federated_train_data
  • 感谢您的评论,我纠正了注意力不集中的错误。但我的问题是,在执行最后一行(第一轮)时,内核需要很长时间(运行),但随后它崩溃而没有显示任何内容。
  • 听起来您的机器可能内存不足?很难说没有更多信息,例如输入数据集是什么样的,代码是在 CPU 上运行还是在 GPU 上运行?尝试减少内存占用的几件事:仅在单个客户端上运行进行测试(在构建 federated_train_data 时将 range(4) 更改为 range(1)),减少 .shuffle() 缓冲区的大小。在数据输入管道上在.map() 之前调用.batch(),并使用num_parallel_calls 参数,可以大大加快数据读取速度。 tensorflow.org/guide/data_performance 是一个很好的向导。
  • @ZacharyGarrett 请看答案,我做代码

标签: tensorflow2.0 tensorflow-federated


【解决方案1】:

正如上面的 cmets 所验证的,在 client_data 函数中添加对某个有限整数 Ntake(N) 调用应该可以解决这个问题。问题是 TFF 将减少 您传递给它的数据集中的所有元素。如果你有一个无限的数据集,这意味着“永远继续运行减少”。 N 这里应该代表“单个客户拥有多少数据”,并且可以是您选择的任何内容。

【讨论】:

  • 非常感谢,这是对我有帮助的一点,但是我的数据集不平衡,这意味着并非所有客户端都有相同数量的数据(每个有 700 张图像,其他有 200 张图像.. .) 所以在这种情况下我该如何修复N,因为Take(N) 应用于函数中的每个条目
【解决方案2】:

这是我的代码,我使用 Tensorflow v2.1.0 和 tff 0.12.0

img_height = 200
img_width = 200
num_classes = 2
batch_size = 10

input_shape = (img_height, img_width, 3)

img_gen = tf.keras.preprocessing.image.ImageDataGenerator()
gen0 = img_gen.flow_from_directory(par1_train_data_dir,(200, 200),'rgb', batch_size=10)
ds_par1 = tf.data.Dataset.from_generator(gen
    output_types=(tf.float32, tf.float32),
    output_shapes=([None,img_height,img_width,3], [None,num_classes])
)
ds_par2 = tf.data.Dataset.from_generator(gen0 
    output_types=(tf.float32, tf.float32),
    output_shapes=([None,img_height,img_width,3], [None,num_classes])
)

dataset_dict={}
dataset_dict['1'] = ds_par1
dataset_dict['2'] = ds_par2

def create_tf_dataset_for_client_fn(client_id):
    return dataset_dict[client_id]

source = tff.simulation.ClientData.from_clients_and_fn(['1','2'],create_tf_dataset_for_client_fn)

def client_data(n):
  ds = source.create_tf_dataset_for_client(source.client_ids[n])
  return ds


train_data = [client_data(n) for n in range(1)]

images, labels = next(img_gen.flow_from_directory(par1_train_data_dir,batch_size=batch_size,target_size=(img_height,img_width)))
sample_batch = (images,labels)

def create_compiled_keras_model():
  .....

def model_fn():
    keras_model = create_compiled_keras_model()
    return tff.learning.from_compiled_keras_model(keras_model, sample_batch)

iterative_process = tff.learning.build_federated_averaging_process(model_fn)
state = iterative_process.initialize()


state, metrics = iterative_process.next(state, train_data)
print('round 1, metrics={}'.format(round_num, metrics))

【讨论】:

  • 您可以尝试将take(100) 添加到您的client_data 函数吗?即,return ds.take(100) 而不仅仅是return ds?我想知道这是否是在默默地创建一个无限数据集。
  • take(100) 是什么意思?是batch_size吗?如果是,我可以从生成器中删除 batch_size 并在这里制作?
  • 非常感谢,这是对我有帮助的一点,但是我的数据集不平衡,这意味着并非所有客户端都有相同数量的数据(每个有 700 张图像,其他有 200 张图像.. .) 所以在这种情况下我该如何修复 N,因为 Take(N) 应用于函数中的每个条目
猜你喜欢
  • 2021-03-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-10-03
  • 1970-01-01
  • 2019-06-05
  • 1970-01-01
  • 2015-11-21
相关资源
最近更新 更多