【问题标题】:In Scala, What is correct way to filter Spark Cassandra RDD by a List[String]?在 Scala 中,通过 List[String] 过滤 Spark Cassandra RDD 的正确方法是什么?
【发布时间】:2021-08-04 02:09:49
【问题描述】:

我有一个字符串格式的 id 列表,这个列表大约有 20,000 个 id:

var timelineIds = source.map(a => a.timelineid);
timelineIds = timelineIds.distinct.cache; // disticnt list we need this for later
var timelineIdsString = timelineIds.map(a => a.asInstanceOf[String]).collect.toList;

当我对我的一个 cassandra 表使用此列表时,它工作得很好,无论timelineIdsString 的大小如何:

var timelineHistorySource = sc.cassandraTable[Timeline]("acd", "timeline_history_bytimelineid")
        .select("ownerid", "userid", "timelineid", "timelinetype", "starttime", "endtime", "attributes", "states")
if (constrain)
    timelineHistorySource = timelineHistorySource.where("timelineid IN ?", timelineIdsString)

当我对我的另一个表执行此操作时,当我在列表中有超过 1000 个 id 时,它永远不会完成:

var dispositionSource = sc.cassandraTable[DispositionSource]("acd","dispositions_bytimelineid")
            .select("ownerid","dispositionid","month","timelineid","createddate","createduserid")
if(constrain)
    dispositionSource = dispositionSource.where("timelineid IN ?", timelineIdsString);

两个 cassandra 表都有 key 作为时间线,所以我知道它是正确的。只要timelineids 是一个小列表,此代码就可以正常工作。

有没有更好的方法从 cassandra RDD 中过滤?是不是 IN 子句的大小导致它窒息?

【问题讨论】:

    标签: scala apache-spark cassandra apache-spark-sql spark-cassandra-connector


    【解决方案1】:

    与其在 Spark 级别执行连接,不如使用 Cassandra 本身执行连接 - 在这种情况下,您将从 Cassandra 中仅读取必要的数据(假设连接键是分区键或主键)。对于 RDD,这可以通过 .joinWithCassandraTable 函数 (doc) 来完成:

    import com.datastax.spark.connector._
    
    val toJoin = sc.parallelize(1 until 5).map(x => Tuple1(x.toInt))
    val joined = toJoin.joinWithCassandraTable("test", "jtest1")
      .on(SomeColumns("pk"))
    
    scala> joined.toDebugString
    res21: String =
    (8) CassandraJoinRDD[150] at RDD at CassandraRDD.scala:18 []
     |  ParallelCollectionRDD[147] at parallelize at <console>:33 []
    

    对于 Dataframe,它被称为 direct join,自 SCC 2.5 起可用(请参阅 announcement) - 您需要传递一些配置才能启用它,请参阅文档:

    import spark.implicits._
    import org.apache.spark.sql.cassandra._
    
    val cassdata = spark.read.cassandraFormat("jtest1", "test").load
    
    val toJoin = spark.range(1, 5).select($"id".cast("int").as("id"))
    val joined = toJoin.join(cassdata, cassdata("pk") === toJoin("id"))
    
    scala> joined.explain
    == Physical Plan ==
    Cassandra Direct Join [pk = id#2] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
    +- *(1) Project [cast(id#0L as int) AS id#2]
       +- *(1) Range (1, 5, step=1, splits=8)
    

    我有一个long & detailed blog post about joins with Cassandra - 查看更多详细信息。

    【讨论】:

    • 谢谢,这就是我要找的。它是有效、快速地加入 RDD 的最佳方式的可靠答案。
    【解决方案2】:

    您可以尝试将 ID 列表保留为数据框 timelineIds,并根据 timelineid 将表与它进行内部连接。然后从生成的 df 中删除不必要的列 (timelineIds.timelineid)。

    【讨论】:

    • 我基本上就是这么做的,而且效果很好。只是不确定这是否是首选的过滤方法。 ''' var dispositionsFromSQL = sparksession.sql(""" |SELECT |cr.ownerid, cr.dispositionid, cr.month, cr.sessionid, cr.createddate, cr.createduserid, cr.channeltype, cr.queuename |FROM allDisps cr |INNER JOIN allTimelineIds dn ON cr.sessionid = dn.timelineid """.stripMargin ) .toDF(); '''
    • 这种join会在Spark级别进行,对于大数据集来说效率很低。最好使用直接连接来代替
    猜你喜欢
    • 2017-06-26
    • 2020-12-14
    • 2015-06-27
    • 2016-12-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多