[Question Title]: How to get detailed information about Spark Stages & Tasks
[Posted]: 2019-01-21 21:05:06
[Question]:

I have set up an Apache Spark cluster with one master and one worker, and I use Python with Spyder as my IDE. So far everything works, but I need detailed information about how the tasks are distributed across the cluster. I know there is the Spark web UI, but I would like to get the information directly in my Spyder console, i.e. which part of my code/script was executed by which worker/master. I think it must be possible to get more information using the Python package `socket` and `socket.gethostname()`. I am really looking forward to any help. Here is my code:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from datetime import datetime
from pyspark.sql.functions import udf
from datetime import datetime
import pyspark.sql.functions as F

#spark = SparkSession \
#    .builder \
#    .appName('weather_data') \
#    .getOrCreate()


spark = SparkSession \
   .builder \
   .appName("weather_data_u") \
   .master('master_ip@...')\
   .getOrCreate()

# `data` is assumed to be a DataFrame loaded earlier (the loading step is not shown in the post)
data.show()
data.printSchema()

data_selected = data\
        .select(data['Date'],
                data['TemperatureHighC'],
                data['TemperatureAvgC'],
                data['TemperatureLowC'],
                data['DewpointHighC'],
                data['DewpointAvgC'],
                data['DewpointLowC'],
                data['HumidityAvg'],
                data['WindSpeedMaxKMH'],
                data['WindSpeedAvgKMH'],
                data['GustSpeedMaxKMH'],
                data['PrecipitationSumCM'])

data_selected.printSchema()
data_selected.show()


f = udf(lambda row: datetime.strptime(row, '%Y-%m-%d'), TimestampType())

data_selected = data_selected\
        .withColumn('date', f(data['Date'].cast(StringType())))\
        .withColumn('t_max', data['TemperatureHighC'].cast(DoubleType()))\
        .withColumn('t_mean', data['TemperatureAvgC'].cast(DoubleType()))\
        .withColumn('t_min', data['TemperatureLowC'].cast(DoubleType()))\
        .withColumn('dew_max', data['DewpointHighC'].cast(DoubleType()))\
        .withColumn('dew_mean', data['DewpointAvgC'].cast(DoubleType()))\
        .withColumn('dew_min', data['DewpointLowC'].cast(DoubleType()))\
        .cache()

data_selected.show()

t_mean_calculated = data_selected\
        .groupBy(F.date_format(data_selected.date, 'M'))\
        .agg(F.mean(data_selected.t_max))\
        .orderBy('date_format(date, M)')

t_mean_calculated = t_mean_calculated\
        .withColumn('month', t_mean_calculated['date_format(date, M)'].cast(IntegerType()))\
        .withColumnRenamed('avg(t_max)', 't_max_month')\
        .orderBy('month')\
        .drop(t_mean_calculated['date_format(date, M)'])\
        .select('month', 't_max_month')

t_mean_calculated = t_mean_calculated.collect()
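A minimal sketch of the `socket.gethostname()` idea mentioned above: running it *inside* the executors (rather than on the driver) reveals which host processed which partition. `tag_partition` and `hosts_per_partition` are hypothetical helper names, and the code assumes `df` is any Spark DataFrame (e.g. `data_selected` from the snippet above):

```python
import socket

def tag_partition(index, rows):
    # Executed on the worker that holds this partition:
    # report partition id, the worker's host name, and the row count.
    yield (index, socket.gethostname(), sum(1 for _ in rows))

def hosts_per_partition(df):
    # mapPartitionsWithIndex ships tag_partition to the executors,
    # so gethostname() runs on each worker, not on the driver.
    return df.rdd.mapPartitionsWithIndex(tag_partition).collect()

# usage, e.g.:
# for pid, host, n in hosts_per_partition(data_selected):
#     print(f"partition {pid} with {n} rows ran on {host}")
```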

[Question Comments]:

Tags: apache-spark


[Solution 1]:

As reported by @Jacek Laskowski himself, you can use the Spark-Core local properties to modify the job name shown in the web UI:

    • callSite.short
    • callSite.long

For example, my Spark application syncs several MySQL tables to S3, and I set

    spark.sparkContext.setLocalProperty("callSite.short", currentTableName)
    

so that the current table name is reflected in the web UI.
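Building on the snippet above, here is a small sketch of a helper (`job_label` is a hypothetical name, not part of any Spark API) that sets the property before an action and clears it afterwards, so each job gets its own label in the web UI:

```python
from contextlib import contextmanager

@contextmanager
def job_label(sc, name):
    # Label every job triggered inside this block; passing None
    # afterwards clears the local property again.
    sc.setLocalProperty("callSite.short", name)
    sc.setLocalProperty("callSite.long", name)
    try:
        yield
    finally:
        sc.setLocalProperty("callSite.short", None)
        sc.setLocalProperty("callSite.long", None)

# usage (assuming a running SparkSession `spark`):
# with job_label(spark.sparkContext, "monthly_t_max"):
#     t_mean_calculated.collect()
```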

[Discussion]:

• Thanks for your help! Just one follow-up question: does this also work with a SparkSession instead of a SparkContext?
• @Maik: in the code snippet above, the `spark` in `spark.sparkContext...` is nothing but a reference to the SparkSession, so yes, it works from a SparkSession as well.