【问题标题】:Map a table of a cassandra database using spark and RDD使用 spark 和 RDD 映射 cassandra 数据库的表
【发布时间】:2015-08-24 07:16:16
【问题描述】:

我必须映射一个表格,其中记录了应用程序的使用历史。该表有这些元组:

<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>

AppId总是不同的,因为很多app都引用了date这样的格式来表示dd/mm/yyyy hh/mmcpuUsagememoryUsage是用%来表示所以例如:

<3ghffh3t482age20304,230720142245,0.2,3,5>

我是这样从cassandra中检索数据的(小sn-p):

public static void main(String[] args) {
        Cluster cluster;
        Session session;
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect();
        session.execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication "
                + "= {'class':'SimpleStrategy', 'replication_factor':3};");
        String createTableAppUsage = "CREATE TABLE IF NOT EXISTS foo.appusage"
                + "(appid text,date text, cpuusage double, memoryusage double, "
                + "PRIMARY KEY(appid,date) " + "WITH CLUSTERING ORDER BY (time ASC);";
        session.execute(createTableAppUsage);
        // Use select to get the appusage's table rows
        ResultSet resultForAppUsage = session.execute("SELECT appid,cpuusage FROM foo.appusage");
       for (Row row: resultForAppUsage)
             System.out.println("appid :" + row.getString("appid") +" "+ "cpuusage"+row.getString("cpuusage"));
        // Clean up the connection by closing it
        cluster.close();
    }

所以,我现在的问题是通过key value 映射数据并创建一个集成此代码的元组(sn-p 不起作用):

        <AppId,cpuusage>

        JavaPairRDD<String, Integer> saveTupleKeyValue =someStructureFromTakeData.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String x) {
                return new Tuple2(x, y);
            }

如何使用 RDD 和 reduce eg. cpuusage &gt;50 映射 appId 和 cpuusage?

有什么帮助吗?

提前致谢。

【问题讨论】:

  • 不确定我是否理解这个问题。您想用等效的 spark - cassandra 连接 API 表达式替换 `session.execute("SELECT appid,cpuusage FROM foo.appusage");`?
  • @maasg 嗨,我的问题是,在从 cassandra 检索数据后,如上面的代码所示,我想创建一个数据集 RDD 来映射 并对 reduce 进行操作在这个.. 例如。减少cpu的利用率> 50 ..等等。我该怎么做?

标签: java mapreduce apache-spark rdd


【解决方案1】:

假设您已经创建了一个有效的 SparkContext sparkContext,已将 spark-cassandra 连接器依赖项添加到您的项目并配置您的 spark 应用程序以与您的 cassandra 集群通信(参见docs),那么我们可以像这样在 RDD 中加载数据:

val data = sparkContext.cassandraTable("foo", "appusage").select("appid", "cpuusage")

在 Java 中,这个想法是相同的,但它需要更多的管道,描述为 here

【讨论】:

  • 感谢您的回答,我配置了所有内容..我可以执行查询等。所以可以返回一个地图,在这个例子中val data是一个rdd地图?
  • @OiRc RDD 是集合,不限制键重复,因此它们不符合“映射”(如数据结构中的)合同。您需要 Map 的哪些功能? Spark 中可能有办法。
  • 不不,我不想使用 java map.. 术语 map 我的意思是我想包含所有 对,然后减少它们。你能看到这个question, related to this question,你能给我一个答案吗?我想知道的是,在返回之前是否可以对reduceByKey 函数进行操作。
  • 你能在 rdd 地图中做这个吗?就像从 rdd 地图中读取数据一样?
  • @ArditMeti 不,您不能在 RDD 函数闭包中使用 sparkContext
猜你喜欢
  • 1970-01-01
  • 2016-01-06
  • 2015-01-21
  • 1970-01-01
  • 2020-06-25
  • 2011-01-28
  • 2020-12-14
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多