【发布时间】:2019-04-15 05:28:49
【问题描述】:
我对 Spark 结构化流式传输相当陌生,我正在尝试将来自 Kafka 主题(Spark 2.3.2、Kafka 2.0)的多个流连接在一起
连接在流上效果很好,我可以对键进行简单的等连接。在来自 2 个主题的一个特定连接上,我必须进行一些数据转换,因为在一个主题上,连接键以十六进制编码,而在另一个主题上,它是 base64。
经过大量调试,我得到了以下代码,我正在 PySpark Zeppelin 笔记本中对其进行测试。 2 个主题流存储在 Python 字典中
debug = (topicStreams['invprop']
.where("invpropv.PHC_UID_IPID = '183C1BA9B3444919B6C33DAB0B639A87'")
.writeStream.outputMode("append").format("memory")
.queryName("debug").start()
)
正如我所料,这会从第一个主题返回一条消息
debug2 = (topicStreams['hca']
.where("hex(unbase64(hcav.id)) = '183C1BA9B3444919B6C33DAB0B639A87'")
.writeStream.....
这第二个流还返回一条消息,这当然是我要加入的两条消息。我想我可以假设键确实匹配。
debug3 = (topicStreams['invprop']
.join(topicStreams['hca'],
expr("invpropv.PHC_UID_IPID = hex(unbase64(hcav.id))"))
.writeStream...
这个连接永远不会返回任何东西。什么会导致此连接失败?我想我一定忽略了一些基本的东西。
【问题讨论】: