【问题标题】:Spark structured streaming join not workingSpark结构化流式连接不起作用
【发布时间】:2019-04-15 05:28:49
【问题描述】:

我对 Spark 结构化流式传输相当陌生,我正在尝试将来自 Kafka 主题(Spark 2.3.2、Kafka 2.0)的多个流连接在一起

连接在流上效果很好,我可以对键进行简单的等连接。在来自 2 个主题的一个特定连接上,我必须进行一些数据转换,因为在一个主题上,连接键以十六进制编码,而在另一个主题上,它是 base64。

经过大量调试,我得到了以下代码,我正在 PySpark Zeppelin 笔记本中对其进行测试。 2 个主题流存储在 Python 字典中

debug = (topicStreams['invprop']
   .where("invpropv.PHC_UID_IPID = '183C1BA9B3444919B6C33DAB0B639A87'")
   .writeStream.outputMode("append").format("memory")
   .queryName("debug").start()
)

正如我所料,这会从第一个主题返回一条消息

debug2 = (topicStreams['hca']
   .where("hex(unbase64(hcav.id)) = '183C1BA9B3444919B6C33DAB0B639A87'")
   .writeStream.....

这第二个流还返回一条消息,这当然是我要加入的两条消息。我想我可以假设键确实匹配。

debug3 = (topicStreams['invprop']
   .join(topicStreams['hca'], 
         expr("invpropv.PHC_UID_IPID = hex(unbase64(hcav.id))"))
   .writeStream...

这个连接永远不会返回任何东西。什么会导致此连接失败?我想我一定忽略了一些基本的东西。

【问题讨论】:

    标签: apache-spark apache-kafka


    【解决方案1】:

    嗯,一如既往,写一个 Stackoverflow 问题总能提供答案。这次以一种非常出乎意料的方式......

    输入上面的问题花了我几分钟,然后我再次检查了我的 Zeppelin 笔记本。瞧,我现在得到了我正在寻找的单联唱片。

    连接非常慢,但它可以工作 - 需要 5 多分钟才能产生结果。我以前从来没有等过足够长的时间。不,主题不是很大,仅包含几十万条消息。

    嗯,现在我知道加入基本上是有效的。我必须找出它为什么这么慢,以及如何加快速度。

    【讨论】:

      【解决方案2】:

      我对 Pyspark 不熟悉,但我可以看到在 Python 中比较运算符是 == 而不是 = 在您的代码中。
      请重新检查= 运算符在expr() 函数中的作用

      【讨论】:

      • 感谢您的回答。 expr() 中的代码实际上是 Spark SQL 代码,所以这里的单个 = 是正确的
      猜你喜欢
      • 2020-10-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-13
      • 2019-12-07
      • 2017-05-04
      相关资源
      最近更新 更多