【问题标题】:Is it possible to dynamically generate BigQuery table names based on the timestamps of the elements of a window?是否可以根据窗口元素的时间戳动态生成 BigQuery 表名?
【发布时间】:2015-11-17 14:40:39
【问题描述】:

例如,如果我有一个从 PubSub 读取的具有 5 分钟窗口的 Dataflow 流作业,我了解如果我为一个元素分配一个过去两天的时间戳,那么将有一个包含该元素的窗口,并且如果我使用在 BigQueryIO.java 中描述的将每日表输出到 BigQuery 的示例中,该作业会将过去两天的元素写入 BigQuery 表中的实际日期。

我想用窗口元素的时间戳而不是当前窗口的时间将过去的元素写入 BigQuery 表,可以吗?

现在我正在遵循 DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java 中描述的示例:

    PCollection<TableRow> quotes = ...
    quotes.apply(Window.<TableRow>info(CalendarWindows.days(1)))
       .apply(BigQueryIO.Write
         .named("Write")
         .withSchema(schema)
         .to(new SerializableFunction<BoundedWindow, String>() {
               public String apply(BoundedWindow window) {
                 String dayString = DateTimeFormat.forPattern("yyyy_MM_dd").parseDateTime(
                   ((DaysWindow) window).getStartDate());
                 return "my-project:output.output_table_" + dayString;
               }
             }));

【问题讨论】:

  • 向我们展示一些代码,因为不清楚您在问什么。
  • Pentium10 - 我已经用我现在使用的代码更新了问题。

标签: java google-bigquery google-cloud-dataflow


【解决方案1】:

如果我理解正确,您希望确保 BigQuery 表是根据元素的固有时间戳(引号)创建的,而不是管道运行时的挂钟时间。

TL;DR 代码应该已经做你想做的事了;如果不是,请发布更多详细信息。

更长的解释: Dataflow 中处理的一项关键创新是事件时间处理。这意味着 Dataflow 中的数据处理几乎与 处理 发生的时间完全分离 - 重要的是 正在处理的事件 发生的时间。这是使完全相同的代码能够在批处理或流数据源上运行的关键要素(例如,使用处理历史点击日志的相同代码处理实时用户点击事件)。它还可以灵活处理迟到的数据。

请参阅The world beyond batch,“事件时间与处理时间”部分,了解 Dataflow 处理模型这方面的描述(整篇文章非常值得一读)。如需更深入的描述,请参阅VLDB paper。这在windowingtriggers 的官方文档中也以更加面向用户的方式进行了描述。

因此,没有“当前窗口”这样的东西,因为管道可能同时处理许多不同的事件,这些事件发生在不同的时间,属于不同的窗口。事实上,正如 VLDB 论文所指出的,Dataflow 管道执行的重要部分之一就是“按窗口分组元素”。

在您展示的管道中,我们将使用记录上的provided timestamps 将您要写入 BigQuery 的记录分组到窗口中,并将每个窗口写入自己的表,并在必要时为新遇到的窗口创建表。如果延迟数据到达窗口(有关延迟数​​据的讨论,请参阅有关窗口和触发器的文档),我们将追加到已经存在的表中。

【讨论】:

    【解决方案2】:

    上述代码不再适合我。 Google 文档中有一个 updated example,尽管其中 DaysWindow 被替换为对我有用的 IntervalWindow

     PCollection<TableRow> quotes = ...
     quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
       .apply(BigQueryIO.Write
         .named("Write")
         .withSchema(schema)
         .to(new SerializableFunction<BoundedWindow, String>() {
           public String apply(BoundedWindow window) {
             // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
             String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
                  .withZone(DateTimeZone.UTC)
                  .print(((IntervalWindow) window).start());
             return "my-project:output.output_table_" + dayString;
           }
         }));
    

    【讨论】:

      猜你喜欢
      • 2011-01-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-05-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多