【问题标题】:Can we able to do Transformation in Druid我们可以在德鲁伊中进行转换吗
【发布时间】:2022-01-17 00:26:51
【问题描述】:

我有一个场景,我将在 csv 文件中接收数据,并且我需要使用现有的列生成一些列。 示例:

Col_1   Col_2    Col_3   Col_4
abc     1        No      123
xyz     2        Yes     123
def     1        Yes     345

预期:

Col_1    Col_2    Col_3   Col_4   Col_5   Col_6
abc      1        No      123     1       1
xyz      2        Yes     123     0       0
def      1        Yes     345     0       0

Col_5 条件:如果 Col_1 = 'abc' then 1 else 0 end Col_6 条件:max(Col_5) 超过 (Col_2)

我知道当我们在其中加载文件时我们可以在 Druid 中执行转换,我尝试了更简单的条件,这对我来说很好,但我很怀疑在这里执行聚合和其他转换,如 Col_6。

我们还需要对我们将要接收的不同文件数据执行聚合,假设我们今天得到 2 个文件,我们将数据加载到 Druid 表中,明天我们又得到了一些 3 个文件,这些文件具有相同 (ID) 的数据这里是 Col_2 那么我们需要根据我们拥有的所有记录进行聚合,例如:这里是 Col_6 生成...

这会在德鲁伊中实现吗?

【问题讨论】:

    标签: scala apache-spark bigdata druid pydruid


    【解决方案1】:

    Col_5 条件:如果 Col_1 = 'abc' then 1 else 0

    你可以使用以下

    df = df.withColumn('Col_5', f.when((f.col('Col_1') == 'abc'), 1).otherwise(0))
    

    Col_6 条件:max(Col_5) 超过 (Col_2)

    可以应用窗口操作

    windowSpec = Window.partitionBy("Col_2").orderBy("Col_5").desc()
    
    df_max = df.withColumn("row_number", row_number().over(windowSpec)).filter(
        f.col("row_number") == 1
    )
    

    现在删除每个 Col_2 的重复项,然后将 df_max 与您的主 df 一起加入。

    上面的代码 sn-p 是在 python 中的,但是 spark API 是一样的,所以你可以用最少的改动来使用它。

    【讨论】:

    • 谢谢 Rahul,但我已经在 Spark Scala 中执行了上述所有转换,但现在我需要在 Druid 本身中执行相同的操作。我们需要从 In between. 中消除 Spark Scala/python。
    【解决方案2】:

    第一种类型if Col_1 = 'abc' then 1 else 0 并不难。例如,请参阅 this article 的类似示例。

    第二个,聚合在一列上,听起来不可能。我们可以聚合所有维度(如主键),但不能聚合单个维度,afaik。

    【讨论】:

      【解决方案3】:

      看看https://druid.apache.org/docs/latest/misc/math-expr.html 其中包含许多您可以使用的转换表达式。

      特别是,我通过创建以下表达式使用 wikipedia 演示数据测试了您的用例:

          {
                  "type": "expression",
                  "name": "isNB",
                  "expression": "case_simple(\"namespace\", 'Main',1,0)"
                },
      
                {
                  "type": "expression",
                  "expression": "greatest( case_simple(\"IsNew\", True, 1, 0), case_simple(\"namespace\", 'Main',1,0)",
                  "name": "combined_calc"
                }
      

      需要注意的一点是,变换表达式不能引用其他变换表达式,因此需要全部从原始输入字段中进行计算。

      【讨论】:

      • 感谢您的意见,但是这里我们如何处理 Over 子句?另外正如您提到的,我们转换表达式不能引用另一个转换表达式,所以如果我们使用一个转换后的列来创建另一列,我们需要使用整个条件来创建新列,而不是只使用列名?
      • 我的错。我误解了 max (col_2, col_5) 因此使用了最大函数。你的意思是MAX(col5) over ( PARTITION BY col_2),这带来了不同的挑战,尤其是跨批次。您可能想试验一下这个扩展:druid.apache.org/docs/0.22.0/development/extensions-contrib/…
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多