【问题标题】:Loading dataframe from jupyter to dask-mpi on NERSC cluster将数据帧从 jupyter 加载到 NERSC 集群上的 dask-mpi
【发布时间】:2020-09-22 22:44:52
【问题描述】:

我正在尝试使用 NERSC 上的 dask-mpi 客户端将一些相对较大(~15GB)的 VTK 文件加载到 dask 数据帧中。但是,由于大部分工作在 NERSC 节点上以并行方式进行,因此我很难有效地实现这一点。

这里的基本结构遵循 Rollin Thomas 关于在 NERSC 上使用 dask 的笔记本 (https://gitlab.com/NERSC/nersc-notebooks/-/tree/master/dask)。所以我:

  1. 在 NERSC 上启动 Jupyter 笔记本
  2. 访问交互式队列上的节点,定义调度程序文件
  3. 启动客户端访问相同的调度程序文件。此时我可以启动客户端仪表板并观察流程
  4. 在 Jupyter 节点上读取 VTK 文件,并将其放入 pandas 数据帧 - 这工作正常,在 Jupyter 内核上需要约 15GB 内存

从这里,我想将 pandas 数据框中的信息加载到服务器上的 dask 客户端,但我还没有找到一个干净的方法来这样做。我尝试了几件事:

  1. 使用 from_pandas() 将 pandas 数据帧转换为 dask 数据帧。在这种情况下,Jupyter 内核上似乎使用了很多内存,实际上导致 Jupyter 内核上的内存不足错误。

  2. 不要在 Jupyter 内核上读取 vtk 文件,而是创建 vtk 读取函数 @dask.delayed,从该函数返回单个数据帧。当我这样做时,该过程在 NERSC 节点上正确发生,但重新分区数据帧仅使用 1 个核心,因此需要很长时间。

  3. 如上,使 vtk-reading 函数 @dask.delayed,但返回一个包含大量小熊猫数据帧的列表。在这种情况下,构建较小数据帧集的过程似乎发生在 Jupyter 内核而不是 NERSC 节点上,因此大大减慢了处理速度。

  4. 在 Jupyter 内核上加载 vtk 文件。然后创建一个列表索引来分解数据帧,以及一个从该索引范围返回数据帧的函数。在 from_delayed() 中运行它。但是,这似乎又在 Jupyter 内核而不是 NERSC 节点中完成了大部分工作,因此遇到了内存问题。如下图:

    vtkDF = loadVTKs()
    
    chunks = [[i1,j1],[i2,j2], . . .]
    for c in chunks:
        vtkDF_parts.append(foo(vtkDF,c)) #foo simply returns vtkDF from i_c->j_c
    
     vtkDDF = dd.from_delayed(vtkDF_parts)
     vtkDDF.persist()
    

我将不胜感激任何人都可以提供任何帮助,以最佳方式将这些数据正确传输到 NERSC 节点上的内存中——我一直在与 NERSC 的人们合作,但我们一直无法弄清楚。请让我知道其中哪些部分不清楚,我会澄清一下——我觉得我只是在这上面保持头脑清醒,所以我确定我并没有完全清楚。

谢谢, 詹姆斯

【问题讨论】:

    标签: dask


    【解决方案1】:

    我不知道这是否适合您的 VTK 数据类型,但这是一种相当注重内存的方法(从 https://people.sc.fsu.edu/~jburkardt/data/vtk/rbc_001.vtk 提取的数据集):

    import meshio
    import dask.dataframe as dd
    
    mesh_data = meshio.vtk.read('rbc_100.vtk')
    num_per_partition = 100
    df = dd.from_array(mesh_data.cells[0][1:][0], num_per_partition)
    

    您也可以使用 vtk 包并本质上做同样的事情(对于多边形数据)。在您执行计算周期之前,不会在 dask DataFrame 的内存中加载任何内容。如果这有帮助或者您有任何问题,请告诉我。

    编辑:

    很抱歉没有尽快回复您。所以 pandas(如果你在做空间操作,我假设 geopandas)对内存管理不是很好。我不知道您正在尝试执行哪种空间过滤,但也许探索不同的库以获得更高效的内存操作可能会有所帮助。你的文件是 ASCII 格式的吗?我假设您使用的是 vtk 或 meshio,您可能有更好的时间使用纯 io 自己解析数据并将数据拆分为 dask 和 pandas 都可以读取的 parquet 文件。 (单元格、点等)这将允许您限制读取的数据量。此外,通过这种方式,您可以将 vtk 延迟加载到 dask 数据帧中,进行处理,保存到磁盘,然后加载到 geopandas 中,而不是在内存中交换数据。

    【讨论】:

    • 感谢您的回复。但是,我不确定这是否真的解决了我的问题,因为尚不清楚它如何解决将数据从 jupyter 接口加载到集群上的 dask-mpi 客户端的问题。对于较小的数据集,dd.from_pandas() 方法(我上面的方法列表中的#1)有效,但是一旦数据帧中有 700M 行,就会在 juypter 内核上运行内存不足错误。我可以尝试从数据帧转换为数组以节省内存(尽管我需要转换回数据帧以进行以后的空间过滤)——让我知道这是否有意义。
    • 感谢您的建议。将 VTK 保存为要加载的 Parquet 文件是我当前的解决方案——我希望避免这种情况,因为它增加了另一个步骤并且需要一段时间,但我可以将数据加载到内存中。这将我带到一组不同的内存问题,但我将把它放在一个单独的堆栈溢出问题中,因为它只与 NERSC 节点上的内存有关。我确实使用 geopandas,因为我需要进行空间连接,对此我还没有看到好的替代方案。但我试图在 dask 中做到这一点,以便我在较小的数据子集上做到这一点。感谢您的帮助!
    猜你喜欢
    • 2020-09-18
    • 2022-07-11
    • 2022-11-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-19
    • 2019-02-17
    • 1970-01-01
    相关资源
    最近更新 更多