(PySpark) on YARN - Behind the Scenes (Teil 2)

pyspark-2

In Teil eins wurde gezeigt, welche YARN Container und welche Linux Prozesse beim Aufruf der Spark Shell gestartet werden. Im zweiten Teil dieses Beitrags führt Olaf Hein aus, wie die Python Shell mit PySpark gestartet wird.

Python + Spark

Im letzten Beispiel wurde die Spark Shell für Scala verwendet. Bei der Verwendung von Python werden zusätzliche Prozesse gestartet. Für einen ersten Test wird die Python Shell mit pyspark2 gestartet:

[cloudera]$ pyspark2 --master yarn
Python 2.6.6 (r266:84292, Jul 23 2015, 15:22:56)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-11)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to

     ____              __
    / __/__  ___ _____/ /__
   _\ \/ _ \/ _ `/ __/  '_/
  /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera3
     /_/

Using Python version 2.6.6 (r266:84292, Jul 23 2015 15:22:56)
SparkSession available as 'spark'.
>>> 

Danach werden mit ps die Prozesse des Benutzers cloudera angezeigt:  

[cloudera]$ ps -fU cloudera
...
cloudera9630 32322 0 22:52 pts/100:00:00 python
cloudera96869630 17 22:52 pts/100:00:43 /usr/java/default/bin/java -cp /opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/...
... 

Für den Benutzer cloudera werden wieder zwei neue Linux Prozesse auf dem Edge Node gestartet. Zuerst wurde ein Python Prozess (PID 9630) gestartet. Dieser stellt die Python Shell bereit. Anschließend hat dieser Prozess eine Java VM gestartet (PID 9686). Der Prozessbaum für die PySpark Shell unterscheidet sich von der Scala Spark-Shell dadurch, dass zuerst ein Python Interpreter gestartet wird und keine Bash Shell:

9630 python
-> 9686 /usr/java/default/bin/java .. 

Die YARN Prozesse werden wieder über die Application ID ermittelt:

[cloudera]$ yarn application -list
19/07/20 00:01:43 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id   Application-Name   Application-Type   User   Queue   State   Final-State   Progress   Tracking-URL
application_1563598094160_0003   PySparkShell   SPARK   cloudera   root.users.cloudera   RUNNING   UNDEFINED   10%   http://quickstart.cloudera:4040

[cloudera]$ yarn applicationattempt -list application_1563598094160_0003
19/07/20 01:32:11 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
Total number of application attempts :1
ApplicationAttempt-Id   State   AM-Container-Id   Tracking-URL
appattempt_1563598094160_0003_000001   RUNNING   container_1563598094160_0003_01_000001   http://quickstart.cloudera:8088/proxy/application_1563598094160_0003/ 

Mit ps und grep werden die Linux Prozesse angezeigt und gefiltert:

[cloudera]$ ps -ef | grep 1563598094160_0003
yarn 22053 11125 0 Jul19 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/default_container_executor.sh
yarn 22055 22053 0 Jul19 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1563598094160_0003/container_1563598094160_0003_01_000001 org.apache.spark.deploy.yarn.ExecutorLauncher --arg 'quickstart.cloudera:36568' --properties-file /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/__spark_conf__/__spark_conf__.properties 1> /var/log/hadoop-yarn/container/application_1563598094160_0003/container_1563598094160_0003_01_000001/stdout 2> /var/log/hadoop-yarn/container/application_1563598094160_0003/container_1563598094160_0003_01_000001/stderr

yarn22058 220553 Jul19 ? 00:00:19 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1563598094160_0003/container_1563598094160_0003_01_000001 org.apache.spark.deploy.yarn.ExecutorLauncher --arg quickstart.cloudera:36568 --properties-file /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/__spark_conf__/__spark_conf__.properties

cloudera 25046 20046 0 00:01 pts/1 00:00:00 grep 1563598094160_0003 

Auch in diesem Fall werden 3 Linux Prozesse im Cluster gestartet. Auffällig ist, dass es im Cluster noch keinen Python Prozess gibt.

Erst bei der Verarbeitung von Spark Tasks im Cluster werden von YARN Executor Prozesse gestartet. Im Fall von PySpark laufen dann auch Python Prozesse im Betriebssystem des Worker Nodes. Mit dem folgenden Spark Script lässt sich das leicht testen:

from time import sleep
rdd1 = sc.parallelize([1])
sleep_seconds=120
rdd2 = rdd1.map(lambda i: sleep(sleep_seconds))
rdd2.collect() 

Der Befehl yarn container -list zeigt jetzt einen zweiten Container:

[cloudera]$ yarn container -list appattempt_1563598094160_0003_000001
19/07/20 01:27:53 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
Total number of containers :2
Container-Id   Start   Time   Finish Time   State   Host   LOG-URL
container_1563598094160_0003_01_000001   Fri Jul 19 23:52:02 -0700 2019   N/A RUNNING   quickstart.cloudera:8041   http://quickstart.cloudera:8042/node/containerlogs/container_1563598094160_0003_01_000001/cloudera
container_1563598094160_0003_01_000010   Sat Jul 20 01:27:47 -0700 2019   N/A RUNNING   quickstart.cloudera:8041   http://quickstart.cloudera:8042/node/containerlogs/container_1563598094160_0003_01_000010/cloudera 

Die Prozesse können wieder mit dem ps Kommando angezeigt werden. Die Python Prozesse können allerdings nicht über grep und die Application ID gefunden werden, da die ID nicht als Argument über die Kommandozeile übergeben wird. In einem kleinen Cluster können einfach alle Prozesse des Benutzers yarn angezeigt werden. In größeren Clustern ist es einfacher nach den PPIDs zu suchen. Es folgt die Prozessliste der beiden gestarteten YARN Container. Damit es übersichtlicher wird, wurde in diesem Fall die Ausgabe abgeschnitten und mit # wurden Kommentare eingefügt.  

# Executor Processe
yarn18700 11125 0 01:27 ?00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000010/default_container_executor.sh
yarn18703 187000 01:27 ?00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/l
yarn18706 18703 36 01:27 ?00:00:18 /usr/java/default/bin/java -server -Xmx1024m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1
yarn18859 187062 01:27 ?00:00:00 python -m pyspark.daemon
yarn18888 188590 01:27 ?00:00:00 python -m pyspark.daemon

# ApplicationMaster Prozesse
yarn22053 111250 Jul19 ?00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/default_container_executor.sh
yarn22055 220530 Jul19 ?00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/l
yarn22058 220550 Jul19 ?00:00:32 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_15 

Die Prozesse des ApplicationMasters existieren wie zuvor. Für den Executor wurden zusätzliche Prozesse gestartet. Da PySpark verwendet wird, werden in diesem Fall auch Python Prozesse gestartet. Das sind die Prozesse mit den PIDs 18859 und 18888.

Nachdem die Spark Tasks beendet wurden, d.h. nachdem collect() ausgeführt wurde, werden der Container für den Executor und damit auch alle Prozesse im Betriebssystem beendet.

Hier der vereinfachte Prozessbaum der im YARN Cluster gestarteten Prozesse:

11125/usr/java/default/bin/java .. NodeManager
-> 22053 bash default_container_executor.sh
   -> 22055 /bin/bash .. java ExecutorLauncher ..
      -> 22058 /usr/java/default/bin/java ExecutorLauncher ..
-> 18700 bash .. default_container_executor.sh
   -> 18703 /bin/bashjava .. CoarseGrainedExecutorBackend
      -> 18706 /usr/java/default/bin/java .. CoarseGrainedExecutorBackend
         -> 18859 python -m pyspark.daemon
            -> 18888 python -m pyspark.daemon 

Fortsetzung

Im dritten Teil geht Olaf Hein auf die Verwendung unterschiedlicher Python Versionen ein.

 

Kommentare

Derzeit gibt es keine Kommentare. Schreibe den ersten Kommentar!
Gäste
Donnerstag, 17. Oktober 2019

Sicherheitscode (Captcha)