Warum wird eine weitere Datenbank benötigt?
Im Umfeld von Hadoop gab es bisher zwei gängige Arten für die Speicherung strukturierter Daten. Zum einen als Datei im HDFS und zum anderen in der NoSQL-Datenbank HBase. HDFS ist sehr gut für die Speicherung und Verarbeitung riesiger Datenmengen geeignet. Allerdings ist es in HDFS nicht möglich, effizient einzelne Datensätze zu lesen (wahlfreier Zugriff) und einzufügen. Weiterhin können einmal geschriebene Datensätze nicht wieder verändert oder gelöscht werden. Das ganze System ist für die Verarbeitung großer Dateien im Batch-Betrieb optimiert. HBase auf der anderen Seite ist im Kern eine Key-Value-Datenbank. Das System speichert seine Daten sortiert nach dem Primary Key ab. Einzelne Datensätze können in HBase schnell eingefügt, geändert, selektiert und gelöscht werden. Range-Scans von mehreren tausend Datensätzen (Short Scans) sind auch sehr gut möglich. Sollen aber Millionen oder gar Milliarden von Datensätzen analysiert werden (Large Scans), dann ist HBase häufig zu langsam.
Mit HDFS und HBase gibt es somit zwei sehr gegensätzliche Systeme, die für sehr unterschiedliche Anwendungsfälle und Zugriffsmuster optimiert wurden. In vielen Fällen wird aber weder das eine noch das andere Extrem benötigt. Stattdessen werden häufig sowohl die Batch-Fähigkeiten von HDFS, als auch die Realtime-Fähigkeiten von HBase benötigt. Bisher wurden solche Systeme durch den Einsatz beider Technologien realisiert. Das führt allerdings oft zu relativ komplexen Architekturen der Anwendungen und zu redundanter Datenhaltung. Wie in Abbildung 1 dargestellt gibt es hier eine Lücke, die mit Kudu geschlossen werden kann. Kudu wurde als universell einsetzbares Storage- System konzipiert. Mit dem Einsatz von Kudu lassen sich die Architekturen vieler Anwendungen deutlich vereinfachen, sodass die Daten nur noch in einem System gespeichert werden müssen.
Was ist Kudu?
Kudu ist ein Top Level Apache-Projekt. Die Entwicklung wird aktuell hauptsächlich von Cloudera vorangetrieben. Die Version 1.0 wurde im September 2016 veröffentlich. Im Januar 2017 wurde die Version 1.2 vollständig in die Cloudera Enterprise Plattform integriert und für den produktiven Einsatz freigegeben. Zurzeit ist das Projekt sehr aktiv und ca. alle drei Monate wird eine neue Version freigegeben.
Die wichtigsten Eigenschaften von Kudu sind:
- Speicherung der Daten in Tabellen
- Tabellen haben ein vordefiniertes Schema
- Jeder Datensatz wird durch seinen eindeutigen Primary Key identifiziert
- Datensätze werden sortiert nach dem Primary Key gespeichert
- Verteilung der Daten im Cluster durch Partitionierung
- Hochverfügbare Architektur durch Replikation der Daten und Metadaten
- Niedrige Latenz beim wahlfreien Zugriff
- Hohe Geschwindigkeit beim sequentiellen Zugriff
- CRUD-Operationen werden vollständig unterstützt
- Datenbanktreiber für C++, Java und Python
- Integration in Spark, MapReduce und Impala
- Transportverschlüsselung mit TLS
- Authentifizierung mit Kerberos
Welches Datenmodell wird verwendet?
Das Datenbankschema von Kudu ähnelt sehr stark dem Schema einer relationalen Datenbank. Daten werden in Tabellen gespeichert, die wiederum aus Spalten mit vorgegebenen Datentypen und einem Primary Key bestehen. Bereits beim Einfügen findet eine strenge Typprüfung der Daten statt (Schema on write). Als Datentypen werden aktuell die wichtigsten elementaren Typen unterstützt (siehe Abbildung 2). Datentypen für Dezimalzahlen, Maps oder Arrays fehlen zurzeit.Die Datenbank arbeitet spaltenorientiert. Die einzelnen Spalten werden physikalisch in getrennten Dateien abgelegt. Dabei kann für jede Spalte einzeln definiert werden, wie die Daten kodiert und komprimiert werden.
Jede Tabelle hat einen Primary Key. Dieser kann aus einer oder aus mehreren Spalten bestehen. Die Daten einer Tabelle werden nach dem Primary Key sortiert abgespeichert. Ein weiterer Index kann nicht angelegt werden. Die Modellierung des Primary Keys ist, wie auch bei HBase, entscheidend für die spätere Scan Performance.
Bei einer Leseanfrage können als Selektionskriterium beliebige Attribute angegeben werden. Für eine optimale Performance müssen allerdings die Attribute des Primary Key in der Reihenfolge verwendet werden, in der sie auch bei der Definition des Schemas verwendet wurden. Dann kann die Datenbank sehr schnell zum ersten Datensatz springen und die Datensätze sequentiell lesen.
Datentyp |
Beschreibung |
INT8, INT16, INT32, INT64 |
Nummerische Datentypen für ganze Zahlen |
FLOAT, DOUBLE | Nummerische Datentypen für Fließkommazahlen |
BOOL | Boolscher Datentype |
STRING | UTF-8 kodierte Zeichenketten |
BINARY | Binäre Daten mit max. 64k je Feld |
UNIX_MICROS | Unix Zeitstempel in Microsekunden |
Können Tabellen partitioniert werden?
Um die Skalierbarkeit und die Performance zu erhöhen, können Tabellen partitioniert und dadurch in mehrere sogenannte Tablets aufgeteilt werden. Diese Tablets entsprechen Partitionen in anderen Datenbanksystemen. Eine nicht partitionierte Tabelle besteht aus genau einem Tablet.
Bei partitionierten Tabellen entspricht jede Partition einem Tablet. Ein Datensatz gehört immer zu genau einem Tablet. Durch die Partitionierung werden die Daten im Cluster verteilt. Die folgenden Partitionierungsarten können verwendet werden:
- Hash
- Range
Bei der Hash-Partitionierung werden die Spalten, nach denen partitioniert wird, und die Anzahl der Partitionen (Buckets) definiert. Die Daten werden anschließend anhand eines Hash-Wertes auf die Tablets verteilt. Die Partitionierung muss beim Anlegen der Tabelle durchgeführt werden und kann nachträglich nicht verändert werden. Typische Attribute für eine Hash-Partitionierung sind zum Beispiel Kundennummer oder Kontonummer.
Um beispielsweise eine Tabelle mit Umsatzdaten einer Bank zu partitionieren, bietet sich die IBAN des Kontos als Partitionierungsschlüssel an. Durch diese Partitionierung wird sichergestellt, dass aktuelle Umsätze beim Einfügen auf viele Partitionen verteilt werden. Dadurch werden „hot-blocks" vermieden und es können sehr viele Daten sehr schnell eingefügt werden. Weiterhin können die Umsätze eines einzelnen Kunden schnell gelesen werden, da alle Daten für eine IBAN immer auf einem Knoten im Cluster gespeichert werden und nicht umständlich zusammengesucht werden müssen. Allerdings ist der Zugriff auf die Umsätze aller Kunden in einem bestimmten Zeitraum nicht sehr effizient. Weiterhin wird das System über die Zeit nicht gut skalieren, da die Anzahl der Hash- Partitionen nicht erhöht werden kann.
Für die Range-Partitionierung werden die Daten anhand eines Wertebereichs auf die Tablets verteilt. Im Gegensatz zur Hash-Partitionierung, können einzelne Partitionen nachträglich erzeugt und auch wieder gelöscht werden. Oft wird die Range-Partitionierung anhand der Zeit durchgeführt. Für das Beispiel der Umsatzdaten bietet sich das Jahr der Buchung für die Range-Partitionierung an. Wird zusätzlich zum Jahr das vollständige Buchungsdatum als Teil des Primary Key definiert, sind alle Datensätze innerhalb einer Partition nach dem Datum sortiert. Dadurch können die Umsätze aller Kunden in einem definierten Zeitraum sehr schnell selektiert werden. Allerdings wird das Einfügen neuer Umsätze langsamer, da alle neuen Daten in dieselbe Partition geschrieben werden müssen. Weiterhin können die Umsätze eines einzelnen Kunden nicht mehr effizient gelesen werden, da sie in der Regel über mehrere Partitionen verteilt sind.
Die beiden Partitionierungsarten dürfen auch kombiniert werden. Eine oder mehrere Hash-Partitionierungen können zusammen mit einer Range-Partitionierung verwendet werden. Insbesondere bei der Speicherung von Time- Series-Daten wird sehr oft die Hash-Partitionierung zusammen mit der Range-Partitionierung genutzt. Bei den Umsatzdaten ist zum Beispiel die Hash-Partitionierung über die IBAN und die Range-Partitionierung über das Jahr des Buchungsdatums eine sinnvolle Partitionierung. In Abbildung 3 wird das nochmals verdeutlicht.
Die Kombination von Hash- und Range-Partitionierung sorgt dafür, dass die aktuellen Daten auf mehre Tablets verteilt werden. Insbesondere beim Einfügen von Time- Series-Daten kann dadurch verhindert werden, dass alle Daten in das gleiche Tablet eingefügt werden und dadurch Performance-Probleme verursachen. Durch die Range- Partitionierung nach dem Jahr wird sichergestellt, dass das System auf Dauer skaliert und nicht im Lauf der Jahre langsamer wird.
Wie sieht die Architektur aus?
Kudu besteht aus zwei Arten von Server-Prozessen. In Abbildung 4 wird der grundsätzliche Aufbau dargestellt:- Kudu Master
- Kudu Tablet Server
Der Kudu Master verwaltet die Metadaten des Systems. Das sind unter anderem Informationen zu den Tabellen, Tablets und Tablet Servern. Weiterhin koordiniert der Master alle Metadaten Operationen wie zum Beispiel das Anlegen, Ändern und Löschen von Tabellen. Es gibt immer genau einen aktiven Master. Dieser wird „Leader" genannt. In einfachen Umgebungen reicht ein Master Server häufig aus. Soll die Verfügbarkeit des Systems erhöht werden, dann können mehrere Master auf unterschiedlichen Hosts gestartet werden. Die weiteren Master werden in diesem Fall „Follower" genannt. Beim Ausfall des Leaders bestimmen die verbleibenden Master automatisch einen neuen Leader, sodass das System weiter verfügbar ist. Um einen Leader zu bestimmen, arbeitet das System intern mit dem „Raft Consensus Algorithmus" [Q3]. Bei diesem Algorithmus muss immer die Mehrzahl der Server verfügbar sein. Für eine Erhöhung der Verfügbarkeit wird eine ungerade Anzahl an Servern benötigt. In hochverfügbaren Umgebungen sind drei Master Server üblich.
Die Kudu Tablet Server speichern die eigentlichen Daten. Wie auch im HDFS, werden die Daten auf lokalen Festplatten des Server und nicht in einem hochverfügbaren Storage System abgelegt. Dadurch können sehr große Datenmengen relativ günstig gespeichert werden. Um sich vor Datenverlust zu schützen werden die Daten in der Regel repliziert. Ein Replikationsfaktor von 3 oder 5 ist üblich. Die Tablet Server verwenden ebenfalls den Raft Consensus Algorithmus, um einen Leader zu bestimmen. Dabei wird für jedes einzelne Tablet (d.h. für jede Partition einer Tabelle) der Leader ausgehandelt. Daten können nur über den Leader verändert werden. Leseanfragen können dagegen von allen Tablet Servern beantwortet werden.
In der Praxis werden die Master- und Tablet-Server- Prozesse auf unterschiedlichen Knoten im Cluster installiert. Client-Applikationen verbinden sich zum einen mit den Master Servern, um Metadaten auszutauschen. Dazu muss der Client alle Master Server kennen und es liegt in der Verantwortung des Client, sich mit dem aktuell aktiven Leader zu verbinden. Der Master teilt dem Client mit, auf welchen Tablets und somit auf welchen Tablet Servern welche Daten gespeichert werden. Für das Lesen und Schreiben von Daten verbindet sich der Client anschließend direkt mit den einzelnen Tablet Servern. Der Datenaustausch findet immer direkt zwischen dem Client und den Tablet Servern statt.
Wie kann Kudu integriert werden?
Für die Integration von Kudu in ein bestehendes Hadoop- System oder aber in eine neue Anwendung stehen verschiedene Schnittstellen und Treiber bereit.
Datenbanktreiber gibt es für die folgenden Programmiersprachen:
- C++
- Python
- Java
Kudu selbst wird in C++ implementiert. Der C++-Treiber und dessen Dokumentation sind sehr gut gepflegt. Der Python-Treiber stellt ein Interface für den C++-Treiber zur Verfügung. Beim Einsatz von Python muss zusätzlich der C++-Treiber installiert werden. Im Gegensatz dazu ist der Java-Treiber vollständig in Java implementiert und hat keine weiteren Abhängigkeiten. Der C++-Treiber wird in diesem Fall nicht benötigt. Bei der Programmierung mit Java und Python kann es sinnvoll sein, einen Blick in die C++-Dokumentation zu werfen, da diese in einigen Fällen etwas detaillierter ist.
Weiterhin ist Kudu sehr gut in die folgenden Komponenten des Hadoop-Ökosystems integriert:
- Impala
- Spark
- MapReduce
Kudu selbst hat keine Shell, um auf die Daten in der Datenbank zuzugreifen. Allerdings ist Kudu sehr gut in Impala integriert. Über Impala können SQL-Statements direkt gegen Kudu-Tabellen abgesetzt werden. Weiterhin können DDL-Statements zum Anlegen, Ändern und Löschen von Tabellen und Partitionen mit Impala ausgeführt werden.
Die Spark-Integration erfolgt über einen speziellen Kudu- Treiber. Das strenge Kudu-Tabellenschema ist für die Verarbeitung mit Spark und insbesondere mit Spark SQL sehr gut geeignet. Neben den üblichen CRUD-Operationen können mit Spark SQL auch DDL-Operationen ausgeführt werden.
Auch wenn MapReduce häufig durch Spark-Jobs ersetzt wird, so hat es für einige Randfälle immer noch seine Daseinsberechtigung. Der Kudu-Java-Treiber enthält alle notwendigen Klassen, um mit MapReduce Jobs auf Kudu- Daten zugreifen zu können.
Gibt es auch Lücken und Schwächen?
Kudu ist ein sehr junges System. Somit hat es noch nicht alle Features, die man von ausgereifteren Systemen, wie zum Beispiel HBase kennt. Erst mit der Version 1.3 wurden elementare Security Features integriert. Diese decken die wesentlichen Anforderungen wie zum Beispiel TLSVerschlüsselung und Authentifizierung über Kerberos ab. An weiteren Features wie zum Beispiel der Zugriffskontrolle für einzelne Tabellen wird noch gearbeitet.
Eine weitere offene Baustelle ist Backup und Recovery. Durch die Replikation der Daten ist das System vor Datenverlust durch den Defekt einzelner Server sehr gut abgesichert. Vor dem Verlust von Daten durch den Ausfall eines ganzen Rechenzentrums oder durch das Löschen von Daten innerhalb der Datenbank gibt es zurzeit keinen hinreichenden Schutz. Der regelmäßige Export der Kudu- Tabellen in HDFS-Dateien ist ein üblicherweise angewendeter Workaround für dieses Problem. Das kann zum Beispiel durch ein einfaches INSERT INTO ... SELECT ... Statement in Impala umgesetzt werden.
Das Projekt Apache Kudu und auch das Projekt Apache Impala werden massiv von Cloudera vorangetrieben. Entsprechend gut ist auch die Integration in die Hadoop- Distribution von Cloudera. In den anderen großen Distributionen von Hortonworks und MapR wird Kudu aktuell nicht unterstützt. Somit muss beim Einsatz von Kudu in einer dieser Distributionen mit zusätzlichem Installations- und Integrationsaufwand gerechnet werden. Weiterhin fehlen in diesem Fall auch komfortable Werkzeuge für die Überwachung und Konfiguration des Clusters.
Fazit
Kudu ist eine sehr interessante Datenbank. Insbesondere, wenn die Vielseitigkeit wichtiger ist als die Spezialisierung auf einige wenige Anwendungsfälle, kann Kudu seine Vorteile ausspielen. In der Praxis wird in vielen Fällen zusätzlich ein HDFS eingesetzt. Dieses speichert dann zusätzliche Daten oder dient als Backup System. Ein weiterer großer Vorteil von Kudu ist die sehr gute SQLUnterstützung durch Impala. Kudu ist noch sehr jung und verfügt in einigen Bereichen noch nicht über alle üblichen Features einer ausgereiften Datenbank. Insbesondere gibt es noch Lücken rund um Backup & Recovery und Security. Zurzeit ist das Projekt sehr aktiv und es ist damit zu rechnen, dass viele dieser Lücken im Lauf der Zeit geschlossen werden. Auf den Github [Q4] und Jira [Q5] Seiten des Projektes, kann sich jeder Interessierte über den Status der aktuellen Entwicklung informieren.
Glossar
ACID
ACID steht für die folgenden Eigenschaften eines Datenbanksystems: Atomar (Atomicity), Konsistenz (Consistency), Abgegrenzt (Isolation) und Dauerhaft (Durability).
Cloudera
Cloudera ist ein Hadoop Distributor. Neben einer kostenlosen Version bietet Cloudera auch Support und zusätzliche Funktionalitäten als Teil von kostenpflichtigen Distributionen an.
CRUD
Das Akronym CRUD umfasst die elementaren Datenbankoperationen Create, Retrieve, Update und Delete.
HBase
NoSQL-Datenbank, die ihre Daten im HDFS speichert.
HDFS
Das Hadoop Distributed File System ist ein verteiltes Dateisystem zur Speicherung sehr großer Datenmengen.
Impala
System zur Ausführung von SQL-Statements im Hadoop-Ökosystem. Unterstützt werden zum Beispiel HDFS, HBase und Kudu. NoSQL Steht für „Not only SQL" und bezeichnet Datenbanken, die nicht relational sind.
Raft Consensus Algorithmen
Verfahren, um in einem verteilten System die Konsistenz und Verfügbarkeit zu garantieren.
Schema on write
Verfahren im Datenbankumfeld, bei dem vor dem Schreiben geprüft wird, ob die Daten zum vordefinierten Schema der Tabelle passen.
Spark
Framework für die verteilte Verarbeitung von Daten in einem Cluster. Vergleichbar mit MapReduce, aber in vielen Fällen deutlich schneller.
Links/Quellen
[1] Big Data Seminar der ORDIX AG https://seminare.ordix.de/seminare/big-data-und-data-warehouse.html
[2] ORDIX® news Artikel 02/2013 – „NoSQL vs. SQL (Teil IV): HBase" https://www.ordix.de/ordix-news-archiv/1-2013.html
[3] ORDIX® news Artikel 3/2015 – „Big Data: Informationen neu gelebt (Teil III) - Apache Hadoop" https://www.ordix.de/ordix-news-archiv/3-2015.html
[Q1] Apache Kudu Homepage http://kudu.apache.org/
[Q2] Kudu-Dokumentation von Cloudera https://www.cloudera.com/documentation/kudu/latest.html
[Q3] The Raft Consensus Algorithm https://raft.github.io/
[Q4] GitHub-Seite des Kudu-Projektes https://github.com/apache/kudu
[Q5] Jira-Seite des Kudu-Projektes https://issues.apache.org/jira/browse/KUDU
Bildnachweise
© pexels.com© pixabay.com | HansenHimself | Impala
© flickr.com | Jim, the Photographer | Elephant
© istockphoto.com | anankkml | Greater Kudu in the dark and...
© freepik.com | @graphictwister | White wood texture