Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


apache spark - Why are PySpark jobs dying in the middle of processing without any particular error?

Experts, I am noticing a peculiar thing with one of our PySpark jobs in production (running in YARN cluster mode). After executing for a little over an hour (around 65-75 minutes), it dies without throwing any particular error message. We have analyzed the YARN logs for around 2 weeks now and found no specific error in them. The job just dies in the middle of ETL operations (reading/writing Hive tables, simple maps, trims, lambda operations, etc.), so there is no particular piece of code to point to. Sometimes a rerun fixes it; sometimes it takes more than one rerun. The code is optimized, and the spark-submit --conf options are tuned. The same setup runs without problems for around 30 other applications with very good performance stats. These are all the options we have:

spark-submit \
  --conf spark.yarn.maxAppAttempts=1 \
  --conf spark.sql.broadcastTimeout=36000 \
  --conf spark.dynamicAllocation.executorIdleTimeout=1800 \
  --conf spark.dynamicAllocation.minExecutors=8 \
  --conf spark.dynamicAllocation.initialExecutors=8 \
  --conf spark.dynamicAllocation.maxExecutors=32 \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --conf spark.kryoserializer.buffer.max=512m \
  --driver-memory 2G \
  --executor-memory 8G \
  --executor-cores 2 \
  --deploy-mode cluster \
  --master yarn

Is there some driver configuration I need to change to address this issue? Or is there some automatic timeout in Spark cluster mode that can be increased? We are using Spark 1.6 with Python 2.7.

The error looks like this. There are several messages that say:

ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM

But the job only fails when it hits the driver error, which happens at the end:

ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down

Here is the log-

19/10/24 16:17:03 INFO compress.CodecPool: Got brand-new compressor [.gz]
19/10/24 16:17:03 INFO output.FileOutputCommitter: Saved output of task 'attempt_201910241617_0152_m_000323_0' to hdfs://myserver/production/out/TBL/_temporary/0/task_201910241617_0152_m_000323
19/10/24 16:17:03 INFO mapred.SparkHadoopMapRedUtil: attempt_201910241617_0152_m_000323_0: Committed
19/10/24 16:17:03 INFO executor.Executor: Finished task 323.0 in stage 152.0 (TID 27419). 2163 bytes result sent to driver
19/10/24 16:17:03 INFO output.FileOutputCommitter: Saved output of task 'attempt_201910241617_0152_m_000135_0' to hdfs://myserver/production/out/TBL/_temporary/0/task_201910241617_0152_m_000135
19/10/24 16:17:03 INFO mapred.SparkHadoopMapRedUtil: attempt_201910241617_0152_m_000135_0: Committed
19/10/24 16:17:03 INFO executor.Executor: Finished task 135.0 in stage 152.0 (TID 27387). 2163 bytes result sent to driver
19/10/24 16:18:04 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
19/10/24 16:18:04 INFO storage.DiskBlockManager: Shutdown hook called
19/10/24 16:18:04 INFO util.ShutdownHookManager: Shutdown hook called

19/10/24 16:21:12 INFO executor.Executor: Finished task 41.0 in stage 163.0 (TID 29954). 2210 bytes result sent to driver
19/10/24 16:21:12 INFO executor.Executor: Finished task 170.0 in stage 163.0 (TID 29986). 2210 bytes result sent to driver
19/10/24 16:21:13 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 30047
19/10/24 16:21:13 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 30079
19/10/24 16:21:13 INFO executor.Executor: Running task 10.0 in stage 165.0 (TID 30047)
19/10/24 16:21:13 INFO executor.Executor: Running task 42.0 in stage 165.0 (TID 30079)
19/10/24 16:21:13 INFO spark.MapOutputTrackerWorker: Updating epoch to 56 and clearing cache
19/10/24 16:21:13 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 210
19/10/24 16:21:13 INFO storage.MemoryStore: Block broadcast_210_piece0 stored as bytes in memory (estimated size 29.4 KB, free 3.8 GB)
19/10/24 16:21:13 INFO broadcast.TorrentBroadcast: Reading broadcast variable 210 took 3 ms
19/10/24 16:21:13 INFO storage.MemoryStore: Block broadcast_210 stored as values in memory (estimated size 83.4 KB, free 3.8 GB)
19/10/24 16:21:13 INFO executor.Executor: Finished task 10.0 in stage 165.0 (TID 30047). 931 bytes result sent to driver
19/10/24 16:21:13 INFO executor.Executor: Finished task 42.0 in stage 165.0 (TID 30079). 931 bytes result sent to driver
19/10/24 16:21:15 WARN executor.CoarseGrainedExecutorBackend: An unknown (rxxxxxx1.hadoop.com:XXXXX) driver disconnected.
19/10/24 16:21:15 ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down.
19/10/24 16:21:15 INFO storage.DiskBlockManager: Shutdown hook called
19/10/24 16:21:15 INFO util.ShutdownHookManager: Shutdown hook called

Thanks, Sid


1 Answer


Without any apparent stack trace, it's a good idea to approach the problem from two angles: it's either a code issue or a data issue.

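In either case, start by giving the driver abundant memory to rule that out as a probable cause. Increase spark.driver.memory (the --driver-memory flag, currently 2G in your command) and the driver memory overhead (spark.yarn.driver.memoryOverhead on Spark 1.6, spark.driver.memoryOverhead from Spark 2.3) until you've diagnosed the problem.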
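
As a rough sketch of what that could look like while you are diagnosing, the snippet below assembles a spark-submit argument list with a larger driver allocation. The script name etl_job.py and the 8G / 2048 MB values are made up for illustration and are not taken from the question. The property name spark.yarn.driver.memoryOverhead is the Spark 1.6 on YARN spelling. The helper build_submit_command is hypothetical.

```python
def build_submit_command(app_path, driver_memory="8G", driver_overhead_mb=2048):
    """Return a spark-submit argument list with a larger driver allocation."""
    conf = {
        # Off-heap headroom for the driver container, in MB (Spark 1.6 on YARN name).
        "spark.yarn.driver.memoryOverhead": str(driver_overhead_mb),
        # Kept from the command in the question.
        "spark.yarn.maxAppAttempts": "1",
    }
    cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster",
           "--driver-memory", driver_memory]
    for key, value in sorted(conf.items()):
        cmd += ["--conf", "{}={}".format(key, value)]
    cmd.append(app_path)
    return cmd


# "etl_job.py" is a placeholder for the real application script.
command = build_submit_command("etl_job.py")
print(" ".join(command))
```

In cluster mode the driver JVM is launched by YARN before your Python code runs, so driver memory has to be set at submit time like this rather than from inside the application.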

Common code issues:

  1. Too many transformations cause the lineage to grow too big. If any kind of iterative operation is happening on the dataframe, it's a good idea to truncate the DAG with a checkpoint in between. In Spark 2.1 and later you can call dataFrame.checkpoint() directly without going through the RDD. @Sagar's answer also describes how to do this for Spark 1.6.

  2. Trying to broadcast dataframes that are too big. This usually results in an OOM exception but can sometimes just make the job appear stuck. If you are calling broadcast explicitly, the resolution is to stop doing so. Otherwise, check whether you've set spark.sql.autoBroadcastJoinThreshold to a custom value, and try lowering it or disabling automatic broadcast altogether (by setting it to -1).

  3. Not enough partitions, which causes every task to run hot. The easiest way to diagnose this is to check the stages view in the Spark UI and look at the size of data read and written per task. Ideally this is in the 100MB-500MB range. If it is larger, raise spark.sql.shuffle.partitions above its default of 200, and raise spark.default.parallelism accordingly.
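
For the partition count in item 3, a back-of-the-envelope calculation is usually enough. The sketch below assumes a 200 MB target per task (an arbitrary pick inside the 100MB-500MB range) and a made-up 500 GiB shuffle. The helper names are hypothetical, and the real input number should come from the stage's shuffle read/write size in the Spark UI.

```python
import math

MB = 1024 * 1024


def suggest_partitions(total_bytes, target_partition_mb=200, minimum=200):
    """Partitions needed so each task handles roughly target_partition_mb of data."""
    needed = int(math.ceil(float(total_bytes) / (target_partition_mb * MB)))
    # Never go below the floor (here the spark.sql.shuffle.partitions default).
    return max(minimum, needed)


def per_task_mb(total_bytes, partitions):
    """Average data volume per task, in MB, for a given partition count."""
    return float(total_bytes) / partitions / MB


# Hypothetical stage that shuffles 500 GiB in total.
shuffle_bytes = 500 * 1024 * MB

print(per_task_mb(shuffle_bytes, 200))    # load per task with 200 partitions
print(suggest_partitions(shuffle_bytes))  # count that brings tasks near 200 MB
```

The result can then be passed at submit time, for example as --conf spark.sql.shuffle.partitions=<value>.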

Common data issues:

  1. Data skew. Since only this specific workload is failing, its data could be skewed. Diagnose this on the stage view in the Spark UI by checking that the median task completion time is comparable to the 75th percentile, which in turn is comparable to the 90th percentile. There are many ways to redress data skew, but the one I find best is to write a custom join function that salts the join keys prior to the join. This splits the skewed partition into several smaller partitions at the expense of a constant-factor data explosion.

  2. Input file format or number of files. If your input file isn't partitioned and you're only doing narrow transforms (those that do not cause a data shuffle), then all of your data may run through a single executor and not really benefit from the distributed cluster setup. Diagnose this from the Spark UI by checking how many tasks are created in each stage of the pipeline. It should be on the order of your spark.default.parallelism value. If not, do a .repartition(<some value>) immediately after the data read step, before any transforms. If the file format is CSV (not ideal), verify that multiLine is disabled unless your case specifically requires it; otherwise a single executor is forced to read the entire CSV file.
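
The salting idea from the data skew item can be illustrated without Spark. The sketch below is plain Python, not PySpark code, and all names and data in it are hypothetical. It salts the keys of the skewed side, replicates the small side once per salt value, and checks that the join result is unchanged while the largest group of rows sharing a join key shrinks.

```python
from collections import Counter, defaultdict


def salt_keys(rows, num_salts):
    """Turn (key, value) rows into ((key, salt), value) rows."""
    # Round-robin salts keep this example deterministic;
    # a random salt is the usual choice in practice.
    return [((key, i % num_salts), value) for i, (key, value) in enumerate(rows)]


def replicate_keys(rows, num_salts):
    """Copy every (key, value) row once per salt so each salted key finds its match."""
    return [((key, salt), value) for key, value in rows for salt in range(num_salts)]


def hash_join(left, right):
    """Plain inner join of two lists of (join_key, value) rows."""
    index = defaultdict(list)
    for join_key, value in right:
        index[join_key].append(value)
    return [(join_key, lv, rv) for join_key, lv in left for rv in index.get(join_key, [])]


def largest_group(rows):
    """Size of the biggest group of rows sharing one join key (stand-in for the hottest task)."""
    return max(Counter(join_key for join_key, _ in rows).values())


# Hypothetical skewed input: one "hot" key dominates the large side.
big = [("hot", i) for i in range(1000)] + [("cold", i) for i in range(10)]
small = [("hot", "H"), ("cold", "C")]

num_salts = 8
salted_big = salt_keys(big, num_salts)
salted_small = replicate_keys(small, num_salts)

plain = hash_join(big, small)
# Drop the salt after joining so the rows match the unsalted join.
salted = [(key, lv, rv) for (key, _salt), lv, rv in hash_join(salted_big, salted_small)]

print(largest_group(big), largest_group(salted_big))
```

The replication of the small side is the "data explosion" cost: it grows by a factor of num_salts. In Spark the same shape would be a salt column on the skewed side, the other side expanded across all salt values, and a join on (key, salt).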

Happy debugging!

