Worker seems to hang during training
PowerToThePeople111 opened this issue · 2 comments
Hi all,
I am using spark 2.4.0, the current version of SparkFlow with the current TensorFlow python package. I have successfully run the mnist example, but seem to have problems in training another dataset to a SparkFlow model.
Sometimes, tasks seem to hang endlessly (for hours no worker finishes) while others finish fast. The last lines of the hanging worker logs look like this:
2019-07-19 05:34:26.758486: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
W0719 05:34:26.758819 140213227337536 deprecation_wrapper.py:119] From /home/hadoop/anaconda3/envs/pySpark/lib/python3.6/site-packages/sparkflow/HogwildSparkModel.py:55: The name tf.train.import_meta_graph is deprecated. Please use tf.compat.v1.train.import_meta_graph instead.
W0719 05:34:26.796034 140213227337536 deprecation_wrapper.py:119] From /home/hadoop/anaconda3/envs/pySpark/lib/python3.6/site-packages/sparkflow/HogwildSparkModel.py:56: The name tf.get_collection is deprecated. Please use tf.compat.v1.get_collection instead.
W0719 05:34:26.796201 140213227337536 deprecation_wrapper.py:119] From /home/hadoop/anaconda3/envs/pySpark/lib/python3.6/site-packages/sparkflow/HogwildSparkModel.py:56: The name tf.GraphKeys is deprecated. Please use tf.compat.v1.GraphKeys instead.
W0719 05:34:26.796268 140213227337536 deprecation_wrapper.py:119] From /home/hadoop/anaconda3/envs/pySpark/lib/python3.6/site-packages/sparkflow/HogwildSparkModel.py:57: The name tf.global_variables_initializer is deprecated. Please use tf.compat.v1.global_variables_initializer instead.
2019-07-19 05:34:26.807986: W tensorflow/compiler/jit/mark_for_compilation_pass.cc:1412] (One-time warning): Not using XLA:CPU for cluster because envvar TF_XLA_FLAGS=--tf_xla_cpu_global_jit was not set. If you want XLA:CPU, either set that envvar, or use experimental_jit_scope to enable XLA:CPU. To confirm that XLA is active, pass --vmodule=xla_compilation_cache=1 (as a proper command-line flag, not via TF_XLA_FLAGS) or set the envvar XLA_FLAGS=--xla_hlo_profile.
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24726 thread 1 bound to OS proc set 1
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24733 thread 2 bound to OS proc set 2
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24734 thread 3 bound to OS proc set 3
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24735 thread 4 bound to OS proc set 4
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24736 thread 5 bound to OS proc set 5
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24737 thread 6 bound to OS proc set 6
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24738 thread 7 bound to OS proc set 7
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24739 thread 8 bound to OS proc set 0
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24725 thread 9 bound to OS proc set 1
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24741 thread 11 bound to OS proc set 3
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24740 thread 10 bound to OS proc set 2
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24742 thread 12 bound to OS proc set 4
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24743 thread 13 bound to OS proc set 5
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24744 thread 14 bound to OS proc set 6
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24745 thread 15 bound to OS proc set 7
OMP: Info #250: KMP_AFFINITY: pid 24600 tid 24746 thread 16 bound to OS proc set 0
I already start the context with 30 executors with 15GB of executor memory and 10GB of executor overhead to process a dataset of approx. 100MB. The model uses the following parameters:
spark_model = SparkAsyncDL(
inputCol='features_scaled',
tensorflowGraph=g,
tfInput='x:0',
tfLabel='y:0',
tfOutput='out:0',
tfLearningRate=.001,
iters=15,
predictionCol='predicted',
labelCol='labels',
verbose=1,
partitions=30,
partitionShuffles=2,
acquireLock=False,
miniBatchSize=50,
miniStochasticIters=-1
)
I experimented with different acquire locks, partition shuffles and mini batches settings. Not all produced errors, but partition shuffles seemed to have a good impact on model performance for mnist so I wanted to keep it. And when I use mini batches, I can see the loss getting reduced for later iterations in contrast to not using them where loss fluctuates way more.
Do you have ideas what the reasons could be? Would you recommend me to downgrade spark to 2.3 and tf to 1.7 or are there other possibilities that I miss?
Best,
Update:
Having a closer look at the tasks in the Spark UI Overview, I can see that the number of records being processed varies a lot: from 100 to 260k. It is exactly the tasks that have lots of examples which are the ones that seem to take endlessly for training. Is there a possibility to get them more evenly distributed? I saw you do coalesce. Maybe I should repartition the training data in spark myself before training with the same number of partitions I used for the SparkAsyncDL model?
Ok, repartitioning as mentioned above fixes the problem of uneven distribution of examples to the workers. Just in case others have the issue too.