【发布时间】:2017-01-10 19:39:25
【问题描述】:
我必须使用 python 开发一个 Spark 脚本来检查一些日志并验证用户是否在两个事件之间更改了他的 IP 所在的国家/地区。我有一个 csv 文件,其中包含保存在 HDFS 上的 IP 范围和相关国家/地区,如下所示:
startIp, endIp, country
0.0.0.0, 10.0.0.0, Italy
10.0.0.1, 20.0.0.0, England
20.0.0.1, 30.0.0.0, Germany
还有一个日志 csv 文件:
userId, timestamp, ip, event
1, 02-01-17 20:45:18, 10.5.10.3, login
24, 02-01-17 20:46:34, 54.23.16.56, login
我使用 Spark Dataframe 加载这两个文件,并且我已经修改了包含带有滞后函数的日志的文件,添加了一个包含 previousIp 的列。我认为的解决方案是将 ip 和 previousIp 替换为关联的国家/地区,以便比较它们并使用 dataFrame.filter("previousIp" != "ip")。 我的问题是,有没有办法在 Spark 中做到这一点?比如:
dataFrame = dataFrame.select("userId", udfConvert("ip",countryDataFrame).alias("ip"), udfConvert("previousIp",countryDataFrame).alias("previousIp"),...)
为了有这样的数据框:
userId, timestamp, ip, event, previousIp
1, 02-01-17 20:45:18, England, login, Italy
如果没有,我该如何解决我的问题?谢谢
【问题讨论】:
标签: python apache-spark hdfs pyspark udf