【问题标题】:How to run ETL pipeline on Databricks (Python)如何在 Databricks (Python) 上运行 ETL 管道
【发布时间】:2020-06-27 15:27:25
【问题描述】:

我正在尝试使用 kinesis 数据流和使用 spark 的数据块笔记本创建实时情绪分析模型。我注意到执行数据转换、构建模型、分析情绪以及将数据发送到数据库的代码块只运行一次,但我希望这些代码块持续运行,直到我决定停止执行。数据块中有没有办法让代码块连续运行,直到用户决定终止执行?

我尝试将我的笔记本作为作业运行,但处理 spark 流的代码块永远运行,并且不允许其他代码块完成 ETL 过程。

这是我如何设置火花流的问题吗?这是我如何设置它的代码:

kinesisDF = spark \
  .readStream \
  .format("kinesis") \
  .option("streamName", kinesisStreamName)\
  .option("region", kinesisRegion) \
  .option("initialPosition", "latest") \
  .option("format", "json") \
  .option("awsAccessKey", awsAccessKeyId)\
  .option("awsSecretKey", awsSecretKey) \
  .option("inferSchema", "true") \
  .load()
df = kinesisDF \
  .writeStream \
  .format("memory") \
  .outputMode("append") \
  .queryName("tweets")  \
  .start()

上面的代码块 (.start) 是当我尝试将笔记本作为作业运行并且不允许其他代码块执行时连续运行的代码块。附言我对 databricks 和 spark 很陌生

【问题讨论】:

  • 我最近参加了 DB 课程,甚至在 CE 上也不是问题。使用另一个单元格或打开一个新选项卡。我觉得这个问题不容易理解。
  • @thebluephantom 对问题结构感到抱歉。处理 ETL 过程的其他代码位于同一笔记本的其他代码块中。问题是当 spark 流从 kinesis 数据流中获取更多数据时,它们不会连续运行。它们将运行一次并使用当前位于包含 spark 流的 df 中的数据快照来运行进程并发送到 dynamodb。这只是我运行笔记本的问题吗?
  • 听起来很合理,其他人只运行一次
  • 请显示该代码

标签: python apache-spark spark-streaming databricks amazon-kinesis


【解决方案1】:

我认为您需要按照 https://docs.databricks.com/spark/latest/structured-streaming/examples.html#write-to-amazon-dynamodb-using-foreach-in-scala-and-python 使用 foreach 遵循 Dynamo DB as Sink 的方法。

从数据库手册开始 - 关注foreach

from pyspark.sql.functions import *

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
       .selectExpr("value % 10 as key")
       .groupBy("key")
       .count()
       .toDF("key", "count")
       .writeStream
       .foreach(SendToDynamoDB_ForeachWriter())
      #.foreach(sendToDynamoDB_simple)  // alternative, use one or the other
       .outputMode("update")
       .start()'
)

streamingDF.writeStream.foreach() 允许您将流式查询的输出写入任意位置。这就是这里的线索。

【讨论】:

  • 好的,谢谢,我试试看。我正在流式传输到数据库的是 JSON 对象。这些 JSON 对象存储在一个列表中,然后对其进行迭代以发送到数据库。
  • 没关系,但您需要保持“外国”水槽的连续性。
  • 好的,我得到了数据库的流,但是我如何让我的笔记本中的情绪分析模型也连续运行,因为我需要从我的 spark 流中分析新的流数据,然后发送将结果存入数据库?
  • 增量学习还是针对训练好的模型运行?
  • 我正在使用 textblob 库,所以它只是分析那里的情绪并为推文分配一个情绪值
【解决方案2】:

在 Python 中编写 ETL 表单可以采用多种结构,具体取决于专门的先决条件、业务目标、现有工具适合哪些库,以及设计人员认为他们必须在没有任何准备的情况下工作多少。 Python 的优点在于处理记录的信息结构和单词引用,这在 ETL 任务中很重要。

Python 具有足够的适应性,客户几乎可以使用本地信息结构编写任何 ETL 过程。例如,在固有的 Python 数学模块的帮助下,从纲要中筛选无效质量很简单:

import math data = [1.0, 3.0, 6.5, float('NaN'), 40.0, float('NaN')] filtered = [] for value in data: if not math.isnan(value): 已过滤。附加(值)

在没有任何准备的情况下对整个 ETL 过程进行编码并不是特别精通,因此大多数 ETL 代码最终都是纯粹的 Python 代码和远程表征的能力或文章的混合体,例如,来自之前引用的库的那些。例如,客户端可以利用 pandas 来引导包含空值的整个 DataFrame:

筛选 = data.dropna()

Python 编程改进包 (SDK)、应用程序编程接口 (API) 和不同的实用程序可用于某些阶段,其中一些可能有助于 ETL 编码。例如,Anaconda 阶段是对处理信息很重要的模块和库的 Python 挪用。它结合了自己的包主管和云服务,用于共享代码便签本和 Python 条件。

对于大部分使用 Python 进行编码具有重要意义的建议中的很大一部分也适用于 ETL 编程。例如,代码应该是“Pythonic”——这意味着开发人员应该遵循一些语言明确的规则,使内容紧凑清晰,并符合软件工程师的目标。文档也很重要,就像捆绑电路板并留意条件一样。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-29
    • 2021-06-16
    • 2012-07-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多