【问题标题】:Dependency Issues for Cloud DataProc + Spark + Cloud BigTable with JAVACloud DataProc + Spark + Cloud BigTable 与 JAVA 的依赖性问题
【发布时间】:2018-03-12 21:32:07
【问题描述】:

我需要创建一个在 Cloud DataProc 上运行的应用程序,并使用 Spark 以大规模并行方式处理大型 BigTable 写入、扫描和删除。这可以在 JAVA 中(如果可行,也可以在 Python 中)。

我正在尝试使用 Eclipse 编写最少的代码,以实现从 BigTable 表中获取 RDD 的基本功能,方法是使用 bulkPut/bulkDelete/butkGet 或使用 newAPIHadoopRDD()或类似的东西。

我在 SO 和其他地方看到了多篇关于如何做到这一点的帖子,以及关于连接 Bigtable API、HBase API 和 Spark 的各种挑战。其中一些帖子现在已经过时了(几年前,所以可能不相关)。到目前为止,我还没有设法让任何工作,主要是由于各种依赖冲突或不一致。无论我在 POM.XML 中尝试何种依赖项和版本组合,当我尝试运行时,我都会遇到 ClassNotFound 或 NoSuchMethod 异常。

我可以就我需要包含的 Spark、HBase 和 Bigtable 依赖版本和包的“工作”组合获得一些建议吗?我的 POM.xml 目前如下所示。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>FFSpark5</groupId>
  <artifactId>FFSpark5</artifactId>
  <version>0.0.1-SNAPSHOT</version>


    <properties>
        <bigtable.version>1.0.0</bigtable.version>
        <hbase.version>1.3.1</hbase.version>
        <hbase-shaded.version>2.0.0-alpha2</hbase-shaded.version>
        <hbase-spark.version>2.0.0-alpha4</hbase-spark.version>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <spark.version>1.6.2</spark.version>
        <spark-streaming.version>1.6.2</spark-streaming.version>
        <scala.version>2.11.0</scala.version>
        <scalatest.version>3.0.3</scalatest.version>
        <bigtable.projectID>my_gcp_project_id</bigtable.projectID>
        <bigtable.instanceID>my_bigtable_instance_name</bigtable.instanceID>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark-streaming.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-spark</artifactId>
         <version>${hbase-spark.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.bigtable</groupId>
            <artifactId>bigtable-hbase-1.x-hadoop</artifactId>
            <version>${bigtable.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-shaded-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-shaded-client</artifactId>
            <version>${hbase-shaded.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-shaded-server</artifactId>
            <version>${hbase-shaded.version}</version>
        </dependency>

    </dependencies>

    <build>
    <outputDirectory>target/classes</outputDirectory>

    <resources>
        <resource>
            <directory>src/main/resources</directory>
            <filtering>true</filtering>
        </resource>
    </resources>

    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
                <mainClass>FFSpark5</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

我意识到这个版本的 POM.xml 中可能有很多错误的东西,但是我已经尝试了许多依赖项和版本的组合,但无法让它们中的任何一个真正起作用。就日志输出而言,最新的一个似乎走得最远,但仍然中断。这是最新的堆栈跟踪:

18/03/12 15:37:17 INFO BigtableSession: Bigtable options: BigtableOptions{dataHost=bigtable.googleapis.com, tableAdminHost=bigtableadmin.googleapis.com, instanceAdminHost=bigtableadmin.googleapis.com .... (lost of other options here)}.
18/03/12 15:37:17 INFO RefreshingOAuth2CredentialsInterceptor: Refreshing the OAuth token
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 210.6 KB, free 210.6 KB)
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.9 KB, free 230.5 KB)
18/03/12 15:37:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58982 (size: 19.9 KB, free: 457.9 MB)
18/03/12 15:37:19 INFO SparkContext: Created broadcast 0 from broadcast at HBaseContext.scala:73
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 160.0 B, free 230.6 KB)
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 120.0 B, free 230.7 KB)
18/03/12 15:37:19 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:58982 (size: 120.0 B, free: 457.9 MB)
18/03/12 15:37:19 INFO SparkContext: Created broadcast 1 from broadcast at HBaseContext.scala:74
Direct test done
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Scan.getMvccReadPoint()J
    at org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor.getMvccReadPoint(PackagePrivateFieldAccessor.java:39)
    at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:1088)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:601)
    at FFSpark5.main(FFSpark5.java:64)

下面是我的基本代码。我们的想法是进行 3 次测试:

测试 1:只是尝试通过简单的 Bigtable API 直接访问 Bigtable,只是为了确保没有简单的问题,例如身份验证等。这工作正常

测试 2:尝试获取 newAPIHadoopRDD()。这失败了

测试 3:尝试获取 bulkPut()。这失败了

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Connection;

class PutFunction implements Function<String, Put> {

    private static final long serialVersionUID = 1L;


    public Put call(String v) throws Exception {
      String[] cells = v.split(",");
      Put put = new Put(Bytes.toBytes(cells[0]));

      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
              Bytes.toBytes(cells[3]));
      return put;
    }
  }


public class FFSpark5
{
    public static void main(String args[]) throws IOException
    {
        SparkConf conf = new SparkConf().setAppName("SparkTest").setMaster("local");        
        JavaSparkContext sc = new JavaSparkContext(conf);       
        Configuration hBaseConf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(hBaseConf);
        JavaHBaseContext hbaseContext = new JavaHBaseContext(sc, hBaseConf);

        // test 1: simple direct Bigtable access
        connection.getTable(TableName.valueOf("FFTransFlat".getBytes()))
            .put(new Put("abc".getBytes())
            .addColumn("CI".getBytes(), "I".getBytes(), "val".getBytes()));
        System.out.println("Direct test done");


        // Test 2: newAPIHadoopRDD() 
        Scan scan1 = new Scan();
        scan1.setCaching(500);
        scan1.setCacheBlocks(false);

        hBaseConf.set(TableInputFormat.INPUT_TABLE, "FFTransFlat");
        hBaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan1));

        JavaPairRDD<ImmutableBytesWritable, Result> source = sc
                .newAPIHadoopRDD(hBaseConf, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);

        System.out.println(source.count());

        // Test 3: bulkPut()  

         List<String> list = new ArrayList<String>(5);
            list.add("1,CI,a,1");   
            list.add("2,CI,a,2");   
            list.add("3,CI,a,3");   

            JavaRDD<String> rdd = sc.parallelize(list);

            byte tableName[] = "FFTransFlat".getBytes();
            hbaseContext.bulkPut(rdd,
                    TableName.valueOf(tableName),
                    new PutFunction());

            System.out.println(source.count());


        connection.close();
    }

}

我从 DataProc 网站上看到支持 Spark 2.2.0 和 1.6.2。我遇到了 2.2.0 的问题,所以我使用的是 1.6.2。

我能否就以下问题获得一些建议: 要使用的依赖项和版本的正确组合是什么(特别是使用 Cloud Bigtable,而不是 HBase 集群)

是否建议使用newAPIHadoopRDD 或类似bulkRead()/bulkDelete()/etc 来实现并行化。 ?或者是否有另一种首选和高性能的方式来使用 DataProc/Bigtable 执行 MPP?

为这篇冗长的帖子道歉——这是我们第一次尝试 DataProc。

***更新:

在将 Bigtable 依赖项更新为 bigtable-hbase-2.x-hadoop 并将 HBase 版本更新为 2.0.0-alpha2 后,我设法得到了一些工作。至少 bulkPut 似乎在这个阶段工作。现在将致力于从依赖项中清除不需要的东西。

【问题讨论】:

  • 较新的 Cloud Bigtable 版本具有 hbase 2.* 兼容的工件。使用 1.2.0 bigtable.version,并使用 bigtable-hbase-2.x-hadoop 工件而不是 bigtable-hbase-1.x-hadoop。我不确定 HBase 版本之间的不匹配会对您的环境造成什么影响;理想情况下,hbase 版本和 Cloud Bigtable 版本应该是同步的。

标签: apache-spark hbase google-cloud-dataproc google-cloud-bigtable


【解决方案1】:

Here 是 Cloud Bigtable 的完整工作示例,带有基于 HBase 1 的 Hortonwork's SHC。我们将使用 HBase 2 创建一个类似的示例。 新的 Spark 集成基于 Cloud Bigtable 工件,旨在与新的 HBase 2.* API (tracking issue link) 一起使用。

【讨论】:

  • 谢谢。正如答案更新中提到的,我可以确认在我找到 bigtable-hbase-2.x-hadoop 之后,事情开始起作用了。我仍然必须使用 Spark 1.6.2,因为 2.x 似乎缺少一些日志依赖项,这些依赖项是所有这些都需要一起工作的。但这一切都很好。
猜你喜欢
  • 2020-04-03
  • 2017-07-08
  • 2019-06-26
  • 2018-02-16
  • 1970-01-01
  • 1970-01-01
  • 2018-03-26
  • 2015-12-21
  • 1970-01-01
相关资源
最近更新 更多