Disco: Eine Alternative zu Hadoop
Die Administration von Hadoop ist für einen Hobby-Nutzer relativ umständlich. Nach dem, was ich im Berufsleben bisher von Hadoop gesehen habe, hatte ich zumindest keine Lust, mir zu Hause ein Testsystem einzurichten, das über eine fertige Distribution hinausgeht.
Umso interessanter erschien mir das Projekt Disco, als ich davon gelesen habe. Es ist ebenfalls eine Plattform zur parallelen Verarbeitung großer Datenmengen (ebenfalls mit verteiltem Dateisystem und Map-Reduce). Allerdings wurde es in Erlang programmiert, setzt für die Map-Reduce-Scripts nativ auf Python und wirkt auch sonst von der Dokumentation näher an meiner privaten Linux-Umgebung. Außerdem soll es wesentlich einfacher einzurichten sein.
Test einer lokalen Instanz
Beim Test auf nur einer lokalen Instanz (Archlinux) zeigten sich die ersten Schwierigkeiten. Die aktuelle Version von Disco funktioniert nicht mit Erlang 18.2 (laut Pull Request #632 seit 18.1 nicht mehr). Zwar funktioniert der genannte Pull Request nicht mit früheren Versionen von Erlang (und wurde deshalb noch nicht übernommen), mit den neueren jedoch schon.
Wenn man auf einem normalen Produktivsystem testet, kann man den Befehl
make install
einfach weglassen und für die Python-Scripts virtualenv
verwenden, um die Dateien nicht irgendwo im System zu verteilen.
Da Disco über SSH kommuniziert (auch mit dem localhost), müssen wir noch SSH-Keys einrichten und akzeptieren.
Der letzte Schritt ist dazu da, einmal den Fingerprint zu bestätigen.
Alternativ könnte man auch StrictHostKeyChecking=no
setzen.
Standardmäßig ist der Master (den wir eben installiert haben) mit einem Worker eingerichtet, sodass wir direkt einen Job starten können.
Ein einfaches Beispielskript für den ersten Test liefert das Disco-Tutorial:
Hier wird die Anzahl an Vorkommnissen pro Wort in einem Text gezählt, wobei der Text von einem HTTP-URL heruntergeladen wird.
Einen Cluster unter Amazon EC2 einrichten
Hat man einen AWS-Account, kann man sich halbwegs leicht einen Test-Cluster einrichten. Die Dokumentation zur Einrichtung eines richtigen Clusters fand ich teilweise etwas zerstreut, aber mit ein wenig Recherche kommt man dann doch durch.
Zunächst einmal muss man wissen, dass man Master-Instanzen und
Node-Instanzen installieren kann. Wobei die Master-Instanz eine Übermenge
der Node-Instanz ist. Das heißt, man kann theoretisch überall den Master
installieren, hat dann aber unnötig viel installiert.
Der Unterschied liegt lediglich in der Zeile make install
für den Master
gegen make install-node
für den Node.
Außerdem funktionierte bei mir Disco nicht unter der Amazon AMI
(ami-60b6c60a
), sondern nur mit Ubuntu Server 14.04 (ami-fce3c696
). Das
habe ich als zweites ausprobiert, als ich gesehen habe, dass das offizielle
AWS-Setup-Script Ubuntu verwendet. Ansonsten hätte ich
vermutlich vorher noch RHEL probiert, weil ich Ubuntu als Serversystem
meide.
Vorbereitung der EC2-Umgebung
Als allererstes richten wir uns eine neue Security Group für unsere EC2-Instanzen ein. Ich habe zwei relevante Security Groups aktiv:
- Disco:
- Inbound: All traffic, All protocols, All ports,
[ID der Security Group selbst]
- Outbound: All traffic, All protocols, All ports,
0.0.0.0/0
- Inbound: All traffic, All protocols, All ports,
- SSH everywhere:
- Inbound: SSH, TCP, 22,
0.0.0.0/0
- Outbound: All traffic, All protocols, All ports,
0.0.0.0/0
- Inbound: SSH, TCP, 22,
Den SSH-Zugang kann man sich natürlich auch auf “My IP” stellen, mit einer dynamischen IP ist das allerdings lästig, da dann jeden Tag die Gruppe aktualisiert werden muss.
Die Security Group Disco dient allen Disco-Knoten zur Kommunikation untereinander. Jeder, der in dieser Gruppe ist, darf zu jedem anderen in der Gruppe auf allen Ports kommunizieren. SSH everywhere dient denjenigen Knoten, auf die wir während der Installation zugreifen müssen.
Installation des Masters
Fangen wir mit dem Master an. Als allererstes richten wir wieder die SSH-Keys ein.
Jetzt müssen wir uns noch ein Erlang-Cookie einrichten, das anscheinend zur Authentifizierung von [verteiltem Erlang-Code][distributed-erlang] selbst verwendet wird.
Authentication determines which nodes are allowed to communicate with each other. In a network of different Erlang nodes, it is built into the system at the lowest possible level. Each node has its own magic cookie, which is an Erlang atom.
When a node tries to connect to another node, the magic cookies are compared. If they do not match, the connected node rejects the connection.
Das Erlang-Cookie muss auf allen unseren Disco-Knoten denselben Inhalt haben. Den Zufallsgenerator habe ich von earthgecko.
Anschließend können wir die Pakete aus dem Repository installieren und Disco kompilieren und installieren:
Installation der Slaves (Nodes)
Die Installation der Slaves läuft sehr ähnlich zum Master ab, bis auf die
Zeile sudo make install-node
. Zusätzlich brauchen wir bei den Slaves
noch einige bekannte Werte vom Master, nämlich das Erlang-Cookie und auch
den SSH-Public-Key. Außerdem müssen wir unseren SSH-Public-Key vom Slave
dann auch noch an den Master senden.
Da Master und Slave bisher in keiner Richtung miteinander kommunizieren können, müssen wir den Public-Key des Masters auf unseren lokalen PC kopieren und dann auf den Slave laden.
Damit ist schonmal die Verbindung vom Master zum Slave möglich. Da wir sowieso
schon vom lokalen PC aus mit den beiden kommunizieren, können wir umgekehrt
genau denselben Weg gehen (alternativ wäre es nun auch möglich, vom Master
aus den Pubkey des Slaves via scp
zu beziehen).
Und das Erlang-Cookie müssen wir auch noch vom Master zum Slave kopieren:
Nachdem diese ganzen Kommunikationseinstellungen erledigt sind, können wir vom Slave aus endlich den Rest abarbeiten, der fast genau gleich wie beim Master läuft:
Konfiguration des Clusters
Da wir außer Port 22 keine weiteren Ports vom Internet aus zugelassen haben, müssen wir uns einen SSH-Tunnel aufsetzen, um die webbasierte Oberfläche von Disco zu erreichen. Diese ist notwendig, um den Cluster zu konfigurieren. Wenn man das Format der Cluster-Konfigurationsdatei kennt, kann man sie natürlich auch manuell schreiben, definiert ist es allerdings (meines Wissens) nicht.
Den SSH-Tunnel können wir so aufbauen:
Und anschließend erreicht man die Weboberfläche ganz normal über Aufruf
von http://localhost:8989
im Browser.
Automatisierung des Setups
Dieses ganze Setup kann man sich nun natürlich auch in Scripts automatisieren. Einen Ansatz hierzu gibt es bereits unter dem Namen disposabledisco. disposabledisco hebt sich meines Erachtens besonders durch die Verwendung von Spot-Instanzen hervor.
Neben dem bisher bereits entwickelten Code (in etwas abgewandelter Form) benötigen wir nun auch noch die SSH-Kommunikation, um die SSH-Schlüssel zwischen Master und Slaves zu verteilen. Außerdem müssen wir natürlich auch die Instanzen automatisiert anlegen.
Ich habe mich dazu entschieden, das Script in Python mit boto3
zu
programmieren. Als SSH-Bibliothek gibt es paramiko
.
Vom Ablauf her starten wir zunächst einmal einen Master und danach die
gewünschte Anzahl von Slaves. Man kann dem Funktionsaufruf create_instances
direkt die Anzahl an zu startenden Servern mitgeben. Beim Aufruf von
create_instances
geben wir auch gleich ein Startup-Script mit an, das
die Rechner nach dem Booten ausführen sollen, um alle notwendigen Dienste
einzurichten. Dies ist der schwierigste Teil, da sich das Startup-Script
meines Wissens nicht besonders gut debuggen lässt.
Beide Scripts sind relativ ähnlich, wobei der Master das .erlang.cookie
anlegt und der Slave dies erst später vom Master erhält. Außerdem wird
entsprechend der gewünschten Instanz das richtige make
-Target install
oder install-node
ausgeführt.
Zunächst das Script für den Master:
Und das Script für den Slave:
In diesen Code-Segmenten ist jetzt noch der Platzhalter {PUBKEY}
enthalten.
Zwar verwendet EC2 sowieso schon meinen Public-Key auf den Instanzen,
allerdings ist mein zugehöriger Private-Key passwortgeschützt und ich
möchte mein Passwort nicht in den Python-Code für paramiko
eingeben. Deswegen
generiere ich ein temporäres Schlüsselpaar und lade den Public-Key davon in
die EC2-Instanz.
Das Hochfahren der Knoten ist dann relativ einfach. Wir starten zunächst den Master und dann die Slaves mit den Einstellungen wie zuvor auch. Eine kleine Stolperfalle gibt es nun noch. Bei mir im Cost Management schienen plötzlich Gebühren für regional traffic auf. Diese wurden vermutlich verursacht, weil meine Instanzen nicht in derselben Availability Zone waren. Deshalb müssen wir beim Starten der Slaves noch die Availability Zone angeben, in der auch der Master bereits läuft.
Wenn alle Instanzen
laufen, lassen wir uns über eine SSH-Verbindung den Public-Key vom Master
ausgeben und laden diesen auf die Slaves hoch. Genauso verfahren wir
umgekehrt: Wir geben die Public-Keys der Slaves aus und laden sie auf den
Master hoch. Außerdem holen wir uns das .erlang.cookie
vom Master und
laden es ebenfalls auf die Slaves hoch. Dabei ist darauf zu achten, dass das
Cookie exakt die Rechte 400
(Eigentümer lesen) haben muss.
Außerdem haben wir uns an dieser Stelle auch gleich die Private-Hostnames
der Slaves gespeichert. Diese brauchen wir zur korrekten Einrichtung der
Disco-Konfiguration. Da leider nur vorgesehen ist, diese über die Weboberfläche
zu ändern, ist der Code hierzu entsprechend hässlich, da ich die
Struktur ohne viel Aufwand nachbauen und in die Datei schreiben wollte. Der
folgende Code-Block ist auch noch Teil des try
-Blocks.
Anwendungsfall: Google n-grams
Als kleinen Anwendungsfall für Disco wollen wir die Google n-grams, genauer gesagt die 2-grams auswerten. Das wollen wir auch gleich dazu nutzen, Disco ein wenig zu evaluieren. Hierzu schreiben wir uns zunächst ein Script, das alle notwendigen Dateien von Google herunterlädt und entpackt. Dann versuchen wir einmal, die Daten bereits gechunkt (so wie sie von Google kommen) ins DDFS zu schreiben und einmal sie zusammenzufügen und dann ins DDFS zu schreiben und chunken zu lassen.
Dateien ins DDFS kopieren
Wichtig bevor wir Dateien ins DDFS kopieren: Sollte ein Rechner keine Worker
beheimaten (üblicherweise der Master), so muss man diesen anscheinend im DDFS
blacklisten. Andernfalls bleiben bei mir immer einige Map-Tasks im
Status waiting
hängen und gehen nicht weiter. Sobald ich in diesem Fall dem
Master einen Worker zugesprochen habe, wurde sofort mit der Abarbeitung der
Tasks begonnen. Eventuell wäre die richtigere Einstellung auch Blacklisted
nodes for Disco, das habe ich mir noch nicht genau angesehen. Der
Beschreibung zufolge scheint Blacklisted nodes for DDFS ein Subset von
Blacklisted Nodes for Disco zu sein - zumindest dann, wenn noch keine
Daten im DDFS sind?
- Blacklisted nodes for Disco: No new disco tasks or ddfs blobs will be assigned to the node.
- Blacklisted nodes for DDFS: Re-replicates the blobs out of this node and prepares it for being removed from the cluster.
Hier das Script, das die Dateien herunterlädt und gleich zusammenfügt:
Die entpackten und zusammengefügten Daten der Google 2-grams (für Deutsch in der Version von 2009) haben 49GB. Um diese nun ins DDFS zu kriegen und DDFS das Chunken übernehmen zu lassen, brauchen wir folgenden Befehl.
Der Pfadbeginn ./
ist wichtig, da DDFS den Namen sonst für ein DDFS-Tag
hält und einen Fehler ausgibt.
Die Verarbeitung dauerte bei mir mit insgesamt fünf Maschinen (1 Master,
4 Slaves) auf t2.micro-Instanzen 95 Minuten (real
), wobei 70 Minuten davon
wirklich gearbeitet wurde (user
+ sys
)
Daten verarbeiten
Diese Daten aus dem DDFS können wir nun relativ leicht parallelisiert verarbeiten lassen, indem wir das Beispielskript für Map-Reduce ein wenig anpassen, um die n-gram-Dateien zu parsen. Jede Zeile der n-gram-Daten enthält ein Wort, das Jahr, die Anzahl insgesamt, die Anzahl der Seiten, in denen das Wort auftritt, und die Anzahl der Bücher, in denen das Wort auftritt, in genannter Reihenfolge, separiert durch Tabs.
ngram TAB year TAB match_count TAB page_count TAB volume_count NEWLINE
Wenn wir den Mapper ein wenig anpassen, können wir schon die n-grams aufsummieren und uns so die 100 über alle Jahre hinweg häufigsten n-grams ausgeben lassen.
Setzt man show=True
, dann gibt Disco den aktuellen Stand auch gleich
im Konsolenfenster aus. Allerdings war das in meinem Fall keine so gute Idee,
da die Ausgabe nicht zu einem Ende kam, sondern immer im Kreis gelaufen ist.
Wie man sieht, arbeiten tatsächlich mehrere Worker gleichzeitig an der Aufgabe, da die Aufgabe groß genug ist alle verfügbaren.
Die Rechenzeit für obiges Script betrug in meinem Fall beim ersten
Durchlauf 20 Minuten, beim zweiten 60 Minuten und beim letzten Durchlauf
dann mehrere Stunden (jeweils 5 Worker,
da ich den Master wegen des DDFS-Problems ebenfalls aktivieren musste). Ich
vermute, dass die Last im Rechenzentrum größer wurde, da in us-east
noch Morgen war, als ich die ersten Tests startete. Beim
letzten Durchlauf war es in den USA dann 14 Uhr.
Knoten on-the-fly als Worker hinzuzufügen scheint bei Disco nicht möglich zu
sein. Als ich merkte, dass die Berechnung im letzten Durchlauf so lange
dauern würde, startete ich einen neuen Slave, verteilte die SSH-Keys und das
.erlang.cookie
und konnte ihn erfolgreich in den Cluster einfügen
(sichtbar durch eine schwarz hinterlegte Überschrift im Web-Interface), aber
es bleibt weiterhin bei fünf gleichzeitig laufenden Jobs und die Liste
current nodes hat sich nicht verändert.
Es kann jedoch auch sein, dass dies möglich wäre, wenn die Daten ebenfalls auf diesen neuen Slave verteilt wären (denn bei Hadoop und Disco führt Map-Reduce führt die Berechnung möglichst auf dem Knoten aus, wo auch die Daten liegen). Allerdings habe ich in der Dokumentation keinen Aufruf gefunden, mit dem man ein Rebalancing der Daten anstoßen könnte. Re-Replication wird automatisch gemacht, das scheint mir aber nur dann einzutreten, wenn Nodes ausfallen.
Eine Menge DNS-Requests
Auch als alle Rechner in derselben Availability Zone waren, war das Problem mit dem regional traffic nicht gelöst. Ich bin dem Problem auf den Grund gegangen und fand heraus, dass Disco ziemlich viel DNS-Anfragen stellt - pro Node mehrere pro Sekunde. DNS-Anfragen gehen in eine andere Availability Zone.
Als Gegenmaßnahme installieren wir uns dnsmasq auf dem Master. Da wir die
Ubuntu-Maschinen verwenden, müssen wir auch mit resolvconf
umgehen. Außerdem
wollen wir diese Änderungen alle gleich bei der Einrichtung der Maschinen
automatisiert durchführen. Das Paket selbst müssen wir natürlich installieren:
Die Hauptkonfiguration von dnsmasq kommt in /etc/dnsmasq.conf
:
Den AWS-Nameserver müssen wir in der /etc/resolv.dnsmasq.conf
eintragen:
Und dann müssen noch die richtigen Einstellungen für resolvconf
gesetzt
werden in /etc/resolvconf.conf
:
Jetzt müssen diese Änderungen nur noch an den Slaves eintragen, damit
diese auch den Master als Nameserver verwenden. Da ich meine Probleme mit
resolvconf
hatte, habe ich den Symlink einfach gelöscht und mir eine
good ol’ resolv.conf
-Datei angelegt.
Die Master-IP wird vor dem Starten der Slaves in die User-Data der Slaves eingetragen.
Nach diesen Anpassungen werden DNS-Requests nur noch etwa alle 30 Sekunden an den AWS-Nameserver durchgereicht, alle anderen beantwortet dnsmasq aus dem Cache.
Ubuntu-Repository-Server
Wie sich herausstellte, war auch danach mein Problem mit regional traffic nicht gelöst. Nach weiterem Debugging kam ich darauf, dass es an dem Standard-Ubuntu-Repository-Server lag. Dieser ist ebenfalls innerhalb des AWS-Netzwerks angesiedelt. Das bedeutet für mich natürlich bei jedem Installieren von Paketen Traffic in irgendeine Availability-Zone von AWS. Um dieses Problem - auch im Sinne der Ubuntu-Distributoren - sinnvoll zu lösen, habe ich letztlich die fertigen Master- und Slave-Distributionen als neue AMI gesichert und konnte dann direkt diese laden.
I do not maintain a comments section. If you have any questions or comments regarding my posts, please do not hesitate to send me an e-mail to blog@stefan-koch.name.