【Question Title】: Windowing and aggregating pyspark DataFrame [duplicate]
【Posted】: 2017-08-22 14:21:17
【Question】:

I am trying to process incoming events from a socket, then window and aggregate the event data. I have hit a snag with the windowing: it appears that even though I specify a schema for the DataFrame, it does not get converted into columns.

import sys
from pyspark.sql.types import StructType, StringType, TimestampType, FloatType, IntegerType, StructField

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


if __name__ == "__main__":
    # our data currently looks like this (tab separated).
    # -SYMBOL   DATE            PRICE   TICKVOL BID         ASK
    # NQU7  2017-05-28T15:00:00 5800.50 12      5800.50     5800.50
    # NQU7  2017-05-28T15:00:00 5800.50 1       5800.50     5800.50
    # NQU7  2017-05-28T15:00:00 5800.50 5       5800.50     5800.50
    # NQU7  2017-05-28T15:00:00 5800.50 1       5800.50     5800.50

    if len(sys.argv) != 3:
        # print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)

    spark = SparkSession \
        .builder \
        .appName("StructuredTickStream") \
        .getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel('WARN')

    # Schema for the tab-separated tick events coming in over the socket
    tickSchema = StructType([
        StructField("symbol", StringType(), True),
        StructField("dt", TimestampType(), True),
        StructField("price", FloatType(), True),
        StructField("tickvol", IntegerType(), True),
        StructField("bid", FloatType(), True),
        StructField("ask", FloatType(), True)
    ])

    events_df = spark \
        .readStream \
        .option("sep", "\t") \
        .option("host", sys.argv[1]) \
        .option("port", sys.argv[2]) \
        .format("socket") \
        .schema(tickSchema) \
        .load()

    events_df.printSchema()
    print("columns = ", events_df.columns)

    ohlc_df = events_df \
        .groupby(F.window("dt", "5 minutes", "1 minutes")) \
        .agg(
            F.first('price').alias('open'),
            F.max('price').alias('high'),
            F.min('price').alias('low'),
            F.last('price').alias('close')
        ) \
        .collect()


    query = ohlc_df \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()

print("columns = ", events_df.columns) 的输出是['value'],进程失败并显示以下跟踪:

pyspark.sql.utils.AnalysisException: "cannot resolve '`dt`' given input columns: [value];;\n'Aggregate [timewindow('dt, 300000000, 60000000, 0)], [timewindow('dt, 300000000, 60000000, 0) AS window#3, first('price, false) AS open#7, max('price) AS high#9, min('price) AS low#11, last('price, false) AS close#13]\n+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3a32b1a2,socket,List(),Some(StructType(StructField(symbol,StringType,true), StructField(dt,TimestampType,true), StructField(price,FloatType,true), StructField(tickvol,IntegerType,true), StructField(bid,FloatType,true), StructField(ask,FloatType,true))),List(),None,Map(sep -> \t, host -> localhost, port -> 9999),None), textSocket, [value#0]\n"

Any idea what I am doing wrong?

【Comments】:

    Tags: python apache-spark pyspark


    【Answer 1】:

    Your DataFrame has only one column, value, and here you are trying to access the column dt on that events_df. That is the main cause of the problem.

    The statement below makes this clear; it shows the DataFrame has the single column value:

    print("columns = ", events_df.columns)
    

    You need to examine this

    events_df = spark \
        .readStream \
        .option("sep", "\t") \
        .option("host", sys.argv[1]) \
        .option("port", sys.argv[2]) \
        .format("socket") \
        .schema(tickSchema) \
        .load()
    

    and understand why it creates the df with only one column.
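
    The reason is that Spark's socket source always delivers each line as a
    single string column named value; the .schema(tickSchema) call has no
    effect on it. As a minimal sketch of one way forward (an assumption based
    on the tab-separated lines shown in the question, reusing events_df and
    the F alias from the asker's script), you can split value and cast the
    pieces yourself, then aggregate the parsed stream. Note too that
    .collect() cannot be called on a streaming DataFrame, so the aggregation
    has to stay lazy and go straight to writeStream:

    # Sketch: parse the raw socket line into the typed columns that
    # tickSchema was meant to provide.
    parts = F.split(events_df["value"], "\t")

    parsed_df = events_df.select(
        parts.getItem(0).alias("symbol"),
        parts.getItem(1).cast("timestamp").alias("dt"),
        parts.getItem(2).cast("float").alias("price"),
        parts.getItem(3).cast("int").alias("tickvol"),
        parts.getItem(4).cast("float").alias("bid"),
        parts.getItem(5).cast("float").alias("ask")
    )

    # Windowed OHLC aggregation on the parsed stream; no .collect() here.
    ohlc_df = parsed_df \
        .groupBy(F.window("dt", "5 minutes", "1 minutes")) \
        .agg(
            F.first("price").alias("open"),
            F.max("price").alias("high"),
            F.min("price").alias("low"),
            F.last("price").alias("close")
        )

    query = ohlc_df \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()

    With the parsing step in place, parsed_df.columns shows the six expected
    columns, so the window("dt", ...) expression can resolve dt.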

    【Discussion】:
