【发布时间】:2020-03-27 08:37:32
【问题描述】:
问题:
我想在以下之间进行空间连接:
- 带有点(例如道路上的点)的大型 Spark Dataframe(500M 行)
- 带有多边形(例如区域边界)的小型geojson(20000 个形状)。
这是我目前所拥有的,我发现它很慢(很多调度程序延迟,可能是由于 communes 没有广播):
@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes(traces):
geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs = communes.crs)
joined_df = gpd.sjoin(gdf_traces, communes, how='left', op='within')
return joined_df[columns]
pandas_udf 将一点 points 数据帧(trace)作为 pandas 数据帧,将其转换为带有 geopandas 的 GeoDataFrame,并使用 polygons进行空间连接> GeoDataFrame(因此受益于 Geopandas 的 Rtree 连接)
问题:
有没有办法让它更快?我知道我的 communes 地理数据帧在 Spark Driver 的内存中,并且每个工作人员都必须为每次调用 udf 下载它,这是正确的吗?
但是我不知道如何使这个 GeoDataFrame 直接对工作人员可用(如在广播连接中)
有什么想法吗?
【问题讨论】:
-
你广播过公社吗?您应该广播公社,然后使用 communes.value 访问 json
-
这就是我最终做的事
标签: python pandas pyspark pyspark-sql geopandas