[Posted]: 2021-05-12 07:32:37
[Problem description]:
We have been using Apache Spark to process data for about 4 years. Recently, our Spark application jobs have frequently been failing with "Stream is corrupted" errors, especially on heavy jobs that shuffle large amounts of data.
We have tried configuring the shuffle to not use compression, and also tried specifying snappy as the compression codec, but neither change helped.
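For reference, the settings we experimented with look roughly like this (a sketch only; `spark.shuffle.compress`, `spark.shuffle.spill.compress`, and `spark.io.compression.codec` are real Spark configuration keys, while the jar name is a placeholder):

```shell
# Variant 1: disable shuffle compression entirely (placeholder jar name).
spark-submit \
  --master yarn \
  --conf spark.shuffle.compress=false \
  --conf spark.shuffle.spill.compress=false \
  our-etl-job.jar

# Variant 2: switch the codec from the default lz4 to snappy.
spark-submit \
  --master yarn \
  --conf spark.io.compression.codec=snappy \
  our-etl-job.jar
```

In both variants the "Stream is corrupted" failures still occurred.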
Has anyone run into this before? Could you help me figure out what might be causing this error?
Could this error be related to a failing disk in the cluster?
Environment:
Spark: 2.0.2, 2.4
Resource manager: YARN
Below is the log from an executor.
INFO memory.MemoryStore: Block broadcast_46_piece0 stored as bytes in memory (estimated size 42.1 KB, free 6.5 GB)
INFO broadcast.TorrentBroadcast: Reading broadcast variable 46 took 5 ms
INFO memory.MemoryStore: Block broadcast_46 stored as values in memory (estimated size 99.9 KB, free 6.5 GB)
INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 12, fetching them
INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.60.12.7:9478)
INFO spark.MapOutputTrackerWorker: Got the output locations
INFO storage.ShuffleBlockFetcherIterator: Getting 80 non-empty blocks out of 80 blocks
INFO storage.ShuffleBlockFetcherIterator: Started 69 remote fetches in 4 ms
INFO codegen.CodeGenerator: Code generated in 7.057086 ms
INFO codegen.CodeGenerator: Code generated in 4.36239 ms
INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 13, fetching them
INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.60.12.7:9478)
INFO spark.MapOutputTrackerWorker: Got the output locations
INFO storage.ShuffleBlockFetcherIterator: Getting 30 non-empty blocks out of 80 blocks
INFO storage.ShuffleBlockFetcherIterator: Started 16 remote fetches in 2 ms
INFO codegen.CodeGenerator: Code generated in 10.182217 ms
INFO codegen.CodeGenerator: Code generated in 5.274815 ms
INFO executor.CoarseGrainedExecutorBackend: Got assigned task 10169
INFO executor.Executor: Running task 5.1 in stage 23.0 (TID 10169)
INFO storage.ShuffleBlockFetcherIterator: Getting 80 non-empty blocks out of 80 blocks
INFO storage.ShuffleBlockFetcherIterator: Started 69 remote fetches in 9 ms
INFO storage.ShuffleBlockFetcherIterator: Getting 30 non-empty blocks out of 80 blocks
INFO storage.ShuffleBlockFetcherIterator: Started 16 remote fetches in 1 ms
ERROR executor.Executor: Exception in task 43.0 in stage 23.0 (TID 10097)
java.io.IOException: Stream is corrupted
at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:163)
at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.readSize(UnsafeRowSerializer.scala:113)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.<init>(UnsafeRowSerializer.scala:120)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3.asKeyValueIterator(UnsafeRowSerializer.scala:110)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:66)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:730)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:605)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:162)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:100)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
ERROR executor.Executor: Exception in task 5.1 in stage 23.0 (TID 10169)
java.io.IOException: Stream is corrupted
at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:163)
at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.readSize(UnsafeRowSerializer.scala:113)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.<init>(UnsafeRowSerializer.scala:120)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3.asKeyValueIterator(UnsafeRowSerializer.scala:110)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:66)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:730)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:605)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:162)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:100)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
INFO executor.CoarseGrainedExecutorBackend: Got assigned task 10221
INFO executor.Executor: Running task 37.2 in stage 23.0 (TID 10221)
INFO storage.ShuffleBlockFetcherIterator: Getting 80 non-empty blocks out of 80 blocks
INFO storage.ShuffleBlockFetcherIterator: Started 69 remote fetches in 4 ms
INFO storage.ShuffleBlockFetcherIterator: Getting 30 non-empty blocks out of 80 blocks
INFO storage.ShuffleBlockFetcherIterator: Started 16 remote fetches in 1 ms
INFO executor.Executor: Executor is trying to kill task 37.2 in stage 23.0 (TID 10221)
INFO executor.Executor: Executor killed task 37.2 in stage 23.0 (TID 10221)
INFO executor.CoarseGrainedExecutorBackend: Got assigned task 10356
INFO executor.Executor: Running task 39.0 in stage 37.0 (TID 10356)
INFO spark.MapOutputTrackerWorker: Updating epoch to 19 and clearing cache
INFO broadcast.TorrentBroadcast: Started reading broadcast variable 54
INFO client.TransportClientFactory: Successfully created connection to Cidatanode17/10.24.18.17:8375 after 0 ms (0 ms spent in bootstraps)
INFO memory.MemoryStore: Block broadcast_54_piece0 stored as bytes in memory (estimated size 30.6 KB, free 6.5 GB)
INFO broadcast.TorrentBroadcast: Reading broadcast variable 54 took 6 ms
INFO memory.MemoryStore: Block broadcast_54 stored as values in memory (estimated size 81.6 KB, free 6.5 GB)
INFO executor.Executor: Finished task 39.0 in stage 37.0 (TID 10356). 11065 bytes result sent to driver
INFO executor.CoarseGrainedExecutorBackend: Got assigned task 13563
INFO executor.CoarseGrainedExecutorBackend: Got assigned task 13595
INFO executor.Executor: Running task 123.0 in stage 38.0 (TID 13563)
INFO executor.Executor: Running task 578.0 in stage 38.0 (TID 13595)
INFO broadcast.TorrentBroadcast: Started reading broadcast variable 56
INFO client.TransportClientFactory: Successfully created connection to Cidatanode15/10.24.18.15:8347 after 0 ms (0 ms spent in bootstraps)
Thanks.
[Discussion]:
Tags: java apache-spark