【问题标题】:Create Spark Dataframe from SQL Query从 SQL 查询创建 Spark 数据框
【发布时间】:2016-11-17 11:52:54
【问题描述】:

我确定这是一个简单的 SQLContext 问题,但我在 Spark 文档或 Stackoverflow 中找不到任何答案

我想从 MySQL 上的 SQL 查询创建 Spark 数据框

例如,我有一个复杂的 MySQL 查询,例如

SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...

我想要一个包含 X、Y 和 Z 列的数据框

我想出了如何将整个表加载到 Spark 中,我可以将它们全部加载,然后在那里进行连接和选择。然而,这是非常低效的。我只想加载我的 SQL 查询生成的表。

这是我当前的代码近似值,它不起作用。 Mysql-connector 有一个选项“dbtable”,可用于加载整个表。我希望有一些方法可以指定查询

  val df = sqlContext.format("jdbc").
    option("url", "jdbc:mysql://localhost:3306/local_content").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError","true").
    option("useSSL", "false").
    option("user", "root").
    option("password", "").
    sql(
"""
select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim o n dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100
"""
    ).load()

【问题讨论】:

标签: mysql sql scala apache-spark mysql-connector


【解决方案1】:

我在这里找到了这个Bulk data migration through Spark SQL

dbname 参数可以是用括号括起来并带有别名的任何查询。所以就我而言,我需要这样做:

val query = """
  (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
    join DialogLine as dl on dl.DialogID=d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID=wi.WordRootID
    where d.InSite=1 and dl.Active=1
    limit 100) foo
"""

val df = sqlContext.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError","true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable",query).
  load()

正如预期的那样,将每个表作为自己的 Dataframe 加载并在 Spark 中加入它们的效率非常低。

【讨论】:

    【解决方案2】:

    如果您的table 已经在您的SQLContext 中注册,您可以简单地使用sql 方法。

    val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
    

    【讨论】:

    • 顺便说一句,这不是您问题的要点,所以我将其添加为评论。如果您正在寻找如何实现前面的步骤(连接到 mysql 等...),您可以查看此帖子 Spark + MySQL example
    • 谢谢。我已经找到了如何将整个表加载到 Spark 中。但是,我的问题是我有一个复杂的查询,连接了许多大表,并且只选择了几列。我希望创建一个只有选定列的简单数据框
    • 如何向SQLContext注册表?
    【解决方案3】:

    TL;DR:只需在您的数据库中创建一个视图。

    详情: 我的 postgres 数据库中有一个表 t_city,我在其上创建了一个视图:

    create view v_city_3500 as
        select asciiname, country, population, elevation
        from t_city
        where elevation>3500
        and population>100000
    
    select * from v_city_3500;
    
     asciiname | country | population | elevation
    -----------+---------+------------+-----------
     Potosi    | BO      |     141251 |      3967
     Oruro     | BO      |     208684 |      3936
     La Paz    | BO      |     812799 |      3782
     Lhasa     | CN      |     118721 |      3651
     Puno      | PE      |     116552 |      3825
     Juliaca   | PE      |     245675 |      3834
    

    在火花壳中:

    val sx= new org.apache.spark.sql.SQLContext(sc)
    
    var props=new java.util.Properties()
    props.setProperty("driver", "org.postgresql.Driver" )
    val url="jdbc:postgresql://buya/dmn?user=dmn&password=dmn"
    
    val city_df=sx.read.jdbc(url=url,table="t_city",props)
    val city_3500_df=sx.read.jdbc(url=url,table="v_city_3500",props)
    

    结果:

    city_df.count()
    Long = 145725
    
    city_3500_df.count()
    Long = 6
    

    【讨论】:

      【解决方案4】:

      要将查询的输出保存到新的数据帧,只需将结果设置为等于变量:

      val newDataFrame = spark.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
      

      现在newDataFrame 是一个具有所有可用数据框功能的数据框。

      【讨论】:

      • 谢谢,但我需要在 SQL 中加入许多表并只选择几列。我不想将每个表都加载到 Spark 中。我可以创建一个 SQL 表以通过查询加载吗?查看我添加到问题中的详细信息。
      【解决方案5】:

      使用 MYSQL 读取/加载数据,如下所示

      val conf = new SparkConf().setAppName("SparkMe Application").setMaster("local[2]")
          val sc = new SparkContext(conf)
          sc.setLogLevel("ERROR")
          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          val jdbcDF = sqlContext.read.format("jdbc").options(
            Map("url" -> "jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password",
              "dbtable" -> "TABLE_NAME")).load()
      

      如下表写入数据

      import java.util.Properties
          val prop = new Properties()
          prop.put("user", "<>")
          prop.put("password", "simple$123")
          val dfWriter = jdbcDF.write.mode("append")
          dfWriter.jdbc("jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password", "tableName", prop)
      

      要从查询创建数据框,请执行以下操作

      val finalModelDataDF = {
            val query = "select * from table_name"
            sqlContext.sql(query)
          };
      
          finalModelDataDF.show()
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2022-11-17
        • 2020-05-03
        • 2016-05-15
        • 1970-01-01
        • 2019-11-19
        • 2017-09-02
        • 1970-01-01
        相关资源
        最近更新 更多