Von ORDIX AG auf Donnerstag, 19. September 2019
Kategorie: Data Management

(PySpark) on YARN - Behind the Scenes (Teil 1)

In den letzten Wochen wurde ich mehrfach von Kunden gefragt, was im Betriebssystem passiert, wenn Spark bzw. PySpark Jobs ausgeführt werden. Dabei waren vor allem die folgenden Fragen von Interesse:


Dieser Artikel gibt Antworten auf diese und noch einige andere Fragen. ​

Kurze Wiederholung: Spark on YARN Basics  

Beim Ausführen eines Spark- oder PySpark Jobs mit YARN, wird von Spark zuerst ein Driver Prozess gestartet. In den folgenden Beispielen wird dazu die Spark-Shell auf einem der Edge Nodes gestartet (Siehe Abbildung 1). Der Driver kommuniziert mit dem RessourceManger auf dem Master Node, um eine YARN Applikation zu starten. Der RessourceManager wählt einen Worker Node aus und startet dort einen YARN Container mit dem ApplicationMaster Prozess für diesen Spark Job. Ein YARN Container ist, etwas vereinfacht, ein von YARN kontrollierter Java Prozess. Einem Container werden Ressourcen (CPU Cores und Hauptspeicher) zugewiesen und in ihm findet die eigentliche Verarbeitung der Daten im Cluster statt. Für den Start von Spark Executoren fordert der ApplicationMaster neue Ressourcen beim Ressource Manager an. Anschließend kommuniziert der ApplicationMaster mit den NodeManagern auf den zugewiesenen Worker Nodes, die anschließend weitere Container starten  

Testumgebung  

Die folgenden Beispiele wurden mit der Cloudera Distribution for Hadoop (CDH) in einer Quickstart VM in einem pseudo-distributed Cluster (ein Cluster mit nur einem Knoten) durchgeführt. Für die Tests hat das den Vorteil, dass alle Prozesse auf demselben Knoten ausgeführt werden. Eine Ausführung in einem Cluster mit mehreren Knoten unterscheidet sich dadurch, dass die YARN Container dann auf beliebigen Worker Knoten gestartet werden. Bei der getesteten Funktionalität handelt es sich um Standard Hadoop und Spark Funktionen. Ein Cluster, der mit der Hortonworks Data Platform (HDP), der MapR Data Platform oder den Open Source Paketen aufgebaut wurde, wird sich mit einer vergleichbaren Konfiguration identisch verhalten.

In der VM wurde Spark 2 zusätzlich zu Spark 1 installiert. Für den Start von Spark 2 stellt Cloudera die Kommandos spark2-shell, spark2-submit und pyspark2 zur Verfügung. In den Beispielen wird immer mit der Scala oder Python Shell gearbeitet. Für die Experimente hat das den Vorteil, dass automatisch eine YARN Applikation erzeugt wird. Die Applikation verursacht keine unnötige Last auf dem System und wird erst beendet, wenn die Shell vom Benutzer geschlossen wird.  

Starten der Spark-Shell

Für das erste Beispiel wird eine Spark Shell im YARN Cluster gestartet. 

In einem zweiten Terminal kann jetzt überprüft werden, welche YARN Container und welche dazugehörigen Prozesse im Betriebssystem gestartet wurden. Dazu wird das Kommandozeilen-Tool yarn verwendet. Um die Container aufzulisten, wird die "Application Attempt ID" benötigt. Diese kann mit Hilfe der Kommandos application und applicationattempts angezeigt werden.

Auffällig ist, dass es nur einen Container gibt. Das liegt daran, dass die Spark Shell nur den Application Master gestartet hat. Es gibt noch keine Container für die Spark Executor Prozesse. Diese werden erst bei Bedarf angelegt.

Als nächstes werden die zugehörigen Prozesse im Betriebssystem angezeigt. Dazu wird das Linux Tool ps verwendet.

Für den Benutzer cloudera werden in diesem Fall 2 neue Prozesse im Betriebssystem gestartet. Der Erste Prozess mit der PID 23786 ist der Bash Prozess des spark-shell Scripts. Der zweite Prozess mit der PID 23809 ist die Java VM der Spark-Shell, die vom spark-shell Script gestartet wurde. Der Driver wird in diesem Java Prozess ausgeführt. Die YARN Prozesse werden nicht mit den Rechten des Benutzers cloudera gestartet und ausgeführt. Somit tauchen Sie in der Ausgabe auch nicht auf.

Auf das Wesentliche reduziert, ergibt sich der folgende Prozessbaum für die Spark-Shell:

Mit Hilfe der YARN "Application ID", können die gestarteten Linux Prozesse leicht mit ps und grep angezeigt werden. In einem Cluster aus mehreren Knoten müssen die Befehle auf den Worker Nodes ausgeführt werden, die als "Host" beim Kommando yarn clontainer -list ausgegeben werden.  

Neben dem grep Befehl gibt es 3 weitere Prozesse mit der gesuchten Application ID. Der erste, mit der PID 24080, ist das YARN Bash Script default_container_executor.sh. Dieses Script startet einen weiteren Bash Prozess mit der PID 24082. Dieser zweite Bash Prozess startet dann die Java VM mit der PID 24085. Die Java VM wird mit der Klasse ExecutorLauncher gestartet. Der YARN Container mit dem ApplicationMaster läuft in diesem Java Prozess.

Der erste Prozess, mit der PID 24080, wurde direkt vom NodeManager gestartet. Dass lässt sich mit der PPID 11125 leicht überprüfen:

Der folgende Baum zeigt die gestarteten Prozesse im YARN Cluster.

Fortsetzung

Im zweiten Teil dieses Beitrags führt Olaf Hein aus, wie die Python Shell mit PySpark gestartet wird.

Kommentare hinterlassen