【问题标题】:How to log/print message in pyspark pandas_udf?如何在 pyspark pandas_udf 中记录/打印消息?
【发布时间】:2019-12-02 04:27:00
【问题描述】:

我已经测试了 loggerprint 在集群模式或客户端模式下都无法在 pandas_udf 中打印消息。

测试代码:

import sys
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import logging

logger = logging.getLogger('test')

spark = (SparkSession
.builder
.appName('test')
.getOrCreate())


df = spark.createDataFrame(pd.DataFrame({
    'y': np.random.randint(1, 10, (20,)),
    'ds': np.random.randint(1000, 9999, (20,)),
    'store_id' : ['a'] * 10 + ['b'] *7 + ['q']*3,
    'product_id' : ['c'] * 5 + ['d'] *12 + ['e']*3,
    })
)


@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])


df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

另请注意:

log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)

你不能在pandas_udf 中使用这个,因为这个日志超出了触发上下文对象,你不能在 udf 中引用 spark session/context。

我知道的唯一方法是使用Excetion 作为我在下面写的答案。 但这很棘手并且有缺点。 我想知道是否有任何方法可以在 pandas_udf 中打印消息。

【问题讨论】:

    标签: pandas apache-spark pyspark user-defined-functions


    【解决方案1】:

    目前,我在 spark 2.4 中尝试了各种方式。

    没有日志,很难调试有问题的 pandas_udf。我知道可以在 pandas_udf 中打印错误消息的唯一可行方法是 raise Exception 。所以用这种方式调试确实很费时间,但我知道没有更好的方法了。

    @pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
    def train_predict(pdf):
        print('#'*100)
        logger.info('$'*100)
        logger.error('&'*100)
        raise Exception('@'*100)  # The only way I know can print message but would break execution 
        return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])
    

    缺点是你不能在打印消息后保持火花运行。

    【讨论】:

      【解决方案2】:

      您可以做的一件事是将日志消息放入 DataFrame 本身。 例如

      @pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
      def train_predict(pdf):
          return pd.DataFrame([3, 5, 'store123', 'product123', 'My log message'], columns=['y', 'ds','store_id','product_id', 'log'])
      
      

      之后,您可以将带有相关信息的日志列选择到另一个DataFrame中并输出到文件。将其从原始 DataFrame 中删除。

      它并不完美,但它可能会有所帮助。

      【讨论】:

      • 我不知道为什么人们不赞成这个,但它帮助我调试了我的代码人。谢谢!!!
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-21
      • 2020-02-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-07-31
      相关资源
      最近更新 更多