【Question Title】: Visibility of temporary tables and database tables in Spark SQL: is it possible to run a nested query against a temporary table from a regular JDBC query?
【Posted】: 2023-03-03 10:46:01
【Question Description】:

I have a DataFrame registered as a temporary table:

val dailySummariesDfVisualize =
      dailySummariesDf
    .orderBy("event_time").registerTempTable("raw") 

and I can query it with Spark SQL:

val df = sqlContext.sql("SELECT * FROM raw")
df.show()

The output works. Then I want to run a nested query against the temporary table from inside a JDBC database query, like this:

val dailySensorData =
getDFFromJdbcSource(SparkSession.builder().appName("test").master("local").getOrCreate(), 
          s"SELECT *  FROM values WHERE time in (SELECT event_time FROM raw) limit 1000000")
           .persist(StorageLevel.MEMORY_ONLY_SER)
dailySensorData.show(400, false)

Here I get the exception:

org.postgresql.util.PSQLException: ERROR: relation "raw" does not exist
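A minimal sketch, in plain Scala, of why this fails: Spark's JDBC source wraps the query text as a derived table and ships the whole string to PostgreSQL, so every relation name in it must resolve on the database side, while temp views such as `raw` exist only in Spark's catalog. The alias name below is illustrative, not Spark's exact generated one:

```scala
// Approximate what Spark's JDBC source does with a free-form query:
// it becomes a derived table that the database itself must parse.
def asJdbcSubquery(query: String): String = s"($query) SPARK_GEN_SUBQ"

val pushed = asJdbcSubquery(
  "SELECT * FROM values WHERE time in (SELECT event_time FROM raw)")
// PostgreSQL tries to resolve both `values` and `raw` in its own catalog,
// hence: relation "raw" does not exist
```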

If I instead try to execute it in sqlContext.sql(), like this:

val df = sqlContext.sql("SELECT * FROM values WHERE time in (SELECT event_time FROM raw)")
df.show()

I get:

org.apache.spark.sql.AnalysisException: Table or view not found: values; line 1 pos 14;
'Project [*]
+- 'Filter 'time IN (list#4967 [])
   :  +- 'Project ['event_time]
   :     +- 'UnresolvedRelation [raw]
   +- 'UnresolvedRelation [values]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:106)

So in each case either values (the real JDBC table) or raw (the temporary table) is not visible. How can I use the temporary table in a nested query?

Update

Following mazaneicha's suggestion, here is what I tried (all of values is retrieved here, since there is no way to limit it with a nested query):

val dailySummariesDfVisualize =
      dailySummariesDf
    .orderBy("event_time").createOrReplaceTempView("raw") 

val dailySensorData =
      getDFFromJdbcSource(SparkSession.builder().appName("test").master("local").getOrCreate(), 
      s"SELECT *  FROM values").createOrReplaceTempView("values")     

val df = sqlContext.sql("SELECT * FROM values WHERE time in (SELECT event_time FROM raw)")
df.explain(true)

And this is the logical plan:

== Parsed Logical Plan ==
'Project [*]
+- 'Filter 'time IN (list#5475 [])
   :  +- 'Project ['event_time]
   :     +- 'UnresolvedRelation [raw]
   +- 'UnresolvedRelation [values]

== Analyzed Logical Plan ==
devicename: string, value: double, time: timestamp, coffee_machine_id: string, digital_twin_id: string, write_time: timestamp
Project [devicename#5457, value#5458, time#5459, coffee_machine_id#5460, digital_twin_id#5461, write_time#5462]
+- Filter time#5459 IN (list#5475 [])
   :  +- Project [event_time#4836]
   :     +- SubqueryAlias raw
   :        +- Sort [event_time#4836 ASC NULLS FIRST], true
   :           +- Relation[event_type#4835,event_time#4836,event_payload#4837,coffee_machine_id#4838,digital_twin_id#4839] JDBCRelation((SELECT *  FROM events WHERE (event_time > '2021-03-31'  or event_time < '2021-03-30') and event_type != 'Coffee_Capsule_RFID_Event' and event_type!='Coffee_Cup_RFID_Event' limit 2000000) SPARK_GEN_SUBQ_48) [numPartitions=1]
   +- SubqueryAlias values
      +- Relation[devicename#5457,value#5458,time#5459,coffee_machine_id#5460,digital_twin_id#5461,write_time#5462] JDBCRelation((SELECT *  FROM values) SPARK_GEN_SUBQ_65) [numPartitions=1]

== Optimized Logical Plan ==
Join LeftSemi, (time#5459 = event_time#4836)
:- Relation[devicename#5457,value#5458,time#5459,coffee_machine_id#5460,digital_twin_id#5461,write_time#5462] JDBCRelation((SELECT *  FROM values) SPARK_GEN_SUBQ_65) [numPartitions=1]
+- Project [event_time#4836]
   +- Relation[event_type#4835,event_time#4836,event_payload#4837,coffee_machine_id#4838,digital_twin_id#4839] JDBCRelation((SELECT *  FROM events WHERE (event_time > '2021-03-31'  or event_time < '2021-03-30') and event_type != 'Coffee_Capsule_RFID_Event' and event_type!='Coffee_Cup_RFID_Event' limit 2000000) SPARK_GEN_SUBQ_48) [numPartitions=1]

== Physical Plan ==
SortMergeJoin [time#5459], [event_time#4836], LeftSemi
:- *(2) Sort [time#5459 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(time#5459, 200), true, [id=#1219]
:     +- *(1) Scan JDBCRelation((SELECT *  FROM values) SPARK_GEN_SUBQ_65) [numPartitions=1] [devicename#5457,value#5458,time#5459,coffee_machine_id#5460,digital_twin_id#5461,write_time#5462] PushedFilters: [], ReadSchema: struct<devicename:string,value:double,time:timestamp,coffee_machine_id:string,digital_twin_id:str...
+- *(4) Sort [event_time#4836 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(event_time#4836, 200), true, [id=#1224]
      +- *(3) Scan JDBCRelation((SELECT *  FROM events WHERE (event_time > '2021-03-31'  or event_time < '2021-03-30') and event_type != 'Coffee_Capsule_RFID_Event' and event_type!='Coffee_Cup_RFID_Event' limit 2000000) SPARK_GEN_SUBQ_48) [numPartitions=1] [event_time#4836] PushedFilters: [], ReadSchema: struct<event_time:timestamp>

【Question Comments】:

  • You should be able to use JDBC to create the values dataframe without the WHERE clause, register the temp view "values", and then run your SQL against the two temp views. Also note that registerTempTable was deprecated long ago; try createTempView instead.
  • @mazaneicha That would hurt performance, because values is really big; I was hoping that using the result of a nested query against the temp table/view would reduce the JDBC interaction with the db.
  • Can you share the .explain()?
  • @mazaneicha Just added the physical plan to the post.
  • I see what you mean, it ends up reading the whole values :( The only other option I can think of is to collect the keys via raw.collectAsList() and use that list to build an IN clause.
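The IN-clause idea from the last comment can be sketched in plain Scala; the sample timestamps below are placeholders for what collecting event_time from raw would return:

```scala
// Collect the (few) key values on the driver and splice them into the
// query that gets pushed down to the database.
val eventTimes: Seq[String] = Seq(
  "2021-03-24 07:06:34.0",  // placeholder values
  "2021-03-24 07:07:41.0")

val inClause = eventTimes.map(t => s"'$t'").mkString("time IN (", ", ", ")")
val query    = s"SELECT * FROM values WHERE $inClause limit 1000000"
```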

Tags: scala apache-spark apache-spark-sql apache-zeppelin


【Solution 1】:

Following mazaneicha's advice, I was able to solve this by generating the WHERE clauses in Scala from the DataFrame rows; compared to the data I am extracting with the query, there are not many of them:

var collectedString = scala.collection.mutable.MutableList[String]()

for (row <- dailySummariesDfVisualize.collectAsList())
  {
      println(row(1))
      val start = row(1)
      val end = row(5)
      val timeSelection = s" time > ' ${start}' and  time < '${end}'"
      collectedString+=timeSelection    
  }

val whereClause = collectedString.mkString(" or ")
println(whereClause)

val dailySensorData =
      getDFFromJdbcSource(SparkSession.builder().appName("test").master("local").getOrCreate(), 
      s"SELECT *  FROM values WHERE "+whereClause+" limit 1000000")
       .persist(StorageLevel.MEMORY_ONLY_SER)    

dailySensorData.show(400, false)

It produces the output I actually needed, with acceptable performance.

The formatted whereClause output looks like:

time > ' 2021-03-24 07:06:34.0' and  time < '2021-03-24 07:08:34.0' or  time > ' 2021-03-24 07:07:41.0' and  time < '2021-03-24 07:09:41.0' or  time > ' 2021-03-24 07:07:43.0' and  time < '2021-03-24 07:09:43.0'

and so on.
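As an untested alternative sketch: Spark's DataFrameReader.jdbc also accepts an Array[String] of predicates, where each element becomes the WHERE clause of one read partition, so the same time windows could be fetched in parallel instead of through one long OR chain. The windows below are illustrative placeholders, and the commented-out connection details are assumptions:

```scala
// Build one JDBC partition predicate per collected time window.
val windows: Seq[(String, String)] = Seq(
  ("2021-03-24 07:06:34.0", "2021-03-24 07:08:34.0"),  // placeholder windows
  ("2021-03-24 07:07:41.0", "2021-03-24 07:09:41.0"))

val predicates: Array[String] =
  windows.map { case (start, end) => s"time > '$start' and time < '$end'" }.toArray

// Then (url, table name, and props are placeholders):
// val df = spark.read.jdbc("jdbc:postgresql://host:5432/db", "values", predicates, props)
```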

【Discussion】:
