【问题标题】:How to union multiple dynamic inputs in Palantir Foundry?如何在 Palantir Foundry 中合并多个动态输入?
【发布时间】:2021-06-23 22:31:59
【问题描述】:

我想在 Palantir Foundry 中合并多个数据集,数据集的名称是动态的,因此我无法静态地在 transform_df() 中给出数据集名称。有没有办法可以动态地将多个输入输入transform_df 并合并所有这些数据帧?

我尝试遍历数据集,例如:

li = ['dataset1_path', 'dataset2_path']

union_df = None
for p in li:
  @transforms_df(
    my_input = Input(p), 
    Output(p+"_output")
  )
  def my_compute_function(my_input):
    return my_input

  if union_df is None:
    union_df = my_compute_function
  else:
    union_df = union_df.union(my_compute_function)

但是,这不会生成联合输出。

【问题讨论】:

标签: pyspark dynamic union palantir-foundry foundry-code-repositories


【解决方案1】:

这应该可以为您做一些更改,这是一个带有 json 文件的动态数据集的示例,您的情况可能会略有不同。这是一种通用的方法,您可以处理动态 json 输入数据集,该数据集应该适用于您可以指定的任何类型的动态输入文件类型或铸造数据集的内部。这个通用示例正在处理上传到平台中数据集节点的一组 json 文件。这应该是完全动态的。在这之后做一个联合应该是一件简单的事情。

这里还有一些额外的日志记录。

希望对你有帮助

from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
import json
import logging


def transform_generator():
    transforms = []
    transf_dict = {## enter your dynamic mappings here ##}

    for value in transf_dict:
        @transform(
            out=Output(' path to your output here '.format(val=value)),
            inpt=Input(" path to input here ".format(val=value)),
        )
        def update_set(ctx, inpt, out):
            spark = ctx.spark_session
            sc = spark.sparkContext

            filesystem = list(inpt.filesystem().ls())
            file_dates = []
            for files in filesystem:
                with inpt.filesystem().open(files.path) as fi:
                    data = json.load(fi)
                file_dates.append(data)

            logging.info('info logs:')
            logging.info(file_dates)
            json_object = json.dumps(file_dates)
            df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
            df_2 = df_2.withColumn('upload_date', F.current_date())

            df_2.drop_duplicates()
            out.write_dataframe(df_2)
        transforms.append(update_logs)
    return transforms


TRANSFORMS = transform_generator()

【讨论】:

    【解决方案2】:

    所以这个问题分为两个问题。

    如何使用编程输入路径处理转换

    要处理带有程序输入的转换,重要的是要记住两件事:

    1st - 转换将在 CI 时确定您的输入和输出。这意味着您可以拥有生成转换的 Python 代码,但不能从数据集中读取路径,它们需要硬编码到生成转换的 Python 代码中。

    2nd - 您的转换将在 CI 执行期间创建一次。这意味着您不能在数据集构建时使用增量或特殊逻辑来生成不同的路径。

    有了这两个前提,例如在您的示例或 @jeremy-david-gamet 的(回复为 ty,给了您 +1)中,您可以拥有在 CI 时生成路径的 python 代码。

    dataset_paths = ['dataset1_path', 'dataset2_path']
    
    for path in dataset_paths:
      @transforms_df(
        my_input = Input(path), 
        Output(f"{path}_output")
      )
      def my_compute_function(my_input):
        return my_input
    
    

    但是要将它们联合起来,您需要第二次转换来执行联合,您需要传递多个输入,因此您可以为此使用 *args**kwargs

    dataset_paths = ['dataset1_path', 'dataset2_path']
    
    all_args = [Input(path) for path in dataset_paths]
    all_args.append(Output("path/to/unioned_dataset"))
    @transforms_df(*all_args)
    def my_compute_function(*args):
        input_dfs = []
        for arg in args:
           # there are other arguments like ctx in the args list, so we need  to check for type. You can also use kwargs for more determinism.
           if isinstance(arg, pyspark.sql.DataFrame):
                input_dfs.append(arg)
        
        # now that you have your dfs in a list you can union them
        # Note I didn't test this code, but it should be something like this
        ...
    
    

    如何合并具有不同架构的数据集。

    对于这一部分,有很多关于如何在 spark 中合并不同数据帧的问答。这是从https://stackoverflow.com/a/55461824/26004复制的简短代码示例

    from pyspark.sql import SparkSession, HiveContext
    from pyspark.sql.functions import lit
    from pyspark.sql import Row
    
    def customUnion(df1, df2):
        cols1 = df1.columns
        cols2 = df2.columns
        total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
        def expr(mycols, allcols):
            def processCols(colname):
                if colname in mycols:
                    return colname
                else:
                    return lit(None).alias(colname)
            cols = map(processCols, allcols)
            return list(cols)
        appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
        return appended
    

    【讨论】:

      【解决方案3】:

      由于输入和输出是在 CI 时确定的,因此我们无法形成真正的动态输入。我们将不得不以某种方式指向代码中的特定数据集。假设数据集的路径共享相同的根,以下似乎需要最少的维护:

      from transforms.api import transform_df, Input, Output
      from functools import reduce
      
      
      datasets = [
          'dataset1',
          'dataset2',
          'dataset3',
      ]
      inputs = {f'inp{i}': Input(f'input/folder/path/{x}') for i, x in enumerate(datasets)}
      kwargs = {
          **{'output': Output('output/folder/path/unioned_dataset')},
          **inputs
      }
      
      
      @transform_df(**kwargs)
      def my_compute_function(**inputs):
          unioned_df = reduce(lambda df1, df2: df1.unionByName(df2), inputs.values())
          return unioned_df
      

      关于不同模式的联合,since Spark 3.1 one can use this

      df1.unionByName(df2, allowMissingColumns=True)
      

      【讨论】:

        猜你喜欢
        • 2022-07-06
        • 1970-01-01
        • 2021-10-16
        • 1970-01-01
        • 1970-01-01
        • 2022-08-03
        • 2021-02-11
        • 2022-01-10
        • 2022-06-27
        相关资源
        最近更新 更多