【问题标题】:Druid for non time-series data非时间序列数据的德鲁伊
【发布时间】:2017-02-21 15:55:29
【问题描述】:

对于数据在生成后立即发送到 Druid 的情况,一切都很好(就像在物联网中一样)。喜欢它。

但现在我有不同的情况,源于延迟数据输入。

最终用户可以离线(失去互联网连接),数据存储在她的手机中,只有在她重新上线后才会发送给 Druid。

这意味着,当她恢复互联网时,发送给 Druid 的数据(例如通过 Tranquility 服务器)将被 Druid 拒绝(因为 Druid 实时不接受过去的数据)。

当然,我可以将时间戳设置为数据发送到服务器的时间。但这会歪曲报告...,除非...,如果我添加另一个字段(比如说:generate_ts),并将其声明为另一个维度。

但是我不会从您在 Druid (?) 中免费获得的基于时间的自动汇总中受益。我将不得不使用 groupBy(将 generate_ts 作为维度之一),如下所示:

{
  "queryType": "groupBy",
  "dataSource": "my_datasource",
  "granularity": "none",
  "dimensions": [
    "city",
    {
      "type" : "extraction",
      "dimension" : "generated_ts",
      "outputName" :  "dayOfWeek",
      "extractionFn" : {
        "type" : "timeFormat",
        "format" : "EEEE"
      }
    }
  ],
  ...
}

我的问题是:

  1. 这种方法有效吗?
  2. 如果是:罚款是多少? (我想这会是性能,但有多糟糕?)

谢谢, 拉卡

--

在下面回应 Ramkumar 的回复,后续问题:

我还是不太明白这批摄取:

假设事件 A。它在时间戳 3 生成,直到时间戳 15 才发送到服务器。

当它在时间戳 15 发送时,它具有以下值:{ts: 15, generated_ts: 3, metric1: 12, dimension1: 'a'}

他们的时间戳键是“ts”。

这是不准确的,理想的情况是 {ts: 3, generated_ts: 3, metric1: 12, dimension1: 'a'},但我必须将 15 指定为 insert_ts,以便 Tranquility 接受它。

现在,在批量摄取期间,我想修复它,现在它具有正确的 ts {ts: 3, generated_ts: 3, metric1: 12, dimension1: 'a'}。

问题:那我会有重复的事件吗?

或者...(我怀疑):指定时间间隔的批量摄取基本上会替换该间隔内的所有数据? (我希望是这样,那我就不用担心数据重复了)

附加说明(刚刚):我遇到了这个:https://github.com/druid-io/tranquility/blob/master/docs/overview.md#segment-granularity-and-window-period

说:

我们在 Metamarkets 的方法是通过 Tranquility 实时发送我们的所有数据,但也通过在 S3 中存储副本并跟进夜间 Hadoop 批处理索引作业以重新摄取数据来降低这些风险。这使我们能够保证最终,每个事件在 Druid 中只表示一次。

所以...这是重新摄取,其含义(我猜)完全替代,对吧?

【问题讨论】:

  • 您的方法是有效的,但正如您所说,在延迟方面肯定会产生成本。尽管 druid 使用缓存,但它仍然不如原生 rollups 高效。我们遇到了类似的问题,我们的解决方案是使用批量摄取来修复延迟数据。虽然不优雅,但这是我们能想到的最好的。
  • 我还是不太明白这批摄取。上面的问题(在最后的原始帖子中)。
  • 它将完全替换所有段,不会有任何重复。我可能会以单独的答案解释我们所做的事情,看看是否可以帮助您。

标签: time-series offline olap rollup druid


【解决方案1】:

我们遇到了类似的问题,我们使用 lambda 架构解决了这个问题。我们的设置中有 2 个管道:

  1. 我们的实时管道从 Kafka+Spark 获取数据,并引入 druid。这将是实时数据。但是,比 druid 期望的粒度更旧的数据将被拒绝。因此,延迟数据到达会导致数据丢失。
  2. 我们的批处理管道每小时将数据摄取到 Hadoop,然后我们触发批处理摄取作业到 Druid。这将为键中提到的时间戳创建段,进行聚合并用相同的时间戳替换旧段。在实践中,druid 的存储原理基于不变性、MVCC 和日志结构存储。因此,当新版本的段使用相同的时间戳时,旧段将被垃圾收集。

有关批量摄取的更多详细信息: 我们的批处理作业操作来自 HDFS 的数据,这些数据被组织到每小时文件夹中。我们得到的任何迟到的事件都会被放入正确的小时桶中。我们有 XXX 小时延迟数据的 SLA(如果您已阅读 great article,则称为水印)。因此,我们取当前小时,减去 XXX 并获取相应的每小时文件夹文件,并在 druid 上触发该特定小时的批处理摄取作业。请注意,如果事件在水印之前到达,这仍然​​会导致数据丢失,但我们需要做出妥协,因为 druid 不支持在特定小时内对片段进行就地更新,而且我们也不能拥有任意长的水印,因为我们的HDFS 端的存储空间非常有限。

【讨论】:

  • 感谢所有答案和指点!所以,完整的解决方案需要外部任务调度程序,对吧?我正在考虑 RunDeck。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-09-03
相关资源
最近更新 更多