【发布时间】:2020-06-04 12:08:05
【问题描述】:
下面是我在 Databricks 集群上运行的 for 循环 执行:
datalake_spark_dataframe_downsampled = pd.DataFrame(
{'IMEI' : ['001', '001', '001', '001', '001', '002', '002'],
'OuterSensorConnected':[0, 0, 0, 1, 0, 0, 0],
'OuterHumidity':[31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826],
'EnergyConsumption': [70, 70, 70, 70, 70, 70, 70],
'DaysDeploymentDate': [0, 0, 1, 1, 1, 1, 1],
'label': [0, 0, 1, 1, 0, 0, ]}
)
datalake_spark_dataframe_downsampled = spark.createDataFrame(datalake_spark_dataframe_downsampled )
# printSchema of the datalake_spark_dataframe_downsampled (spark df):
"root
|-- IMEI: string (nullable = true)
|-- OuterSensorConnected: integer (nullable = false)
|-- OuterHumidity: float (nullable = true)
|-- EnergyConsumption: float (nullable = true)
|-- DaysDeploymentDate: integer (nullable = true)
|-- label: integer (nullable = false)"
device_ids=datalake_spark_dataframe_downsampled.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
print(device_ids) #["001", "002", ..."030"] 30 ids
for i in device_ids:
#filtered_dataset=datalake_spark_dataframe_downsampled.where(datalake_spark_dataframe_downsampled.IMEI.isin([i]))
#The above operation is executed inside the function training_models_operation_testing()
try:
training_models_operation_testing(i, datalake_spark_dataframe_downsampled, drop_columns_not_used_in_training, training_split_ratio_value, testing_split_ratio_value, mlflow_folder, cross_validation_rounds_value, features_column_name, optimization_metric_value, pretrained_models_T_minus_one, folder_name_T_minus_one, timestamp_snap, instrumentation_key_value, canditate_asset_ids, executor, device_ids)
except Exception as e:
custom_logging_function("ERROR", instrumentation_key_value, "ERROR EXCEPTION: {0}".format(e))
为了解决这个问题,我附上了一个示例数据,以大致了解我的数据是怎样的......并想象存在更多的行和 ID。我刚刚创建了一些只是为了演示
如您所见,这是在使用 pyspark 运行的 Databricks 集群中的 for 循环内的一个简单函数调用。
简而言之,我首先创建一个包含在我的数据集中存在的唯一 ID(IMEI 列)的列表。这等于 30。因此,我使用 for 循环运行 30 次迭代。在每次迭代中,我都执行以下步骤:
- 过滤与 30 个资产 ID 中的每一个匹配的 datalake_spark_dataframe_downsampled (spark df) 行。例如,假设在初始 df 的 40,000 行中,只有 140 行对应于第一个设备 ID。
- 根据这 140 行 (filtered_dataset),该函数会执行 preprocessing、train-test-split 并训练两个 Spark ML 算法,仅用于过滤数据集的行。
附加的代码 sn-p 运行成功。虽然 for 循环 是按顺序执行的,但一次迭代一次。该函数为第一个 id 调用,只有在完成后才转到下一个 id。但是,我想要转换上述 for 循环,使 30 次迭代将在 pyspark 中同时运行,而不是 一个接一个。我如何在 pyspark 中实现这一点?
我愿意进行讨论和想法测试,因为我知道我所要求的可能不是那么简单,无法在 Spark 环境中执行。
我当前的日志输出(这是我在下面打印的内容)
迭代 1
开始执行...
- 执行 id 001 的函数
执行完毕...
迭代 2
开始执行...
- 执行 id 002 的函数
执行完毕...
我想要的日志输出(这是我在下面打印的内容)
开始执行...
- 执行 id 001 的函数
- 执行 id 002 的函数
- 执行 id 003 的函数
- 执行 id 004 的函数
。
.
.
.
- 执行 id 030 的函数
执行完毕...
同时(同时)一次
[更新]基于cmets(线程模块)上的答案:
【问题讨论】:
-
这个可以,需要实现多线程
-
@RohitNimmala 你能否提供一个在线指南或多线程的火花实现? ...我猜你的意思是像这个例子docs.python.org/3/library/concurrent.futures.html
-
在答案部分提供了一个示例,因为它不适合 cmets 部分,希望对您有所帮助。
标签: python apache-spark for-loop pyspark concurrency