[Posted]: 2018-05-17 10:23:16
[Question]:
I am trying to parse a log file in Spark 1.6 using Scala. Here is some sample data:
2017-02-04 04:48:11,123 DEBUG [org.quartz.core.QuartzSchedulerThread] - <batch acquisition of 0 triggers>
2017-02-04 04:48:20,892 INFO [org.jasig.inspektr.audit.support.Slf4jLoggingAuditTrailManager] - <Audit trail record BEGIN
=============================================================
WHO: audit:unknown
WHAT: TGT-7d937-yRqp6ObM7JOtkUZ7Ff4yEo95-casino1.example.org
ACTION: TICKET_GRANTING_TICKET_DESTROYED
APPLICATION: CASINO
WHEN: Sat Feb 04 04:48:20 AEDT 2017
CLIENT IP ADDRESS: 160.50.201.557
SERVER IP ADDRESS: login.cfu.asg
=============================================================
>
2017-02-04 04:48:32,165 INFO [org.jasig.cas.services.DefaultServicesManagerImpl] - <Reloading registered services.>
2017-02-04 04:48:32,167 INFO [org.jasig.casino.services.DefaultServicesManagerImpl] - <Loaded 2 services.>
2017-02-04 04:48:38,889 DEBUG [org.quartz.core.QuartzSchedulerThread] - <batch acquisition of 1 triggers>
2017-02-04 04:48:52,790 DEBUG [org.quartz.core.QuartzSchedulerThread] - <batch acquisition of 0 triggers>
2017-02-04 04:48:52,790 DEBUG [org.quartz.core.JobRunShell] - <Calling execute on job DEFAULT.serviceRegistryReloaderJobDetail>
2017-02-04 04:48:52,790 INFO [org.jasig.casino.services.DefaultServicesManagerImpl] - <Reloading registered services.>
2017-02-04 04:48:52,792 DEBUG [org.jasig.casino.services.DefaultServicesManagerImpl] - <Adding registered service ^(https?|imaps?)://.*>
2017-02-04 04:48:52,792 DEBUG [org.jasig.casino.services.DefaultServicesManagerImpl] - <Adding registered service
2017-02-04 04:48:52,792 INFO [org.jasig.casino.services.DefaultServicesManagerImpl] - <Loaded 2 services.>
2017-02-04 04:49:14,365 INFO [org.jasig.casino.services.DefaultServicesManagerImpl] - <Reloading registered services.>
2017-02-04 04:49:14,366 INFO [org.jasig.casino.services.DefaultServicesManagerImpl] - <Loaded 2 services.>
2017-02-04 04:49:19,699 DEBUG [org.quartz.core.QuartzSchedulerThread] - <batch acquisition of 0 triggers>
2017-02-04 04:49:43,465 DEBUG [org.quartz.core.QuartzSchedulerThread] - <batch acquisition of 0 triggers>
2017-02-04 04:50:00,978 INFO [org.jasig.casino.authentication.PolicyBasedAuthenticationManager] - <JaasAuthenticationHandler successfully authenticated >
2017-02-04 04:50:00,978 INFO [org.jasig.casino.authentication.PolicyBasedAuthenticationManager] - <Authenticated 3785973 with credentials.>
2017-02-04 04:50:00,978 INFO [org.jasig.inspektr.nhgij.support.Slf4jLogggbhAuditTrailManaver] - <Audit trail record BEGIN
=============================================================
WHO: z3705z73
WHAT: supplied credentials: [d37c5973]
ACTION: AUTHENTICATION_SUCCESS
APPLICATION: casinoINO
WHEN: Sat Feb 04 04:50:00 AEDT 2017
CLIENT IP ADDRESS: 101.181.28.555
SERVER IP ADDRESS: login.cfu.asg
=============================================================
>
The data continues like this, and between these patterns there may be other log lines that are irrelevant to my parsing. I have about 40 GB of files, each holding one day's worth of data.
All of these files are gzip-compressed. I tried sc.wholeTextFiles to get a pair RDD, but ran into Java heap space errors because each file is between 400 MB and 800 MB uncompressed.
So I switched to sc.textFile and tried reading one file at a time. I can create an RDD[String], and fortunately sc.textFile does not raise any heap space issues when running actions on that RDD.
Here is the code I have tried:
val casinop2 = sc.wholeTextFiles("/logdata/casino/catalina.out-20150228.gz")
// wholeTextFiles yields (path, content) pairs, so split the content (._2), not the pair itself
val casop = casinop2.flatMap(x => x._2.split("\n"))
  .filter(x => !(x.contains("Reloading registered services")
    || x.contains("Loaded 2 services.")
    || x.contains("DEBUG")
    || x.contains("ERROR")
    || x.contains("java.lang.RuntimeException")
    || x.contains("Caused by:")
    || x.contains("Granted ticket")
    || x.contains("java.lang.IllegalStateException")
    || x.startsWith("\t")
    || x.contains("org.jasig.cas.authentication.PolicyBasedAuthenticationManager")))
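The long filter chain can be factored into a reusable predicate, which keeps the noise list in one place and makes it easier to extend. A minimal, Spark-free sketch (the sample lines below are copied from the log excerpt above):

```scala
// Substrings that mark a line as noise for this parsing job.
val noise = Seq(
  "Reloading registered services", "Loaded 2 services.", "DEBUG", "ERROR",
  "java.lang.RuntimeException", "Caused by:", "Granted ticket",
  "java.lang.IllegalStateException",
  "org.jasig.cas.authentication.PolicyBasedAuthenticationManager")

// Keep a line only if it is not a continuation (tab-indented) and
// contains none of the noise markers.
def keep(line: String): Boolean =
  !line.startsWith("\t") && !noise.exists(n => line.contains(n))

val lines = Seq(
  "2017-02-04 04:48:11,123 DEBUG [org.quartz.core.QuartzSchedulerThread] - <batch acquisition of 0 triggers>",
  "WHO: audit:unknown")

val kept = lines.filter(keep)
// kept == Seq("WHO: audit:unknown")
```

The same `keep` predicate can then be passed straight to the RDD's `.filter(keep)` in place of the inline chain.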
val pattern = new Regex("""((\d{4})-(\d{2})-\d{2}\s\d{2}:\d{2}:\d{2}),\d{3}\s+(\w+)\s+\[(.*)\]\s+\-\s+\<.*\s\=*\s+(WHO:)\s+(.*)\s+(WHAT:)\s+(.*)\s+(ACTION:)\s+(.*)\s+(APPLICATION:)\s+(.*)\s+(WHEN:)\s+(.*)\s+([A-Z\s]{17}:)\s+(.*)\s+([A-Z\s]{17}:)\s+(.*)\s+\=*\s\s\>""")
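For reference, here is a trimmed-down, single-record sketch of the same matching idea, using `(?s)` (so `.` crosses newlines) and lazy groups instead of the long literal labels. The record text is copied from the sample above; the group choices here are one possible layout, not the original's exact grouping:

```scala
import scala.util.matching.Regex

// One complete audit record, copied from the sample log above.
val record =
  """2017-02-04 04:48:20,892 INFO [org.jasig.inspektr.audit.support.Slf4jLoggingAuditTrailManager] - <Audit trail record BEGIN
    |=============================================================
    |WHO: audit:unknown
    |WHAT: TGT-7d937-yRqp6ObM7JOtkUZ7Ff4yEo95-casino1.example.org
    |ACTION: TICKET_GRANTING_TICKET_DESTROYED
    |APPLICATION: CASINO
    |WHEN: Sat Feb 04 04:48:20 AEDT 2017
    |CLIENT IP ADDRESS: 160.50.201.557
    |SERVER IP ADDRESS: login.cfu.asg
    |=============================================================
    |>""".stripMargin

// (?s) lets '.' match newlines; lazy groups (.*?) stop at the next label.
val auditPattern = new Regex(
  """(?s)(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}\s+(\w+)\s+\[(.*?)\].*?""" +
  """WHO: (.*?)\nWHAT: (.*?)\nACTION: (.*?)\nAPPLICATION: (.*?)\n""" +
  """WHEN: (.*?)\nCLIENT IP ADDRESS: (.*?)\nSERVER IP ADDRESS: (.*?)\n""")

val fields = auditPattern.findFirstMatchIn(record).map(m =>
  (m.group(1), m.group(2), m.group(4), m.group(6)))
// fields == Some(("2017-02-04 04:48:20", "INFO", "audit:unknown", "TICKET_GRANTING_TICKET_DESTROYED"))
```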
case class MLog(datetime: String, message: String, process: String, who: String, what: String, action: String, application: String, when: String, clientipaddress: String, serveripaddress: String,year: String, month: String)
// Note: collect.toString returns the Array's identity string ("[Ljava.lang.String;@..."),
// not its contents - the lines must be joined back with mkString
pattern.findAllMatchIn(casop.collect.mkString("\n")).toList
Now this last statement throws a heap space error. The reason I want the RDD in a single string variable is that the regex needs multi-line input rather than a single line; for single-line input I would use map, flatMap, and so on.
The output I should get from the log file is:
|2017-02-04 04:54:41| INFO|org.jasig.inspekt...| s4542732|supplied credenti...|AUTHENTICATION_SU...| CAS|Sat Feb 04 04:54:...| 175.163.28.77|login.vu.edu.au|2017| 02|
|2017-02-04 04:54:41| INFO|org.jasig.inspekt...| s4542732|TGT-78959-EX63Wf2...|TICKET_GRANTING_T...| CAS|Sat Feb 04 04:54:...| 175.163.28.77|login.vu.edu.au|2017| 02|
|2017-02-04 04:54:41| INFO|org.jasig.inspekt...| 4542732|ST-474481-jTxCJFB...|SERVICE_TICKET_CR...| CAS|Sat Feb 04 04:54:...| 175.163.28.77|login.vu.edu.au|2017| 02|
|2017-02-04 04:54:44| INFO|org.jasig.inspekt...|audit:unknown|ST-474481-jTxCJFB...|SERVICE_TICKET_VA...| CAS|Sat Feb 04 04:54:...| 203.13.194.68|login.vu.edu.au|2017| 02|
|2017-02-04 04:55:02| INFO|org.jasig.inspekt...| s3785573|supplied credenti...|AUTHENTICATION_SU...| CAS|Sat Feb 04 04:55:...| 101.181.28.125|login.vu.edu.au|2017| 02|
|2017-02-04 04:55:02| INFO|org.jasig.inspekt...| s3785573|TGT-78960-yWaWkcN...|TICKET_GRANTING_T...| CAS|Sat Feb 04 04:55:...| 101.181.28.125|login.vu.edu.au|2017| 02|
|2017-02-04 04:55:02| INFO|org.jasig.inspekt...| 3785573|ST-474482-rARxdUG...|SERVICE_TICKET_CR...| CAS|Sat Feb 04 04:55:...| 101.181.28.125|login.vu.edu.au|2017| 02|
|2017-02-04 04:55:02| INFO|org.jasig.inspekt...|audit:unknown|ST-474482-rARxdUG...|SERVICE_TICKET_VA...| CAS|Sat Feb 04 04:55:...| 203.13.194.68|login.vu.edu.au|2017| 02|
+-------------------+-------+--------------------+-------------+--------------------+--------------------+-----------+--------------------+---------------+---------------+----+-----+
How can we read the multi-line input and feed it to the regex?
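One way to avoid collecting the whole RDD to the driver is to first reassemble raw lines into logical records: a new record starts at a line with a leading timestamp, and every other line is a continuation. Here is a minimal, Spark-free sketch of that grouping; in Spark, the same function could plausibly be applied per file (from wholeTextFiles) or per partition with mapPartitions, assuming no record spans a partition boundary:

```scala
// A new record starts at a line prefixed by "yyyy-MM-dd HH:mm:ss,SSS";
// any other line is appended to the current record.
val tsPrefix = """^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}""".r

def groupRecords(lines: Iterator[String]): List[String] =
  lines.foldLeft(List.empty[List[String]]) { (acc, line) =>
    if (tsPrefix.findFirstIn(line).isDefined || acc.isEmpty)
      List(line) :: acc                  // start a new record
    else
      (line :: acc.head) :: acc.tail    // continuation of the current record
  }.map(_.reverse.mkString("\n")).reverse

val sample = List(
  "2017-02-04 04:48:11,123 DEBUG [org.quartz.core.QuartzSchedulerThread] - <batch acquisition of 0 triggers>",
  "2017-02-04 04:48:20,892 INFO [org.jasig.inspektr.audit.support.Slf4jLoggingAuditTrailManager] - <Audit trail record BEGIN",
  "WHO: audit:unknown",
  ">")

val records = groupRecords(sample.iterator)
// records.size == 2; the second record spans three joined lines
```

Each multi-line record produced this way can then be matched against the regex individually with map, instead of feeding one giant collected string to findAllMatchIn.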
[Discussion]:
-
Have you tried increasing the heap size? For example: --executor-memory 10g
-
Are you satisfied with my answer? Hope it helps!
-
Thanks for the improved regex. I tried --executor-memory 10g, and it still throws the error "java.lang.OutOfMemoryError: GC overhead limit exceeded".
-
For some reason the garbage collector is taking an excessive amount of time (98% of the process's CPU time) while recovering very little memory (2% of the heap) on each pass. In practice this means your program stops making progress and is permanently busy running garbage collection. To prevent the application from burning CPU time without getting anything done, the JVM throws this error so you get a chance to diagnose the problem. It typically happens in code that creates lots of short-lived objects in an environment where memory is already very constrained.
-
@Allan, setting the heap size to 10G did not fix the out-of-memory error, and I couldn't work out what else might be wrong. I found a workaround to keep my processing going: I call sc.textFile to read the large input files, filter them, save the result to a temporary location, and then read that back with sc.wholeTextFiles. The temporary files are less than half the size of the originals, so no OutOfMemory error is thrown.
Tags: regex scala apache-spark cloudera-cdh