【发布时间】:2019-05-21 06:14:23
【问题描述】:
我有 dns(string) 和 ip-address (string) 的数据框。我想使用 UDF 来应用我创建的搜索不同/唯一 dns 并将其与它匹配的 ips 数量相关联的 python 函数。最后,它将将该信息输出到列表中。最终结果是 UDF 接受一个数据帧并返回一个列表。
#creating sample data
from pyspark.sql import Row
l = [('pipe.skype.com','172.25.132.26'),('management.azure.com','172.25.24.57'),('pipe.skype.com','172.11.128.10'),('management.azure.com','172.16.12.22'),('www.google.com','172.26.51.144'),('collector.exceptionless.io','172.22.2.21')]
rdd = sc.parallelize(l)
data = rdd.map(lambda x: Row(dns_host=x[0], src_ipv4=x[1]))
data_df = sqlContext.createDataFrame(data)
def beaconing_aggreagte(df):
"""Loops through unique hostnames and correlates them to unique src ip. If an individual hostname has less than 5 unique source ip connection, moves to the next step"""
dns_host = df.select("dns_host").distinct().rdd.flatMap(lambda x: x).collect()
HIT_THRESHOLD = 5
data = []
for dns in dns_host:
dns_data =[]
testing = df.where((f.col("dns_host") == dns)).select("src_ipv4").distinct().rdd.flatMap(lambda x: x).collect()
if 0 < len(testing) <= 5: #must have less than 5 unique src ip for significance
dns_data.append(dns)
data.append([testing,dns_data])
print([testing,dns_data])
return data
我认为我的架构可能不正确
#Expected return from function: [[['172.25.24.57','172.16.12.22'],[management.azure.com]],..]
array_schema = StructType([
StructField('ip', ArrayType(StringType()), nullable=False),
StructField('hostname', ArrayType(StringType()), nullable=False)
])
testing_udf_beaconing_aggreagte = udf(lambda z: beaconing_aggreagte(z), array_schema)
df_testing = testing_df.select('*',testing_udf_beaconing_aggreagte(array('dns_host','src_ipv4')))
df_testing.show()
这个错误输出到:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1248.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1248.0 (TID 3846823, 10.139.64.23, executor 13): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
我的最终目标是获取一个 df 并以 [[[list of ips], [dns_host]],...] 格式返回一个列表。我正在尝试使用 UDF 来帮助并行化集群上的操作,而不是使用一个执行器。
【问题讨论】:
标签: python arrays dataframe pyspark user-defined-functions