(PySpark) on YARN - Behind the Scenes (Teil 1)

pyspark-1

In den letzten Wochen wurde ich mehrfach von Kunden gefragt, was im Betriebssystem passiert, wenn Spark bzw. PySpark Jobs ausgeführt werden. Dabei waren vor allem die folgenden Fragen von Interesse:

  • Welche Prozesse werden im Betriebssystem gestartet?
  • Mit welchen Rechten werden diese Prozesse ausgeführt?
  • Welcher Python Interpreter und welche Bibliotheken werden verwendet?
  • Wie verhält sich YARN beim Einsatz von Kerberos?

Dieser Artikel gibt Antworten auf diese und noch einige andere Fragen. ​

Kurze Wiederholung: Spark on YARN Basics  

Beim Ausführen eines Spark- oder PySpark Jobs mit YARN, wird von Spark zuerst ein Driver Prozess gestartet. In den folgenden Beispielen wird dazu die Spark-Shell auf einem der Edge Nodes gestartet (Siehe Abbildung 1). Der Driver kommuniziert mit dem RessourceManger auf dem Master Node, um eine YARN Applikation zu starten. Der RessourceManager wählt einen Worker Node aus und startet dort einen YARN Container mit dem ApplicationMaster Prozess für diesen Spark Job. Ein YARN Container ist, etwas vereinfacht, ein von YARN kontrollierter Java Prozess. Einem Container werden Ressourcen (CPU Cores und Hauptspeicher) zugewiesen und in ihm findet die eigentliche Verarbeitung der Daten im Cluster statt. Für den Start von Spark Executoren fordert der ApplicationMaster neue Ressourcen beim Ressource Manager an. Anschließend kommuniziert der ApplicationMaster mit den NodeManagern auf den zugewiesenen Worker Nodes, die anschließend weitere Container starten  

Abbildung 1: YARN Prozesse

Testumgebung  

Die folgenden Beispiele wurden mit der Cloudera Distribution for Hadoop (CDH) in einer Quickstart VM in einem pseudo-distributed Cluster (ein Cluster mit nur einem Knoten) durchgeführt. Für die Tests hat das den Vorteil, dass alle Prozesse auf demselben Knoten ausgeführt werden. Eine Ausführung in einem Cluster mit mehreren Knoten unterscheidet sich dadurch, dass die YARN Container dann auf beliebigen Worker Knoten gestartet werden. Bei der getesteten Funktionalität handelt es sich um Standard Hadoop und Spark Funktionen. Ein Cluster, der mit der Hortonworks Data Platform (HDP), der MapR Data Platform oder den Open Source Paketen aufgebaut wurde, wird sich mit einer vergleichbaren Konfiguration identisch verhalten.

In der VM wurde Spark 2 zusätzlich zu Spark 1 installiert. Für den Start von Spark 2 stellt Cloudera die Kommandos spark2-shell, spark2-submit und pyspark2 zur Verfügung. In den Beispielen wird immer mit der Scala oder Python Shell gearbeitet. Für die Experimente hat das den Vorteil, dass automatisch eine YARN Applikation erzeugt wird. Die Applikation verursacht keine unnötige Last auf dem System und wird erst beendet, wenn die Shell vom Benutzer geschlossen wird.  

Starten der Spark-Shell

Für das erste Beispiel wird eine Spark Shell im YARN Cluster gestartet. 

[cloudera]$ spark2-shell --master yarn 

In einem zweiten Terminal kann jetzt überprüft werden, welche YARN Container und welche dazugehörigen Prozesse im Betriebssystem gestartet wurden. Dazu wird das Kommandozeilen-Tool yarn verwendet. Um die Container aufzulisten, wird die "Application Attempt ID" benötigt. Diese kann mit Hilfe der Kommandos application und applicationattempts angezeigt werden.

[cloudera]$ yarn application -list
19/07/19 22:34:27 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id   Application-Name   Application-Type   User   Queue   State   Final-State   Progress   Tracking-URL
application_1563598094160_0002   Spark shell   SPARK   cloudera   root.users.cloudera   RUNNING   UNDEFINED   10%   http://quickstart.cloudera:4040

[cloudera]$ yarn applicationattempt -list application_1563598094160_0002
19/07/19 22:34:36 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
Total number of application attempts :1
ApplicationAttempt-Id   State   AM-Container-Id   Tracking-URL
appattempt_1563598094160_0002_000001   RUNNING   container_1563598094160_0002_01_000001   http://quickstart.cloudera:8088/proxy/application_1563598094160_0002/

[cloudera]$ yarn container -list appattempt_1563598094160_0002_000001
19/07/19 22:34:51 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
Total number of containers :1
Container-Id   Start Time   Finish Time   State   Host   LOG-URL
container_1563598094160_0002_01_000001   Fri Jul 19 22:11:00 -0700 2019   N/A   RUNNING   quickstart.cloudera:8041  http://quickstart.cloudera:8042/node/containerlogs/container_1563598094160_0002_01_000001/cloudera 

Auffällig ist, dass es nur einen Container gibt. Das liegt daran, dass die Spark Shell nur den Application Master gestartet hat. Es gibt noch keine Container für die Spark Executor Prozesse. Diese werden erst bei Bedarf angelegt.

Als nächstes werden die zugehörigen Prozesse im Betriebssystem angezeigt. Dazu wird das Linux Tool ps verwendet.

[cloudera]$ ps -fU cloudera
...
cloudera 23786 5918 0 22:10 pts/0 00:00:00 bash /opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/bin/../lib/spark2/bin/spark-shell --master yarn
cloudera 23809 23786 6 22:10 pts/0 00:02:04 /usr/java/default/bin/java -cp /opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/conf/:/opt/cloudera 

Für den Benutzer cloudera werden in diesem Fall 2 neue Prozesse im Betriebssystem gestartet. Der Erste Prozess mit der PID 23786 ist der Bash Prozess des spark-shell Scripts. Der zweite Prozess mit der PID 23809 ist die Java VM der Spark-Shell, die vom spark-shell Script gestartet wurde. Der Driver wird in diesem Java Prozess ausgeführt. Die YARN Prozesse werden nicht mit den Rechten des Benutzers cloudera gestartet und ausgeführt. Somit tauchen Sie in der Ausgabe auch nicht auf.

Auf das Wesentliche reduziert, ergibt sich der folgende Prozessbaum für die Spark-Shell:

23786 bash .. spark-shell --master yarn
-> 23809 /usr/java/default/bin/java .. 

Mit Hilfe der YARN "Application ID", können die gestarteten Linux Prozesse leicht mit ps und grep angezeigt werden. In einem Cluster aus mehreren Knoten müssen die Befehle auf den Worker Nodes ausgeführt werden, die als "Host" beim Kommando yarn clontainer -list ausgegeben werden.  

[cloudera]$ ps -ef | grep 1563598094160_0002
cloudera 10542 20046 0 23:15 pts/100:00:00 grep 1563598094160_0002
yarn 24080 11125 0 22:11 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/default_container_executor.sh

yarn 24082 24080 0 22:11 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1563598094160_0002/container_1563598094160_0002_01_000001 org.apache.spark.deploy.yarn.ExecutorLauncher --arg 'quickstart.cloudera:49810' --properties-file /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/__spark_conf__/__spark_conf__.properties 1> /var/log/hadoop-yarn/container/application_1563598094160_0002/container_1563598094160_0002_01_000001/stdout 2> /var/log/hadoop-yarn/container/application_1563598094160_0002/container_1563598094160_0002_01_000001/stderr

yarn 24085 24082 0 22:11 ? 00:00:30 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1563598094160_0002/container_1563598094160_0002_01_000001 org.apache.spark.deploy.yarn.ExecutorLauncher --arg quickstart.cloudera:49810 --properties-file /yarn/nm/usercache/cloudera/appcache/application_1563598094160_0002/container_1563598094160_0002_01_000001/__spark_conf__/__spark_conf__.properties 

Neben dem grep Befehl gibt es 3 weitere Prozesse mit der gesuchten Application ID. Der erste, mit der PID 24080, ist das YARN Bash Script default_container_executor.sh. Dieses Script startet einen weiteren Bash Prozess mit der PID 24082. Dieser zweite Bash Prozess startet dann die Java VM mit der PID 24085. Die Java VM wird mit der Klasse ExecutorLauncher gestartet. Der YARN Container mit dem ApplicationMaster läuft in diesem Java Prozess.

Der erste Prozess, mit der PID 24080, wurde direkt vom NodeManager gestartet. Dass lässt sich mit der PPID 11125 leicht überprüfen:

[cloudera]$ ps -fp 11125 | cat
UID   PID   PPID   C STIME TTY   TIME CMD
yarn1112553594 21:47 ?00:04:29 /usr/java/default/bin/java -Dproc_nodemanager -Xmx1000m -Djava.net.preferIPv4Stack=true -server -Xms52428800 -Xmx52428800 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -Dlibrary.leveldbjni.path=/var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER -Dhadoop.event.appender=,EventCatcher -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/yarn_yarn-NODEMANAGER_pid11125.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh -Dhadoop.log.dir=/var/log/hadoop-yarn -Dyarn.log.dir=/var/log/hadoop-yarn -Dhadoop.log.file=hadoop-cmf-yarn-NODEMANAGER-quickstart.cloudera.log.out -Dyarn.log.file=hadoop-cmf-yarn-NODEMANAGER-quickstart.cloudera.log.out -Dyarn.home.dir=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn -Dhadoop.home.dir=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn -Dhadoop.root.logger=INFO,RFA -Dyarn.root.logger=INFO,RFA -Djava.library.path=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native -classpath /var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER:/var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER:/var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/.//*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-hdfs/./:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-hdfs/.//*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn/lib/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-mapreduce/.//*:/usr/share/cmf/lib/plugins/event-publish-5.13.0-shaded.jar:/usr/share/cmf/lib/plugins/tt-instrumentation-5.13.0.jar:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn/lib/*:/var/run/cloudera-scm-agent/process/125-yarn-NODEMANAGER/nm-config/log4j.properties org.apache.hadoop.yarn.server.nodemanager.NodeManager 

Der folgende Baum zeigt die gestarteten Prozesse im YARN Cluster.

11125/usr/java/default/bin/java .. NodeManager
-> 24080 bash .. default_container_executor.sh
   -> 22055 /bin/bash .. java .. ExecutorLauncher
      -> 22058 /usr/java/default/bin/java .. ExecutorLauncher 

Fortsetzung

Im zweiten Teil dieses Beitrags führt Olaf Hein aus, wie die Python Shell mit PySpark gestartet wird.


 

Kommentare

Derzeit gibt es keine Kommentare. Schreibe den ersten Kommentar!
Gäste
Donnerstag, 17. Oktober 2019

Sicherheitscode (Captcha)