(PySpark) on YARN - Behind the Scenes (Teil 2)
In Teil eins wurde gezeigt, welche YARN Container und welche Linux Prozesse beim Aufruf der Spark Shell gestartet werden. Im zweiten Teil dieses Beitrags führt Olaf Hein aus, wie die Python Shell mit PySpark gestartet wird.
Python + Spark
Im letzten Beispiel wurde die Spark Shell für Scala verwendet. Bei der Verwendung von Python werden zusätzliche Prozesse gestartet. Für einen ersten Test wird die Python Shell mit pyspark2 gestartet:
[cloudera]$ pyspark2 --master yarn Python 2.6.6 (r266:84292, Jul 23 2015, 15:22:56) [GCC 4.4.7 20120313 (Red Hat 4.4.7-11)] on linux2 Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.3.0.cloudera3 /_/ Using Python version 2.6.6 (r266:84292, Jul 23 2015 15:22:56) SparkSession available as 'spark'. >>>
Danach werden mit ps die Prozesse des Benutzers cloudera angezeigt:
[cloudera]$ ps -fU cloudera ... cloudera9630 32322 0 22:52 pts/100:00:00 python cloudera96869630 17 22:52 pts/100:00:43 /usr/java/default/bin/java -cp /opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/... ...
Für den Benutzer cloudera werden wieder zwei neue Linux Prozesse auf dem Edge Node gestartet. Zuerst wurde ein Python Prozess (PID 9630) gestartet. Dieser stellt die Python Shell bereit. Anschließend hat dieser Prozess eine Java VM gestartet (PID 9686). Der Prozessbaum für die PySpark Shell unterscheidet sich von der Scala Spark-Shell dadurch, dass zuerst ein Python Interpreter gestartet wird und keine Bash Shell:
9630 python -> 9686 /usr/java/default/bin/java ..
Die YARN Prozesse werden wieder über die Application ID ermittelt:
[cloudera]$ yarn application -list 19/07/20 00:01:43 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1563598094160_0003 PySparkShell SPARK cloudera root.users.cloudera RUNNING UNDEFINED 10% http://quickstart.cloudera:4040 [cloudera]$ yarn applicationattempt -list application_1563598094160_0003 19/07/20 01:32:11 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032 Total number of application attempts :1 ApplicationAttempt-Id State AM-Container-Id Tracking-URL appattempt_1563598094160_0003_000001 RUNNING container_1563598094160_0003_01_000001 http://quickstart.cloudera:8088/proxy/application_1563598094160_0003/
Mit ps und grep werden die Linux Prozesse angezeigt und gefiltert:
[cloudera]$ ps -ef | grep 1563598094160_0003 yarn 22053 11125 0 Jul19 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/default_container_executor.sh yarn 22055 22053 0 Jul19 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1563598094160_0003/container_1563598094160_0003_01_000001 org.apache.spark.deploy.yarn.ExecutorLauncher --arg 'quickstart.cloudera:36568' --properties-file /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/__spark_conf__/__spark_conf__.properties 1> /var/log/hadoop-yarn/container/application_1563598094160_0003/container_1563598094160_0003_01_000001/stdout 2> /var/log/hadoop-yarn/container/application_1563598094160_0003/container_1563598094160_0003_01_000001/stderr yarn22058 220553 Jul19 ? 00:00:19 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1563598094160_0003/container_1563598094160_0003_01_000001 org.apache.spark.deploy.yarn.ExecutorLauncher --arg quickstart.cloudera:36568 --properties-file /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/__spark_conf__/__spark_conf__.properties cloudera 25046 20046 0 00:01 pts/1 00:00:00 grep 1563598094160_0003
Auch in diesem Fall werden 3 Linux Prozesse im Cluster gestartet. Auffällig ist, dass es im Cluster noch keinen Python Prozess gibt.
Erst bei der Verarbeitung von Spark Tasks im Cluster werden von YARN Executor Prozesse gestartet. Im Fall von PySpark laufen dann auch Python Prozesse im Betriebssystem des Worker Nodes. Mit dem folgenden Spark Script lässt sich das leicht testen:
from time import sleep rdd1 = sc.parallelize([1]) sleep_seconds=120 rdd2 = rdd1.map(lambda i: sleep(sleep_seconds)) rdd2.collect()
Der Befehl yarn container -list zeigt jetzt einen zweiten Container:
[cloudera]$ yarn container -list appattempt_1563598094160_0003_000001 19/07/20 01:27:53 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032 Total number of containers :2 Container-Id Start Time Finish Time State Host LOG-URL container_1563598094160_0003_01_000001 Fri Jul 19 23:52:02 -0700 2019 N/A RUNNING quickstart.cloudera:8041 http://quickstart.cloudera:8042/node/containerlogs/container_1563598094160_0003_01_000001/cloudera container_1563598094160_0003_01_000010 Sat Jul 20 01:27:47 -0700 2019 N/A RUNNING quickstart.cloudera:8041 http://quickstart.cloudera:8042/node/containerlogs/container_1563598094160_0003_01_000010/cloudera
Die Prozesse können wieder mit dem ps Kommando angezeigt werden. Die Python Prozesse können allerdings nicht über grep und die Application ID gefunden werden, da die ID nicht als Argument über die Kommandozeile übergeben wird. In einem kleinen Cluster können einfach alle Prozesse des Benutzers yarn angezeigt werden. In größeren Clustern ist es einfacher nach den PPIDs zu suchen. Es folgt die Prozessliste der beiden gestarteten YARN Container. Damit es übersichtlicher wird, wurde in diesem Fall die Ausgabe abgeschnitten und mit # wurden Kommentare eingefügt.
# Executor Processe yarn18700 11125 0 01:27 ?00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000010/default_container_executor.sh yarn18703 187000 01:27 ?00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/l yarn18706 18703 36 01:27 ?00:00:18 /usr/java/default/bin/java -server -Xmx1024m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1 yarn18859 187062 01:27 ?00:00:00 python -m pyspark.daemon yarn18888 188590 01:27 ?00:00:00 python -m pyspark.daemon # ApplicationMaster Prozesse yarn22053 111250 Jul19 ?00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_1563598094160_0003_01_000001/default_container_executor.sh yarn22055 220530 Jul19 ?00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/l yarn22058 220550 Jul19 ?00:00:32 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0003/container_15
Die Prozesse des ApplicationMasters existieren wie zuvor. Für den Executor wurden zusätzliche Prozesse gestartet. Da PySpark verwendet wird, werden in diesem Fall auch Python Prozesse gestartet. Das sind die Prozesse mit den PIDs 18859 und 18888.
Nachdem die Spark Tasks beendet wurden, d.h. nachdem collect() ausgeführt wurde, werden der Container für den Executor und damit auch alle Prozesse im Betriebssystem beendet.
Hier der vereinfachte Prozessbaum der im YARN Cluster gestarteten Prozesse:
11125/usr/java/default/bin/java .. NodeManager -> 22053 bash default_container_executor.sh -> 22055 /bin/bash .. java ExecutorLauncher .. -> 22058 /usr/java/default/bin/java ExecutorLauncher .. -> 18700 bash .. default_container_executor.sh -> 18703 /bin/bashjava .. CoarseGrainedExecutorBackend -> 18706 /usr/java/default/bin/java .. CoarseGrainedExecutorBackend -> 18859 python -m pyspark.daemon -> 18888 python -m pyspark.daemon
Fortsetzung
Im dritten Teil geht Olaf Hein auf die Verwendung unterschiedlicher Python Versionen ein.
Bei Updates im Blog, informieren wir per E-Mail.
Kommentare