(PySpark) on YARN - Behind the Scenes (Teil 1)
In den letzten Wochen wurde ich mehrfach von Kunden gefragt, was im Betriebssystem passiert, wenn Spark bzw. PySpark Jobs ausgeführt werden. Dabei waren vor allem die folgenden Fragen von Interesse:
- Welche Prozesse werden im Betriebssystem gestartet?
- Mit welchen Rechten werden diese Prozesse ausgeführt?
- Welcher Python Interpreter und welche Bibliotheken werden verwendet?
- Wie verhält sich YARN beim Einsatz von Kerberos?
Dieser Artikel gibt Antworten auf diese und noch einige andere Fragen.
Kurze Wiederholung: Spark on YARN Basics
Beim Ausführen eines Spark- oder PySpark Jobs mit YARN, wird von Spark zuerst ein Driver Prozess gestartet. In den folgenden Beispielen wird dazu die Spark-Shell auf einem der Edge Nodes gestartet (Siehe Abbildung 1). Der Driver kommuniziert mit dem RessourceManger auf dem Master Node, um eine YARN Applikation zu starten. Der RessourceManager wählt einen Worker Node aus und startet dort einen YARN Container mit dem ApplicationMaster Prozess für diesen Spark Job. Ein YARN Container ist, etwas vereinfacht, ein von YARN kontrollierter Java Prozess. Einem Container werden Ressourcen (CPU Cores und Hauptspeicher) zugewiesen und in ihm findet die eigentliche Verarbeitung der Daten im Cluster statt. Für den Start von Spark Executoren fordert der ApplicationMaster neue Ressourcen beim Ressource Manager an. Anschließend kommuniziert der ApplicationMaster mit den NodeManagern auf den zugewiesenen Worker Nodes, die anschließend weitere Container starten
Testumgebung
Die folgenden Beispiele wurden mit der Cloudera Distribution for Hadoop (CDH) in einer Quickstart VM in einem pseudo-distributed Cluster (ein Cluster mit nur einem Knoten) durchgeführt. Für die Tests hat das den Vorteil, dass alle Prozesse auf demselben Knoten ausgeführt werden. Eine Ausführung in einem Cluster mit mehreren Knoten unterscheidet sich dadurch, dass die YARN Container dann auf beliebigen Worker Knoten gestartet werden. Bei der getesteten Funktionalität handelt es sich um Standard Hadoop und Spark Funktionen. Ein Cluster, der mit der Hortonworks Data Platform (HDP), der MapR Data Platform oder den Open Source Paketen aufgebaut wurde, wird sich mit einer vergleichbaren Konfiguration identisch verhalten.
In der VM wurde Spark 2 zusätzlich zu Spark 1 installiert. Für den Start von Spark 2 stellt Cloudera die Kommandos spark2-shell, spark2-submit und pyspark2 zur Verfügung. In den Beispielen wird immer mit der Scala oder Python Shell gearbeitet. Für die Experimente hat das den Vorteil, dass automatisch eine YARN Applikation erzeugt wird. Die Applikation verursacht keine unnötige Last auf dem System und wird erst beendet, wenn die Shell vom Benutzer geschlossen wird.
Starten der Spark-Shell
Für das erste Beispiel wird eine Spark Shell im YARN Cluster gestartet.
[cloudera]$ spark2-shell --master yarn
In einem zweiten Terminal kann jetzt überprüft werden, welche YARN Container und welche dazugehörigen Prozesse im Betriebssystem gestartet wurden. Dazu wird das Kommandozeilen-Tool yarn verwendet. Um die Container aufzulisten, wird die "Application Attempt ID" benötigt. Diese kann mit Hilfe der Kommandos application und applicationattempts angezeigt werden.
[cloudera]$ yarn application -list 19/07/19 22:34:27 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1563598094160_0002 Spark shell SPARK cloudera root.users.cloudera RUNNING UNDEFINED 10% http://quickstart.cloudera:4040 [cloudera]$ yarn applicationattempt -list application_1563598094160_0002 19/07/19 22:34:36 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032 Total number of application attempts :1 ApplicationAttempt-Id State AM-Container-Id Tracking-URL appattempt_1563598094160_0002_000001 RUNNING container_1563598094160_0002_01_000001 http://quickstart.cloudera:8088/proxy/application_1563598094160_0002/ [cloudera]$ yarn container -list appattempt_1563598094160_0002_000001 19/07/19 22:34:51 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032 Total number of containers :1 Container-Id Start Time Finish Time State Host LOG-URL container_1563598094160_0002_01_000001 Fri Jul 19 22:11:00 -0700 2019 N/A RUNNING quickstart.cloudera:8041 http://quickstart.cloudera:8042/node/containerlogs/container_1563598094160_0002_01_000001/cloudera
Auffällig ist, dass es nur einen Container gibt. Das liegt daran, dass die Spark Shell nur den Application Master gestartet hat. Es gibt noch keine Container für die Spark Executor Prozesse. Diese werden erst bei Bedarf angelegt.
Als nächstes werden die zugehörigen Prozesse im Betriebssystem angezeigt. Dazu wird das Linux Tool ps verwendet.
[cloudera]$ ps -fU cloudera ... cloudera 23786 5918 0 22:10 pts/0 00:00:00 bash /opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/bin/../lib/spark2/bin/spark-shell --master yarn cloudera 23809 23786 6 22:10 pts/0 00:02:04 /usr/java/default/bin/java -cp /opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/conf/:/opt/cloudera
Für den Benutzer cloudera werden in diesem Fall 2 neue Prozesse im Betriebssystem gestartet. Der Erste Prozess mit der PID 23786 ist der Bash Prozess des spark-shell Scripts. Der zweite Prozess mit der PID 23809 ist die Java VM der Spark-Shell, die vom spark-shell Script gestartet wurde. Der Driver wird in diesem Java Prozess ausgeführt. Die YARN Prozesse werden nicht mit den Rechten des Benutzers cloudera gestartet und ausgeführt. Somit tauchen Sie in der Ausgabe auch nicht auf.
Auf das Wesentliche reduziert, ergibt sich der folgende Prozessbaum für die Spark-Shell:
23786 bash .. spark-shell --master yarn -> 23809 /usr/java/default/bin/java ..
Mit Hilfe der YARN "Application ID", können die gestarteten Linux Prozesse leicht mit ps und grep angezeigt werden. In einem Cluster aus mehreren Knoten müssen die Befehle auf den Worker Nodes ausgeführt werden, die als "Host" beim Kommando yarn clontainer -list ausgegeben werden.
[cloudera]$ ps -ef | grep 1563598094160_0002 cloudera 10542 20046 0 23:15 pts/100:00:00 grep 1563598094160_0002 yarn 24080 11125 0 22:11 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/default_container_executor.sh yarn 24082 24080 0 22:11 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1563598094160_0002/container_1563598094160_0002_01_000001 org.apache.spark.deploy.yarn.ExecutorLauncher --arg 'quickstart.cloudera:49810' --properties-file /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/__spark_conf__/__spark_conf__.properties 1> /var/log/hadoop-yarn/container/application_1563598094160_0002/container_1563598094160_0002_01_000001/stdout 2> /var/log/hadoop-yarn/container/application_1563598094160_0002/container_1563598094160_0002_01_000001/stderr yarn 24085 24082 0 22:11 ? 00:00:30 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1563598094160_0002/container_1563598094160_0002_01_000001 org.apache.spark.deploy.yarn.ExecutorLauncher --arg quickstart.cloudera:49810 --properties-file /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/__spark_conf__/__spark_conf__.properties
Neben dem grep Befehl gibt es 3 weitere Prozesse mit der gesuchten Application ID. Der erste, mit der PID 24080, ist das YARN Bash Script default_container_executor.sh. Dieses Script startet einen weiteren Bash Prozess mit der PID 24082. Dieser zweite Bash Prozess startet dann die Java VM mit der PID 24085. Die Java VM wird mit der Klasse ExecutorLauncher gestartet. Der YARN Container mit dem ApplicationMaster läuft in diesem Java Prozess.
Der erste Prozess, mit der PID 24080, wurde direkt vom NodeManager gestartet. Dass lässt sich mit der PPID 11125 leicht überprüfen:
[cloudera]$ ps -fp 11125 | cat UID PID PPID C STIME TTY TIME CMD yarn1112553594 21:47 ?00:04:29 /usr/java/default/bin/java -Dproc_nodemanager -Xmx1000m -Djava.net.preferIPv4Stack=true -server -Xms52428800 -Xmx52428800 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -Dlibrary.leveldbjni.path=/var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER -Dhadoop.event.appender=,EventCatcher -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/yarn_yarn-NODEMANAGER_pid11125.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh -Dhadoop.log.dir=/var/log/hadoop-yarn -Dyarn.log.dir=/var/log/hadoop-yarn -Dhadoop.log.file=hadoop-cmf-yarn-NODEMANAGER-quickstart.cloudera.log.out -Dyarn.log.file=hadoop-cmf-yarn-NODEMANAGER-quickstart.cloudera.log.out -Dyarn.home.dir=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn -Dhadoop.home.dir=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn -Dhadoop.root.logger=INFO,RFA -Dyarn.root.logger=INFO,RFA -Djava.library.path=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native -classpath /var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER:/var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER:/var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/.//*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-hdfs/./:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-hdfs/.//*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn/lib/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-mapreduce/.//*:/usr/share/cmf/lib/plugins/event-publish-5.13.0-shaded.jar:/usr/share/cmf/lib/plugins/tt-instrumentation-5.13.0.jar:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn/lib/*:/var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER/nm-config/log4j.properties org.apache.hadoop.yarn.server.nodemanager.NodeManager
Der folgende Baum zeigt die gestarteten Prozesse im YARN Cluster.
11125/usr/java/default/bin/java .. NodeManager -> 24080 bash .. default_container_executor.sh -> 22055 /bin/bash .. java .. ExecutorLauncher -> 22058 /usr/java/default/bin/java .. ExecutorLauncher
Fortsetzung
Im zweiten Teil dieses Beitrags führt Olaf Hein aus, wie die Python Shell mit PySpark gestartet wird.
- (PySpark) on YARN - Behind the Scenes (Teil 1)
- (PySpark) on YARN - Behind the Scenes (Teil 2)
- (PySpark) on YARN - Behind the Scenes (Teil 3)
- (PySpark) on YARN - Behind the Scenes (Teil 4)
Bei Updates im Blog, informieren wir per E-Mail.
Kommentare