【问题标题】:Pyspark read csv and combine date and time column and filter based on itPyspark读取csv并结合日期和时间列并基于它进行过滤
【发布时间】:2019-02-06 05:22:34
【问题描述】:

我有大约 10,000 个 csv 文件,每个文件包含 14 列。它们包含有关金融机构、交易价值、日期和时间的数据。

一些 csv 文件只是标题,其中没有数据。我设法在我的本地 hadoop 文件系统上加载了所有 csv 文件。我想要实现的是过滤数据以包含仅在上午 9 点到下午 6 点之间发生的记录。

我将如何实现这一目标?我对 lambda 和过滤器很困惑,所有东西都存在于 spark-python 中。

您能告诉我如何过滤此数据并使用过滤后的数据进行其他分析吗?

P.S,冬令时和夏令时也需要考虑,我想我应该有一些功能可以将时间更改为UTC格式吗?

由于我关心的是基于 csv 文件中的时间列过滤数据,因此我简化了 csvs。让我们说:

CSV 1:(过滤器.csv)

  • ISIN、货币、日期、时间
  • "1","EUR",2018-05-08,07:00
  • "2","EUR",2018-05-08,17:00
  • "3","EUR",2018-05-08,06:59
  • "4","EUR",2018-05-08,17:01

CSV 2:(NoFilter.csv)

  • ISIN、货币、日期、时间
  • "1","EUR",2018-05-08,07:01
  • "2","EUR",2018-05-08,16:59
  • "3","EUR",2018-05-08,10:59
  • "4","EUR",2018-05-08,15:01

我的代码是:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

sqlc = SQLContext(sc)

ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'

df = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationNonFiltered)

dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered)

data = df.rdd
dataFilter = dfFilter.rdd

data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')

print data.count()
print dataFilter.count()

我希望看到 data.count 返回 4,因为所有时间都适合范围,而 dataFilter.count 返回 0,因为没有匹配时间。

谢谢!

【问题讨论】:

  • 您已经尝试过的邮政编码,我们可以帮助您更好的方式
  • 完成了,我编辑了我的问题。

标签: python csv apache-spark filter pyspark


【解决方案1】:

在您的代码中,您只能使用 'csv' 作为格式

from pyspark import SparkContext, SparkConf
ehsanLocationFiltered = '/FileStore/tables/stackoverflow.csv'
df = sqlContext.read.format('csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered).rdd
result=data.map(lambda row: row.Time > '07:00' and row.Time < '17:00')
result.count()

【讨论】:

  • 谢谢。我只需要将:result=data.map(lambda row: row.Time > '07:00' and row.Time ' 07:00' 和 row.Time
【解决方案2】:

好的,我发现我的代码有什么问题!我应该使用:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

sqlc = SQLContext(sc)

ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'

df = sqlContext.read.format('com.databricks.spark.csv')\
   .options(header='true', inferschema='true')\
   .load(ehsanLocationNonFiltered)

dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
   .options(header='true', inferschema='true')\
   .load(ehsanLocationFiltered)

data = df.rdd
dataFilter = dfFilter.rdd

filteredResult = data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
filteredResultExpected =dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')

print filteredResult.count()
print filteredResultExpected.count()

filteredResultExpected =filteredResult 不见了!

【讨论】:

    猜你喜欢
    • 2015-10-12
    • 2016-10-02
    • 1970-01-01
    • 1970-01-01
    • 2013-11-29
    • 1970-01-01
    • 2014-03-31
    • 1970-01-01
    • 2017-07-23
    相关资源
    最近更新 更多