如果您计划最终进行扩展,您将希望使用SparkContext 中的分布式输入法,而不是从依赖sc.parallelize 的驱动程序加载任何本地文件。听起来你需要完整地阅读每个文件,所以在你的情况下你想要:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/")
或者如果你愿意,你也可以指定单个文件,但是你只有一个带有单个元素的 RDD:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz")
那么每条记录就是一对<filename>,<str of bytes>。在 Dataproc 上,sc.binaryFiles 将自动直接使用 GCS 路径,而 np.load 需要本地文件系统路径。
然后在您的工作代码中,您只需使用StringIO 将这些字节字符串用作您放入np.load 的文件对象:
from StringIO import StringIO
# For example, to create an RDD of the 'arr_0' element of each of the picked objects:
npz_rdd.map(lambda l: numpy.load(StringIO(l[1]))['arr_0'])
在开发过程中,如果您真的只想将文件读入主驱动程序,您可以随时使用collect() 折叠您的 RDD 以在本地检索它:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz")
local_bytes = npz_rdd.collect()[0][1]
local_np_obj = np.load(StringIO(local_bytes))