【问题标题】:SparklyR wrapper for spark SQL: sqlContext.sqlSpark SQL 的 SparklyR 包装器:sqlContext.sql
【发布时间】:2017-07-30 17:56:01
【问题描述】:

我正在尝试为 SparklyR 的 SQL 函数编写一个包装器。我创建了以下函数:

sqlfunction <- function(sc, block) {
  spark_context(sc) %>% 
invoke("sqlContext.sql", block) }

然后我使用以下方式调用它:

newsqlData <- sqlfunction(sc, "select
                          substr(V1,1,2),
                          substr(V1,3,3),
                          substr(V1,6,6),
                          substr(V1,12,4),
                          substr(V1,16,4)
                          FROM TABLE1 WHERE V1 IS NOT NULL")

但我收到以下错误:

Error: java.lang.IllegalArgumentException: invalid method sqlContext.sql for object 12
at sparklyr.Invoke$.invoke(invoke.scala:113)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
at sparklyr.StreamHandler$.read(stream.scala:55)
at sparklyr.BackendHandler.channelRead0(handler.scala:49)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

任何建议或修复将不胜感激。

【问题讨论】:

    标签: r apache-spark apache-spark-sql sparklyr r-dbi


    【解决方案1】:

    应该是:

    sqlfunction <- function(sc, block) {
      spark_session(sc) %>% invoke("sql", block)
    }
    

    其中scspark_connection(来自:spark_connect(master = master_url) 的输出)。

    这个:

    • spark_session(sc) - 从连接对象中检索 SparkSession
    • invoke("sql", block) - 使用 block 作为参数调用 SparkSession 实例的 sql 方法。

    示例用法:

    library(sparklyr)
    
    sc <- spark_connect(master = "local[*]")
    sqlfunction(sc, "SELECT SPLIT('foo,bar', ',')")
    
    <jobj[11]>
      class org.apache.spark.sql.Dataset
      [split(foo,bar, ,): array<string>]
    

    这将为您提供对 Java 对象的引用。如果您愿意,例如可以注册为临时表:

    ... %>% invoke("createOrReplaceTempView", "some_name_for_the_view")
    

    并通过tbl访问:

    library(dplyr)
    
    tbl(sc, "some_name_for_the_view") 
    

    ... %>% sdf_register()
    

    直接获取tbl_spark对象。

    您使用的代码:

    • spark_context - 提取 SparkContext 实例。
    • invoke("sqlContext.sql", block) - 尝试调用不存在的方法 (sqlContext.sql)。

    在最新版本中,您可以将invoke("createOrReplaceTempView", ...) 替换为简单的sdf_register

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-05-15
      • 2017-02-25
      • 1970-01-01
      • 2017-03-31
      • 2019-11-04
      • 2017-02-23
      • 2022-12-15
      • 2019-05-29
      相关资源
      最近更新 更多