【Question Title】: Calculating the median on a Spark DataFrame
【Posted】: 2018-01-10 15:22:01
【Question Description】:

I am working with a Spark DataFrame and want to calculate the median of a given column.

SparkContext sc = new SparkContext(new SparkConf().setAppName("sql").setMaster("local"));
SQLContext sql = new SQLContext(sc);
//HiveContext

DataFrame df = sql.read().json("test.json");
df.registerTempTable("sample");
DataFrame df1 = sql.sql("SELECT percentile_approx(imp_recall_timeinterval.average, 0.5) FROM sample");

But I do not get any output; instead I get the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function percentile_approx;
        at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:65)
        at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:65)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:64)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
        at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:573)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:570)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionDown$1(QueryPlan.scala:75)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:85)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:89)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:94)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:94)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:64)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12.applyOrElse(Analyzer.scala:570)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12.applyOrElse(Analyzer.scala:568)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:568)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:567)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
        at com.cloudera.sparkwordcount.JavaWordCount.<init>(JavaWordCount.java:41)
        at com.cloudera.sparkwordcount.JavaWordCount.main(JavaWordCount.java:131)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Please suggest the simplest way to calculate the median. I have tried many of the available solutions but could not find one that works. Note: I am using Spark version 1.6.1.

Here is the complete code:

JavaWordCount.java

package com.cloudera.sparkwordcount;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
public class JavaWordCount {

    public static void main(String[] args) {
        SparkContext sc = new SparkContext(new SparkConf().setAppName("sql").setMaster("local"));
        //SQLContext sql = new SQLContext(sc);
        HiveContext sql = new HiveContext(sc);
        DataFrame df = sql.read().json("test.json");
        df.registerTempTable("sample");
        DataFrame df2 = sql.sql("SELECT percentile_approx(imp_recall_timeinterval.average, 0.5) FROM sample");
        System.out.println("Median is =======>");
        df2.show();
    }
}

And here is the pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.

  Cloudera, Inc. licenses this file to you under the Apache License,
  Version 2.0 (the "License"). You may not use this file except in
  compliance with the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  CONDITIONS OF ANY KIND, either express or implied. See the License for
  the specific language governing permissions and limitations under the
  License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.cloudera.sparkwrodcount</groupId>
  <artifactId>sparkwordcount</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>"Spark Word Count"</name>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
    <repository>
      <id>maven-hadoop</id>
      <name>Hadoop Releases</name>
      <url>https://repository.cloudera.com/content/repositories/releases/</url>
    </repository>
    <repository>
      <id>cloudera-repos</id>
      <name>Cloudera Repos</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>  
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.6.2</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

【Question Discussion】:

    Tags: apache-spark apache-spark-sql


    【Solution 1】:

    The UDF percentile_approx() that you are calling is only available when you use a HiveContext, not a plain SQLContext. Try switching to HiveContext:

        SparkContext sc = new SparkContext(new SparkConf().setAppName("sql").setMaster("local"));
        //SQLContext sql = new SQLContext(sc);
        HiveContext sql = new HiveContext(sc);

        DataFrame df = sql.read().json("test.json");
        df.registerTempTable("sample");
        DataFrame df1 = sql.sql("SELECT percentile_approx(imp_recall_timeinterval.average, 0.5) FROM sample");
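    Equivalently, once a HiveContext is in place, the same Hive UDF can be called through the DataFrame API instead of an SQL string. A minimal sketch, assuming the same nested column as in the question (callUDF only resolves the function name, so the HiveContext function registry is still required):

        import static org.apache.spark.sql.functions.callUDF;
        import static org.apache.spark.sql.functions.lit;

        // percentile_approx(column, 0.5) is the approximate median;
        // the name resolves through the Hive function registry.
        DataFrame median = df.select(
                callUDF("percentile_approx",
                        df.col("imp_recall_timeinterval.average"),
                        lit(0.5)));
        median.show();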

    *Don't forget to add the library required for Hive support:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    
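    If pulling in the Hive dependency is not an option, a fallback is to compute the exact median with the plain RDD API under the stock SQLContext. A minimal sketch, assuming imp_recall_timeinterval.average holds non-null doubles; it ranks the sorted values with zipWithIndex and looks up the middle element(s). Anonymous classes are used because the pom targets Java 1.6:

        import org.apache.spark.api.java.JavaPairRDD;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.function.Function;
        import org.apache.spark.api.java.function.PairFunction;
        import org.apache.spark.sql.Row;
        import scala.Tuple2;

        // Pull the numeric column out as a plain RDD of doubles.
        JavaRDD<Double> values = df.select("imp_recall_timeinterval.average")
                .toJavaRDD()
                .map(new Function<Row, Double>() {
                    public Double call(Row row) { return row.getDouble(0); }
                });

        // Sort ascending, then key each value by its rank.
        JavaPairRDD<Long, Double> byRank = values
                .sortBy(new Function<Double, Double>() {
                    public Double call(Double d) { return d; }
                }, true, values.getNumPartitions())
                .zipWithIndex()
                .mapToPair(new PairFunction<Tuple2<Double, Long>, Long, Double>() {
                    public Tuple2<Long, Double> call(Tuple2<Double, Long> t) {
                        return new Tuple2<Long, Double>(t._2(), t._1());
                    }
                });

        // Middle element for odd counts, average of the two middle elements for even.
        long n = byRank.count();
        double median = (n % 2 == 1)
                ? byRank.lookup(n / 2).get(0)
                : (byRank.lookup(n / 2 - 1).get(0) + byRank.lookup(n / 2).get(0)) / 2.0;

    Note that each lookup() call triggers a job, so on large data percentile_approx remains the cheaper option.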

    【Discussion】:

    • I added the following dependency to my pom.xml: org.apache.hive:hive-exec:1.2.1, but HiveContext still does not show up in my Java class.
    • That is not the right artifact. Use this one instead: org.apache.spark:spark-hive_2.10:1.6.2 with scope provided.
    • I am hitting the same exception. HiveContext is imported, but the same error is thrown at runtime.
    • Can you share the complete code? That will make it easier to debug. Please also mention all the libraries and their respective versions.
    • Now I am getting the following exception: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@58affbdf, see the next exception for details.