(PySpark) on YARN - Behind the Scenes (Teil 3)
In Teil 2 wurde gezeigt, welche Prozesse bei der Verwendung der PySpark Shell verwendet werden. Im dritten Teil geht Olaf Hein auf die Verwendung unterschiedlicher Python Versionen ein.
Verwendung unterschiedlicher Python Versionen
Im letzten Beispiel wurde die Standard Python Installation des Betriebssystems verwendet. Wenn eine andere Python Version verwendet werden soll, dann muss diese auf den Worker und Edge Knoten des Clusters installiert werden. Für Data Science und Maschinelles Lernen wird oft die Anaconda Python Distribution verwendet. Anaconda wird aktuell für Python 2.7 oder 3.7 bereitgestellt. Für die folgenden Beispiele werden die beiden Anaconda Versionen auf allen Knoten in den folgenden Verzeichnissen installiert:
/opt/bigdata/anaconda2 /opt/bigdata/anaconda3
Alternativ kann Anaconda auch nur einmal installiert werden. Mit dem Paketmanager conda können dann unterschiedliche Umgebungen für unterschiedliche Python Versionen erstellt werden.
In Spark kann die verwendete Python Version in der Datei spark-env.sh mit den folgenden Variablen konfiguriert werden:
# Pfad des Python Interpreters für die Executor Prozesse PYSPARK_PYTHON=/opt/bigdata/anaconda2/bin/python # Pfad des Python Interpreters für den Driver # Wenn leer, dann wird PYSPARK_PYTHON als default verwendet PYSPARK_DRIVER_PYTHON
Im Cloudera Manager kann die Konfiguration über die Web Oberfläche erfolgen und im Cluster verteilt werden. Dabei ist darauf zu achten, dass die Konfiguration sowohl für die Worker als auch für die Edge Nodes durchgeführt wird. Im Clustermanager gibt es dafür unterschiedliche Konfigurationsparameter. Die verwendeten Python Versionen des Drivers und der Executoren müssen beim Ausführen eines Jobs identisch sein!
Nach der Anaconda Installation und dem Verteilen der Spark Konfiguration wird eine neue PySpark Shell gestartet:
[cloudera]$ pyspark2 --master yarn Python 2.7.15 |Anaconda, Inc.| (default, May1 2018, 23:32:55) [GCC 7.2.0] on linux2 Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.3.0.cloudera3 /_/ Using Python version 2.7.15 (default, May1 2018 23:32:55) SparkSession available as 'spark'. >>>
Während eine Python Task ausgeführt wird, laufen diese relevanten Prozesse im Betriebssystem:
# ApplicationMaster Processe yarn205117250 0 05:25 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563711484737_0003/container_1563711484737_0003_01_000001/default_container_executor.sh yarn20514 20511 0 05:25 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1 yarn20517 20514 34 05:25 ? 00:00:20 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563711484737_0003/container_1563711484737_0003_01_000001/tmp -Dspark.yarn.a # Executor Processe yarn207747250 0 05:26 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563711484737_0003/container_1563711484737_0003_01_000002/default_container_executor.sh yarn20776 20774 0 05:26 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1 yarn20781 20776 99 05:26 ? 00:00:18 /usr/java/default/bin/java -server -Xmx1024m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563711484737_0003/container_1563711484737_0003_01_000002/tmp -Dspark.authe yarn20931 20781 15 05:26 ? 00:00:00 /opt/bigdata/anaconda2/bin/python -m pyspark.daemon yarn20936 20931 0 05:26 ? 00:00:00 /opt/bigdata/anaconda2/bin/python -m pyspark.daemon
Über die Parameter in der Datei spark-env.sh kann immer nur eine Python Version für alle Jobs im Cluster konfiguriert werden. Diese Version ist dann der Default für Jobs im Cluster.
Zusätzlich kann der Pfad zum Python Interpreter und damit zur Python Version je Spark Job über die folgenden Parameter angegeben werden:
spark.pyspark.driver.python spark.pyspark.python
Durch die Nutzung dieser Parameter ist es problemlos möglich, mehrere Python Installationen parallel in einem Cluster zu verwenden. Mit dem folgenden Aufruf wird die PySpark Shell mit Anaconda 3 und somit Python 3 gestartet. Die Parameter überschreiben die Standard Einstellungen in der Datei spark-env.sh.
[cloudera]$ pyspark2 --master yarn --conf spark.pyspark.python=/opt/bigdata/anaconda3/bin/python Python 3.7.3 (default, Mar 27 2019, 22:11:17) [GCC 7.3.0] :: Anaconda, Inc. on linux Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.3.0.cloudera3 /_/ Using Python version 3.7.3 (default, Mar 27 2019 22:11:17) SparkSession available as 'spark'. >>>
Beim Ausführen einer Python Task, zeigt ps jetzt die Python Prozesse aus der Anaconda 3 Installation an:
# ApplicationMaster Processe yarn23595 7250 0 05:34 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563711484737_0004/container_1563711484737_0004_01_000001/default_container_executor.sh yarn23597 23595 0 05:34 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1 yarn23600 23597 32 05:34 ? 00:00:19 /usr/java/default/bin/java -server -Xmx512m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563711484737_0004/container_1563711484737_0004_01_000001/tmp -Dspark.yarn.a # Executor Processe yarn238967250 0 05:35 ? 00:00:00 bash /yarn/nm/usercache/cloudera/appcache/application_1563711484737_0004/container_1563711484737_0004_01_000002/default_container_executor.sh yarn23899 23896 0 05:35 ? 00:00:00 /bin/bash -c LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/../../../CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/lib/native::/opt/cloudera/parcels/CDH-5.13.0-1 yarn23911 23899 99 05:35 ? 00:00:20 /usr/java/default/bin/java -server -Xmx1024m -Djava.io.tmpdir=/yarn/nm/usercache/cloudera/appcache/application_1563711484737_0004/container_1563711484737_0004_01_000002/tmp -Dspark.drive yarn24006 23911 13 05:35 ? 00:00:00 /opt/bigdata/anaconda3/bin/python -m pyspark.daemon yarn24012 24006 0 05:35 ? 00:00:00 /opt/bigdata/anaconda3/bin/python -m pyspark.daemon
Fortsetzung
Im letzten Teil dieses Beitrags führt Olaf Hein unter anderem aus, wie mehr Sicherheit mit dem YARN-Container Executor realisiert wird.
Bei Updates im Blog, informieren wir per E-Mail.
Kommentare