[Posted]: 2019-04-17 02:25:34
[Problem description]:
I need to write a Spark SQL query with an inner select and a partition-by window. The problem is that I get an AnalysisException. I have already spent several hours on this, and the other approaches I tried did not work either.
Exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
Window [sum(cast(_w0#41 as bigint)) windowspecdefinition(deviceId#28, timestamp#30 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS grp#34L], [deviceId#28], [timestamp#30 ASC NULLS FIRST]
+- Project [currentTemperature#27, deviceId#28, status#29, timestamp#30, wantedTemperature#31, CASE WHEN (status#29 = cast(false as boolean)) THEN 1 ELSE 0 END AS _w0#41]
I suspect the query is too complex to be implemented this way, but I don't know how to fix it.
SparkSession spark = SparkUtils.getSparkSession("RawModel");

Dataset<RawModel> datasetMap = readFromKafka(spark);
// registerTempTable is deprecated; createOrReplaceTempView is the current API
datasetMap.createOrReplaceTempView("test");

Dataset<Row> res = spark.sql(
    " select deviceId, grp, avg(currentTemperature) as averageT, " +
    "        min(timestamp) as minTime, max(timestamp) as maxTime, count(*) as countFrame " +
    " from (select test.*, " +
    "              sum(case when status = 'false' then 1 else 0 end) " +
    "                over (partition by deviceId order by timestamp) as grp " +
    "       from test) test " +
    " group by deviceId, grp ");
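The inner window expression is what triggers the exception: Structured Streaming only supports time-based windows, and `over (partition by deviceId order by timestamp)` is a row-ordered running sum. The grouping logic itself is simple, though, and could instead be computed per device inside a stateful operator such as `flatMapGroupsWithState` or a custom aggregator. As a minimal sketch of that logic in plain Java (class and method names here are illustrative, not a Spark API; frames are assumed already sorted by timestamp within a device):

```java
import java.util.Arrays;

// Sketch of the grouping the SQL window function computes:
// grp = running count of frames where status == false, per device,
// in timestamp order. Each distinct grp value delimits one session.
public class FrameGrouper {

    // Mirrors: sum(case when status = 'false' then 1 else 0 end)
    //          over (partition by deviceId order by timestamp)
    // The counter increments on every status == false frame, so all
    // frames up to (and including) that one share a group id with it.
    public static int[] assignGroups(boolean[] statuses) {
        int[] grp = new int[statuses.length];
        int running = 0;
        for (int i = 0; i < statuses.length; i++) {
            if (!statuses[i]) {
                running++; // a false-status frame starts the next group
            }
            grp[i] = running;
        }
        return grp;
    }
}
```

For example, the status sequence `true, false, true, false` yields group ids `0, 1, 1, 2`. In a streaming job the `running` counter would live in the per-device state of the stateful operator, and the avg/min/max/count aggregates would be maintained per group id the same way.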
Any suggestions would be appreciated. Thanks.
[Discussion]:
-
I'm running into the same error. Did you ever find a solution?
-
I didn't. I took a different approach from the start and used a custom aggregation.
-
Do you mean pandas_udf?
Tags: java apache-spark apache-spark-sql spark-streaming