(PySpark) on YARN - Behind the Scenes (Teil 3)

pyspark-3

In Teil 2 wurde gezeigt, welche Prozesse bei der Verwendung der PySpark Shell verwendet werden. Im dritten Teil geht Olaf Hein auf die Verwendung unterschiedlicher Python Versionen ein.

Verwendung unterschiedlicher Python Versionen 

Im letzten Beispiel wurde die Standard Python Installation des Betriebssystems verwendet. Wenn eine andere Python Version verwendet werden soll, dann muss diese auf den Worker und Edge Knoten des Clusters installiert werden. Für Data Science und Maschinelles Lernen wird oft die Anaconda Python Distribution verwendet. Anaconda wird aktuell für Python 2.7 oder 3.7 bereitgestellt. Für die folgenden Beispiele werden die beiden Anaconda Versionen auf allen Knoten in den folgenden Verzeichnissen installiert:

/opt/bigdata/anaconda2
/opt/bigdata/anaconda3 

Alternativ kann Anaconda auch nur einmal installiert werden. Mit dem Paketmanager conda können dann unterschiedliche Umgebungen für unterschiedliche Python Versionen erstellt werden.

In Spark kann die verwendete Python Version in der Datei spark-env.sh mit den folgenden Variablen konfiguriert werden:

# Pfad des Python Interpreters für die Executor Prozesse
PYSPARK_PYTHON=/opt/bigdata/anaconda2/bin/python
# Pfad des Python Interpreters für den Driver
# Wenn leer, dann wird PYSPARK_PYTHON als default verwendet
PYSPARK_DRIVER_PYTHON 

Im Cloudera Manager kann die Konfiguration über die Web Oberfläche erfolgen und im Cluster verteilt werden. Dabei ist darauf zu achten, dass die Konfiguration sowohl für die Worker als auch für die Edge Nodes durchgeführt wird. Im Clustermanager gibt es dafür unterschiedliche Konfigurationsparameter. Die verwendeten Python Versionen des Drivers und der Executoren müssen beim Ausführen eines Jobs identisch sein!

Nach der Anaconda Installation und dem Verteilen der Spark Konfiguration wird eine neue PySpark Shell gestartet: 

[cloudera]$ pyspark2 --master yarn
Python 2.7.15 |Anaconda, Inc.| (default, May1 2018, 23:32:55)
[GCC 7.2.0] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera3
      /_/

Using Python version 2.7.15 (default, May1 2018 23:32:55)
SparkSession available as 'spark'.
>>> 

Während eine Python Task ausgeführt wird, laufen diese relevanten Prozesse im Betriebssystem:

# ApplicationMaster Processe
yarn205117250 0 05:25 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563711484737_0003/container_1563711484737_0003_01_000001/default_container_executor.sh
yarn20514 20511 0 05:25 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1
yarn20517 20514 34 05:25 ? 00:00:20 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563711484737_0003/container_1563711484737_0003_01_000001/tmp -Dspark.yarn.a

# Executor Processe
yarn207747250 0 05:26 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563711484737_0003/container_1563711484737_0003_01_000002/default_container_executor.sh
yarn20776 20774 0 05:26 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1
yarn20781 20776 99 05:26 ? 00:00:18 /usr/java/default/bin/java -server -Xmx1024m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563711484737_0003/container_1563711484737_0003_01_000002/tmp -Dspark.authe
yarn20931 20781 15 05:26 ? 00:00:00 /opt/bigdata/anaconda2/bin/python -m pyspark.daemon
yarn20936 20931 0 05:26 ? 00:00:00 /opt/bigdata/anaconda2/bin/python -m pyspark.daemon 

Über die Parameter in der Datei spark-env.sh kann immer nur eine Python Version für alle Jobs im Cluster konfiguriert werden. Diese Version ist dann der Default für Jobs im Cluster.

Zusätzlich kann der Pfad zum Python Interpreter und damit zur Python Version je Spark Job über die folgenden Parameter angegeben werden:

spark.pyspark.driver.python
spark.pyspark.python 

Durch die Nutzung dieser Parameter ist es problemlos möglich, mehrere Python Installationen parallel in einem Cluster zu verwenden. Mit dem folgenden Aufruf wird die PySpark Shell mit Anaconda 3 und somit Python 3 gestartet. Die Parameter überschreiben die Standard Einstellungen in der Datei spark-env.sh.

[cloudera]$ pyspark2 --master yarn --conf spark.pyspark.python=/opt/bigdata/anaconda3/bin/python
Python 3.7.3 (default, Mar 27 2019, 22:11:17)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera3
      /_/

Using Python version 3.7.3 (default, Mar 27 2019 22:11:17)
SparkSession available as 'spark'.
>>> 

Beim Ausführen einer Python Task, zeigt ps jetzt die Python Prozesse aus der Anaconda 3 Installation an:

# ApplicationMaster Processe
yarn23595 7250 0 05:34 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563711484737_0004/container_1563711484737_0004_01_000001/default_container_executor.sh
yarn23597 23595 0 05:34 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1
yarn23600 23597 32 05:34 ? 00:00:19 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563711484737_0004/container_1563711484737_0004_01_000001/tmp -Dspark.yarn.a

# Executor Processe
yarn238967250 0 05:35 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563711484737_0004/container_1563711484737_0004_01_000002/default_container_executor.sh
yarn23899 23896 0 05:35 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1
yarn23911 23899 99 05:35 ? 00:00:20 /usr/java/default/bin/java -server -Xmx1024m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563711484737_0004/container_1563711484737_0004_01_000002/tmp -Dspark.drive
yarn24006 23911 13 05:35 ? 00:00:00 /opt/bigdata/anaconda3/bin/python -m pyspark.daemon
yarn24012 24006 0 05:35 ? 00:00:00 /opt/bigdata/anaconda3/bin/python -m pyspark.daemon 

Fortsetzung

Im letzten Teil dieses Beitrags führt Olaf Hein unter anderem aus, wie mehr Sicherheit mit dem YARN-Container Executor realisiert wird.

 

Kommentare

Derzeit gibt es keine Kommentare. Schreibe den ersten Kommentar!
Gäste
Donnerstag, 17. Oktober 2019

Sicherheitscode (Captcha)