【问题标题】:How to write a Spark Dataframe to Azure Event hub?如何将 Spark Dataframe 写入 Azure 事件中心?
【发布时间】:2022-04-06 21:12:06
【问题描述】:

我需要将 Synapse Apache Spark Scala 脚本中的 DataFrame 写入 Azure 事件中心。

我的解决方案基于这篇文章。 https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md#writing-data-to-eventhubs

如何将具有多列的 DataFrame 转换为具有称为“body”的单列且所有列都为 json 的 DataFrame?

我正在使用 Scala 和 Spark 3

这是我找到的解决方案,其中 toJson 创建了一个名为“值”的列。

df.toJSON.selectExpr("value as body")

谢谢。

【问题讨论】:

  • 我无法让该解决方案在 Scala 中运行。也许这些函数只在 python 库中可用。我下面的解决方案有效,但我想避免在 toJSON 和 selectExpr 之间创建额外的 Dataframe

标签: apache-spark azure-eventhub azure-synapse


【解决方案1】:

这里有一个解决方案。

首先从eventthub中获取连接字符串。这不是事件中心命名空间中的连接字符串。正确的连接字符串将包含带有 eventhub 名称的“EntityPath”。

例如。

val connectionString="Endpoint=sb://{youreventhub}.servicebus.windows.net/;SharedAccessKeyName={policyname};SharedAccessKey={yourkey};EntityPath={eventhub}";

创建事件中心配置。

import org.apache.spark.eventhubs._
val ehWriteConf = EventHubsConf(connectionString);

将您的数据框转换为带有列正文的数据框并写入 eventthub。

df.toJSON.selectExpr("value as body").
write.format("eventhubs").options(ehWriteConf.toMap).
save()

【讨论】:

    【解决方案2】:

    我一直在做df.select(struct(*[c for c in df.columns]).alias("body")),它对我来说效果很好。它将按原样获取您的数据框,并使整个内容成为列名为“body”的新数据框的值。

    【讨论】:

      猜你喜欢
      • 2020-06-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多