【Posted at】: 2017-07-14 05:11:53
【Problem Description】:
I'm running a Spark job on a Dataproc cluster, but it fails near the end of processing.
My data source is text log files in CSV format on Google Cloud Storage (3.5 TB in total, 5000 files).
The processing logic is as follows:
- read the files into a DataFrame (schema ["timestamp", "message"]);
- group all messages into 1-second windows;
- apply a [Tokenizer -> HashingTF] pipeline to each window's messages to extract words and their frequencies and build feature vectors;
- save the feature vectors, together with their timestamps, to GCS.
The problem I'm running into: processing works fine on a small subset of the data (say 10 files), but when I run it on all the files it eventually fails with an error like "Container killed by YARN for exceeding memory limits. 25.0 GB of 24 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead."
My cluster has 25 workers on n1-highmem-8 machines. So I googled the error and increased the spark.yarn.executor.memoryOverhead parameter to 6500 MB.
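For reference, a minimal sketch of how I understand that setting is applied (it has to be in place before the SparkContext is created; the 6500 value is the one I tried, and the SparkConf route below is just an illustration, since the same property can also be passed at submit time):

import pyspark

# Sketch only: set the YARN executor memory overhead before the SparkContext exists
conf = pyspark.SparkConf().set("spark.yarn.executor.memoryOverhead", "6500")
sc = pyspark.SparkContext(conf=conf)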
Now my Spark job still fails, but with the error "Job aborted due to stage failure: Total size of serialized results of 4293 tasks (1920.0 MB) is bigger than spark.driver.maxResultSize (1920.0 MB)".
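The message itself points at spark.driver.maxResultSize, so I assume it can be raised the same way; a sketch with a purely illustrative value that I have not actually tried:

import pyspark

# Sketch only: raise the driver result-size limit before the context is created
conf = pyspark.SparkConf().set("spark.driver.maxResultSize", "4g")
sc = pyspark.SparkContext(conf=conf)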
I'm new to Spark and I think I'm doing something wrong, either at the configuration level or in my code. If you could help me clean this up, that would be great!
Here is the code of my Spark job:
import logging
import string
from datetime import datetime
import pyspark
import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, ArrayType
from pyspark.sql import functions as F
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants
NOW = datetime.now().strftime("%Y%m%d%H%M%S")
START_DATE = '2016-01-01'
END_DATE = '2016-03-01'
sc = pyspark.SparkContext()
spark = SparkSession\
    .builder\
    .appName("LogsVectorizer")\
    .getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', 10000)
logger.info("Start log processing at {}...".format(NOW))
# Filenames to read/write locations
logs_fn = 'gs://databucket/csv/*'
vectors_fn = 'gs://databucket/vectors_out_{}'.format(NOW)
pipeline_fn = 'gs://databucket/pipeline_vectors_out_{}'.format(NOW)
model_fn = 'gs://databucket/model_vectors_out_{}'.format(NOW)
# CSV data schema to build DataFrame
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("message", StringType())])
# Helpers to clean strings in log fields
def cleaning_string(s):
    try:
        # Remove ids (like: app[2352] -> app)
        s = re.sub('\[.*\]', 'IDTAG', s)
        if s == '':
            s = 'EMPTY'
    except Exception as e:
        print("Skip string with exception {}".format(e))
    return s
def normalize_string(s):
    try:
        # Remove punctuation
        s = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s)
        # Remove digits
        s = re.sub('\d*', '', s)
        # Remove extra spaces
        s = ' '.join(s.split())
    except Exception as e:
        print("Skip string with exception {}".format(e))
    return s
def line_splitter(line):
    line = line.split(',')
    timestamp = line[0]
    full_message = ' '.join(line[1:])
    full_message = normalize_string(cleaning_string(full_message))
    return [timestamp, full_message]
# Read line from csv, split to date|message
# Read CSV to DataFrame and clean its fields
logger.info("Read CSV to DF...")
logs_csv = sc.textFile(logs_fn)
logs_csv = logs_csv.map(lambda line: line_splitter(line)).toDF(schema)
# Keep only lines for our date interval
logger.info("Filter by dates...")
logs_csv = logs_csv.filter((logs_csv.timestamp>START_DATE) & (logs_csv.timestamp<END_DATE))
logs_csv = logs_csv.withColumn("timestamp", logs_csv.timestamp.cast("timestamp"))
# Helpers to join messages into window and convert sparse to dense
join_ = F.udf(lambda x: "| ".join(x), StringType())
asDense = F.udf(lambda v: v.toArray().tolist())
# Agg by time window
logger.info("Group log messages by time window...")
logs_csv = logs_csv.groupBy(F.window("timestamp", "1 second"))\
    .agg(join_(F.collect_list("message")).alias("messages"))
# Turn message to hashTF
tokenizer = Tokenizer(inputCol="messages", outputCol="message_tokens")
hashingTF = HashingTF(inputCol="message_tokens", outputCol="tokens_counts", numFeatures=1000)
pipeline_tf = Pipeline(stages=[tokenizer, hashingTF])
logger.info("Fit-Transform ML Pipeline...")
model_tf = pipeline_tf.fit(logs_csv)
logs_csv = model_tf.transform(logs_csv)
logger.info("Spase vectors to Dense list...")
logs_csv = logs_csv.sort("window.start").select(["window.start", "tokens_counts"])\
    .withColumn("tokens_counts", asDense(logs_csv.tokens_counts))
# Save to disk
# Save Pipeline and Model
logger.info("Save models...")
pipeline_tf.save(pipeline_fn)
model_tf.save(model_fn)
# Save to GCS
logger.info("Save results to GCS...")
logs_csv.write.parquet(vectors_fn)
【Comments】:
Tags: apache-spark pyspark google-cloud-dataproc