Apache NiFi: Zurück zum Code? Was kann die nipyapi?

apache-nifi-titelbild

Apache NiFi erfreut sich weiter steigender Beliebtheit. Vor allem Entwickler:innen schätzen die über den Browser erreichbare Web-UI. Dadurch lassen sich mit wenigen Klicks anspruchsvolle Flows erstellen, die Daten transformieren und transportieren können.

Für NiFi Admins oder Operators kann die UI jedoch unbequem werden. Immer wiederkehrende Tätigkeiten wie das Anlegen von Usern oder Policies müssen jedes Mal aufs Neue händisch durchgeführt werden. Globales Ausrollen oder ein Update von Variablen? Beides ist nur manuell über den Browser möglich.

Dass hier Automatisierungsbedarf vorhanden ist, hat auch die NiFi-Community bemerkt und so verfügt NiFi seit einiger Zeit über eine REST-API, mit der sich NiFi administrieren und verwalten lässt. Passend dazu gibt es das Toolkit, eine Sammlung von Programmen zur besseren Verwaltung von NiFi. Dazu gehört die CLI, ein Java-Programm, das einige wichtige vordefinierte Befehle enthält, die ebenfalls die API ansprechen. Zur CLI wird es einen seperaten Blogartikel geben.

Neben der CLI haben einige Python-Entwickler:innen das Paket nipyapi entwickelt. Ebenfalls unter der Apache Software License 2.0 veröffentlicht erlaubt es die Ansteuerung der API über Python, was den Admins neue Möglichkeiten bietet. Darüber hinaus verfügt das Paket über vordefinierte Abfragen. Es können aber im Gegensatz zur CLI auch manuell alle verfügbaren Endpunkte angesprochen werden.

Das Projekt nipyapi wird aktuell öffentlich auf GitHubgehostet und entwickelt. Die Dokumentation ist sehr ausführlich und enthält neben den eigentlichen Funktionen auch Beispiele.

In diesem Blogartikel wollen wir uns die nipyapi näher anschauen und die folgenden exemplarische Aufgaben lösen:

  1. Das Management von Variablen in NiFi
  2. Die Verwaltung von Flows (hierbei wird auch die NiFi Registry angesprochen)
  3. Die Verwaltung von Templates in NiFi

Installation und Grundlagen

Bevor wir zu den Beispielen kommen, zunächst ein paar Grundlagen. Es werden insgesamt drei Abstraktionslevel geboten, um mit der NiFi API zu interagieren.

  • Level 3 (High Level): Komplette Beispiele und Demos; so kann mit dem Paket beispielsweise auch eine NiFi-Docker-Instanz gestartet werden.
  • Level 2 (Mid Level): Vordefinierte Funktionen, die für das tägliche Arbeiten meist unerlässlich sind. Mit diesen Funktionen wird am häufigsten gearbeitet.
  • Level 1 (Low Level): Hierbei handelt es sich um Funktionen, die direkt die Endpunkte der NiFi API ansprechen. Möchte man also Funktionen der API nutzen, die noch nicht in Level 2 implementiert wurden, kann man selbst alle Endpunkte ansprechen und eigene Funktionen erstellen.

Das Paket lässt sich einfach Mithilfe des Paketmanagers pip installieren:

pipinstall nipyapi

Um das Paket in Python zu nutzen, muss es nur noch importiert werden.

Alle notwendigen Einstellungen, wie die Adresse der NiFi-Instanz, lassen sich über globale Variablen tätigen.

import nipyapi 

nipyapi.config.nifi_config.host = "https://localhost:8443/nifi-api" 
nipyapi.config.nifi_config.ssl_ca_cert = "/certs/nifi-cert.pem"
nipyapi.config.nifi_config.verify_ssl = False 

nipyapi.config.nifi_config.username = "ordix" 
nipyapi.config.nifi_config.password = "changeit" 

nipyapi.security.service_login(service="nifi",username="ordix", password="changeit") 

print(nipyapi.canvas.get_root_pg_id())  

Ein erstes kleines Beispiel, das die ID der Root-Level-Prozessgruppe ausgibt, wird hier aufgezeigt. Das Paket wird zunächst importiert und anschließend werden alle notwendigen Einstellungen getroffen.

In diesem Beispiel wird eine lokale NiFi Instanz aufgerufen. Diese ist per HTTPS verschlüsselt. Da es sich um ein selbst signiertes Zertifikat handelt und in einer Testumgebung gearbeitet wird, geben wir den Pfad des CA-Zertifikates über den Parameter nipyapi.config.nifi_config.ssl_ca_cert an.

Die Zeilen 6 und 7 sind für die Authentifizierung des Users nötig. Hier sollte in Produktiv-Umgebungen möglichst ein User-Input oder ein Token verwendet werden.

Zeile 9 loggt sich in der NiFi Instanz mit den gegebenen Credentials ein.

In Zeile 11 wird dann eine der Mid Level Funktionen aufgerufen, um die process group id (pgid) der Root-Level Prozessgruppe auszugeben.

Mit den gegebenen Konfigurationen können schon erste Aufrufe gegen die API gestellt werden und mit diesem Grundgerüst werden wir nun weiterarbeiten.

Variablen verwalten

Sollen Variablen in NiFi global verwaltet werden, weil sie beispielsweise in mehreren Umgebungen verwendet werden, bietet sich eine Automatisierung an.

Wir wollen in diesem Beispiel folgendes Szenario betrachten: Eine Liste mit Variablen soll automatisiert ausgerollt und später aktualisiert werden.

Um Variablen zu erstellen, zu aktualisieren oder zu löschen, bietet sich die Funktion update_variable_registry() aus der Klasse canvas an. Diese Methode verlangt zwei Übergabeparameter (eine Prozessgruppe und eine Liste mit Variablen), jeweils als Key-Value Paar. Für die Prozessgruppe reicht die process group id (pgid) zunächst nicht aus. Wir behelfen uns mit der Methode get_process_group(), die eine Prozessgruppe anhand des Namens oder der ID identifiziert und zurückgibt. Dazu nehmen wir die ID der root-Level Prozessgruppe aus dem vorherigen Beispiel.

Die Liste mit den Variablen erstellen wir direkt in Python.

Wir erweitern unser Beispiel also um folgende Zeilen:

variables = [
("env", "DEV"),
("company", "ORDIX AG"),
("grafanaURL", "grafana.local"),
("grafanaPORT", 3003)
]
pgid = nipyapi.canvas.get_root_pg_id()
pg = nipyapi.canvas.get_process_group(pgid, identifier_type="id")
nipyapi.canvas.update_variable_registry(pg, variables) 

In Zeile 1 wird zunächst die Liste mit Variablen definiert. In der Liste befinden sich Tupel, die wiederum im Key-Value Format gespeichert sind. Als Nächstes wird die process group id (pgid) der root-Level Prozessgruppe in der Variablen pgid gespeichert und in Zeile 8 wird die gesamte Prozessgruppe anhand der pgid identifiziert und in der Variablen pg gespeichert.

In der letzten Zeile erfolgt dann der eigentliche Rollout der Variablen. Der Funktion update_variable_registry() werden nur noch die vorher gespeicherte Prozessgruppe pg und die Liste mit Variablen übergeben.

Wie der Screenshot zeigt, sind die Variablen sofort in NiFi verfügbar und können genutzt werden. Möchte man nun die erstellten Variablen updaten, kann man einfach den Wert in der Liste ändern und das Skript erneut ausführen.

Verwaltung von Flows

Nachdem wir bereits erfolgreich Variablen verwaltet haben, soll unser nächstes Ziel die Verwaltung von Flows in NiFi sein. Dazu werden wir zunächst die Versionsinformation eines Flows aus der NiFi Registry holen und eine neue Version committen. Nach der Verwendung der NiFi Registry werden wir uns die Verwaltung von Templates mit der nipyapi anschauen. Hierbei sollen existierende Templates aufgelistet und heruntergeladen werden.

NiFi Registry

Um die NiFi Registry anzusprechen, werden wir eine weitere Konfigurationsvariable setzen: registry_config.host Da die NiFi Registry in diesem Beispiel ohne weitere Authentifizierung konfiguriert ist, werden auch keine User-Credentials benötigt.

Der bisherige Code wird also um die folgenden zwei Zeilen erweitert, um die NiFi Registry zu konfigurieren und anschließend alle Buckets aufzulisten.

nipyapi.config.registry_config.host = "http://localhost:18080/nifi-registry-api"

print(nipyapi.versioning.list_registry_buckets()) 

Als Ausgabe gibt uns die Funktion eine Liste mit JSON-Daten zurück:

[
    {
        "allow_bundle_redeploy": False,
        "allow_public_read": False,
        "created_timestamp": 1621510766495,
        "description": None,
        "identifier": "77319271-8036-41b8-8a8d-152c5cf4ebcf",
        "link": {
            "href": "buckets/77319271-8036-41b8-8a8d-152c5cf4ebcf",
            "params": {"rel": "self"},
        },
        "name": "Monitoring",
        "permissions": {"can_delete": True, "can_read": True, "can_write": True},
        "revision": {"client_id": None, "last_modifier": None, "version": 0},
    },
    {
        "allow_bundle_redeploy": False,
        "allow_public_read": False,
        "created_timestamp": 1619021864010,
        "description": None,
        "identifier": "79fe1168-fc86-44db-89aa-4d1474e0825c",
        "link": {
            "href": "buckets/79fe1168-fc86-44db-89aa-4d1474e0825c",
            "params": {"rel": "self"},
        },
        "name": "Split-Merge-Conflict",
        "permissions": {"can_delete": True, "can_read": True, "can_write": True},
        "revision": {"client_id": None, "last_modifier": None, "version": 0},
    },
]
 

Wie man sieht, existieren aktuell zwei Buckets in der NiFi Registry. Die Ausgabe lässt sich in Python auch schöner formatieren und speziell nur die Namen ausgeben. Dazu speichern wir die Rückgabe der Funktion in einer Liste und iterieren durch diese.

buckets = nipyapi.versioning.list_registry_buckets()
for b in buckets: print(b.name) 

So entsteht die viel übersichtlichere Ausgabe:

Monitoring
Split-Merge-Conflict 

Als Nächstes möchten wir uns die Versionsinformationen einer Prozessgruppe anschauen. Dazu verwenden wir die Funktion 'get_version_info' und die Prozessgruppe `Monitoring` mit der ID `83f5de72-0179-1000-a480-791232f520b9`.

pgid = "44d02b4d-017a-1000-c843-53dd8faa5f6f"
pg = nipyapi.canvas.get_process_group(pgid, identifier_type="id")
print(nipyapi.versioning.get_version_info(pg)) 

Als Ausgabe bekommen wir wieder ein als JSON formatiertes Objekt:

{
    "disconnected_node_acknowledged": None,
    "process_group_revision": {
        "client_id": "44cff9a6-017a-1000-787a-f963435afc9b",
        "last_modifier": None,
        "version": 2,
    },
    "version_control_information": {
        "bucket_id": "77319271-8036-41b8-8a8d-152c5cf4ebcf",
        "bucket_name": "Monitoring",
        "flow_description": "",
        "flow_id": "9ea7b132-05e5-43bc-96d5-834b7a00e06f",
        "flow_name": "SitetoSite Monitoring",
        "group_id": "9ea7b132-05e5-43bc-96d5-834b7a00e06f",
        "registry_id": "84f62aae-0179-1000-c599-9624db9bf81e",
        "registry_name": "My",
        "state": "LOCALLY_MODIFIED",
        "state_explanation": "Local changes have been " "made",
        "version": 1,
    },
}
 

Die für uns interessanten Attribute liegen unter version_control_information und sind state, version sowie flow_name.

Man sieht, dass der Flow in Version 1 vorliegt, es aber lokale Änderungen gibt, die noch nicht in der NiFi Registry vorhanden sind.

Daher wollen wir diese als nächstes committen und damit eine neue Version 2 erzeugen.

Um dies zu realisieren, werden wir die Funktion save_flow_ver() nutzen:

client = nipyapi.versioning.get_registry_client("My")
bucket = nipyapi.versioning.get_registry_bucket("Monitoring")
flow_id = flow.version_control_information.flow_id
flow_name = flow.version_control_information.flow_name
nipyapi.versioning.save_flow_ver(process_group=pg, registry_client=client, bucket=bucket, flow_name=flow_name, flow_id=flow_id , comment="New Version with Registry") 

Zunächst holen wir uns einige Metainformationen, wie den Registry-Client in Zeile 1, den Bucket in Zeile 2 und die Flow-Informationen in Zeile 3 & 4.

Diese Metainformationen werden dann in Zeile 6 der Funktion übergeben. Damit kann diese den bestehenden Flow in der Registry identifizieren und eine neue Version, anstatt eines neuen Flows, anlegen. Dazu geben wir auch noch mit dem Attribut comment eine commit-Nachricht mit.

In der Registry erscheint die neue Version samt commit-Nachricht:

Templates

Falls keine NiFi Registry in der Umgebung verfügbar ist oder Flows außerhalb des Netzwerkes geteilt werden sollen, eignen sich nach wie vor Templates. Daher wollen wir uns auch hier anschauen, wie wir diese mit der nipyapi verwalten können.

Wir beginnen mit dem bereits bekannten Grundgerüst, indem wir die NiFi Instanz konfigurieren:

import nipyapi

nipyapi.config.nifi_config.host = "https://localhost:8443/nifi-api"
nipyapi.config.nifi_config.ssl_ca_cert = "/certs/nifi-cert.pem"

nipyapi.config.nifi_config.username = "ordix"
nipyapi.config.nifi_config.password = "changeit"

nipyapi.security.service_login(service="nifi",username="ordix", password="changeit") 

Um mit Templates zu interagieren, werden wir die Funktionen der Klasse templates nutzen. Zunächst möchten wir alle vorhandenen Templates auflisten, dazu erweitern wir das Grundgerüst um folgende Zeile:

print(nipyapi.templates.list_all_templates()) 

Die Funktion gibt uns eine Liste mit allen Template-Objekten zurück:

{
    "bulletins": None,
    "disconnected_node_acknowledged": None,
    "id": "0a30918e-bab0-3f2b-be72-9924c2a5ab17",
    "permissions": {"can_read": True, "can_write": True},
    "position": None,
    "revision": None,
    "template": {
        "description": "In diesem Beispiel werden zwei "
        "verschiedene HTTP-Requests "
        "getätigt. Sind die korrekten "
        "Namen enthalten, werden die "
        "FlowFiles gezählt, der Dateiname "
        "(filename) geändert und diese "
        "werden gespeichert.",
        "encoding_version": None,
        "group_id": "83c396e1-0179-1000-8bfa-f83a7d2e86f0",
        "id": "0a30918e-bab0-3f2b-be72-9924c2a5ab17",
        "name": "Example_HTTP-Request",
        "snippet": None,
        "timestamp": "10/28/2020 11:46:28 CET",
        "uri": "https://localhost:8443/nifi-api/templates/0a30918e-bab0-3f2b-be72-9924c2a5ab17",
    },
    "uri": None,
}
 

Neben all den Metainformationen interessiert uns hier vor allem der Name und die ID des Templates, daher parsen wir den Output wieder:

templates = nipyapi.templates.list_all_templates()

for template in templates.templates:

    print(f"Name: {template.template.name}")
    print(f"ID: {template.template.id}\n") 

Die Ausgabe ist nun deutlich übersichtlicher und enthält nur alle Informationen, die wir benötigen:

Name: Hello NiFi Web Service
ID: e06597a0-7388-3d44-87cf-4b92784c92e1

Name: Beispiel_DataRouting
ID: 50358fcb-e294-3cf0-ae7f-a71670a484fa

Name: Beispiel_Prozessgruppen
ID: 99d7ff5c-2aed-35b0-ab33-95e077119eea

Name: Example_HTTP-Request
ID: 0a30918e-bab0-3f2b-be72-9924c2a5ab17

Name: Beispiel_MySQL-Datenbank
ID: ea761f1f-6122-30c7-9aa1-920c8b952069 

Nachdem wir nun eine Liste aller Templates mit deren ID haben, wollen wir im nächsten Schritt eines dieser Templates exportieren. Dazu werden wir die Funktion export_template() nutzen:

id = "e06597a0-7388-3d44-87cf-4b92784c92e1"

nipyapi.templates.export_template(t_id=id, output="file", file_path="./nipyapi_template.xml") 

In unserem Dateisystem erscheint prompt die neue Datei 'nipyapi_template.xml':

$ ls nipyapi_template.xml

nipyapi_template.xml 

Diese Funktionalität wollen wir nun erweitern und alle vorhandenen Templates mit ihrem Namen exportieren:

templates = nipyapi.templates.list_all_templates()

for template in templates.templates:

    id = template.template.id
    name = template.template.name
    print(f"Name: {name}")
    print(f"ID: {id}\n")
    nipyapi.templates.export_template(t_id=id, output="file", file_path=f"./templates/{name}.xml") 

Das lässt sich einfach realisieren, indem wir den Export der Templates in der For-Schleife ausführen. Ein ls -l in der Konsole zeigt uns ebenfalls das erwartete Ergebnis.

$ ls -l templates

-rwxrwxrwx 1 jax jax 33175 Jun 25 22:40 Beispiel_DataRouting.xml
-rwxrwxrwx 1 jax jax 43115 Jun 25 22:40 Beispiel_MySQL-Datenbank.xml
-rwxrwxrwx 1 jax jax 62408 Jun 25 22:40 Beispiel_Prozessgruppen.xml
-rwxrwxrwx 1 jax jax 73735 Jun 25 22:40 Example_HTTP-Request.xml
-rwxrwxrwx 1 jax jax 37009 Jun 25 22:40 'Hello NiFi Web Service.xml' 

Fazit

Anhand dieser Beispiele kann man schon deutlich sehen, wie sehr eine mächtige Programmiersprache wie Python in Kombination mit einem gut dokumentierten und feature-reichem Paket einem Administrator viel Arbeit abnehmen kann.

Neben den gezeigten Anwendungsfällen gibt es noch eine Vielzahl weiterer Mid-Level Funktionen, die das Paket nipyapi bietet. Diese gehen weit über die vorhandenen Funktionen der CLI hinaus. So wurden innerhalb dieses Blog-Artikels bspw. nur einige wenige Funktionen der canvas Klasse genutzt und aus der system Klasse wurden gar keine Funktionen aufgerufen.

By accepting you will be accessing a service provided by a third-party external to https://blog.ordix.de/