Table of Contents
Laborator 08
Proiectarea de aplicații distribuite folosind framework-ul Hadoop
Obiective
- înțelegerea modelului implementat de Hadoop pentru procesarea distribuită a unui volum mare de date, folosind clustere formate din mașini fără performanțe distribuite, larg disponibile în comerț (eng. commodity hardware);
- descrierea principalelor module integrate de framework-ul Hadoop;
- cunoașterea produselor care funcționează împreună cu modulele Hadoop, funcționalitatea pe care o pun la dispoziție și oportunitatea folosirii lor;
- configurarea framework-ului Hadoop (HDFS, YARN) și a bazei de date distribuite HBase pe un cluster format dintr-o singură mașină;
- utilizarea paradigmei de programare Map/Reduce pentru procesarea distribuită a unui volum mare de date;
- familiarizarea cu modul în care sunt gestionate sarcinile Map/Reduce în diferite implementări ale framework-ului Hadoop (Map/Reduce “clasic”, respectiv YARN);
Cuvinte Cheie
Hadoop, cluster, commodity hardware, Google File System, Hadoop Distributed File System, YARN, Map-Reduce, HBase, ZooKeeper, Oozie, Pig, Hive, HDFS federation, namenode, datanode, Job Tracker, Task Tracker, Resource Manager, Node Manager, Application Master
Materiale Ajutătoare
Framework-ul Hadoop - aspecte generale
Hadoop este un proiect open-source dezvoltat de Apache care îşi propune realizarea de procesări distribuite a unor seturi de date de dimensiuni mari, rulând pe mai multe clustere, folosind modele de programare simple. Proiectarea acestui framework a fost realizată astfel încât să fie scalabilă chiar şi în situaţia în care sarcinile sunt rulate pe mii de calculatoare, fiecare dintre acestea punând la dispoziţie o anumită capacitate de procesare şi de stocare.
Un programator care proiectează aplicaţii distribuite folosind Hadoop nu trebuie să gestioneze evenimentele care se pot produce la nivelul unei maşini, concentrându-se pe dezvoltarea logicii aplicaţiei, în timp ce partiţionarea datelor şi distribuirea codului vor fi realizate în mod transparent prin intermediul framework-ului, interacţiunea dintre componente fiind definită în cadrul unor interfeţe standardizate.
Hadoop a fost creat de Doug Cutting, cel care realizase şi biblioteca pentru căutări în texte Apache Lucene. Ideea acestui proiect a plecat de la Apache Nutch, un motor de căutare web open-source, lansat în 2002, care nu era însă scalabil pentru miliardele de pagini ce ar fi trebuit indexate spre a oferi rezultate corecte. Pe baza unui articol din 2003 care descria sistemul de fişiere distribuite al celor de la Google (GFS), este lansat Nutch Distributed Filesystem (NDFS) ce rezolva atât problema spaţiului de stocare al fişierelor generate în urma proceselor de indexare a paginilor Internet cât şi sarcinile administrative de gestionare a nodurilor de stocare. Un model similar este urmat şi în cazul Map-Reduce: dacă în 2004 Google publica articolul prin care era prezentat acest concept, implementarea sa în Nutch devenea deja funcţională în 2005. În 2006, odată cu mutarea lui Doug Cutting la Yahoo!, apare noţiunea de Hadoop iar în 2008 acesta era deja folosit pentru motorul de căutare web al Yahoo! folosind un cluster din 10.000 noduri. Apache acordă o importanţă deosebită proiectului, ce devine o prioritate a organizaţiei, după ce acesta devenise cel mai rapid sistem care sortase 1 TB de date (209 secunde, pe un cluster format din 910 noduri). În cadrul aceluiaşi an, recordul avea să fie depăşit de încă două ori: implementarea MapReduce de la Google sortase 1TB de date în 68 de secunde, în timp ce sistemul celor de la Yahoo! reuşise aceeaşi performanţă în numai 62 de secunde. Investiţiile în cercetare destinate acestui proiect devin din ce în ce mai mari, achiziţionându-se clustere ce depăşesc 1000 de noduri pe care sunt încărcate seturi de date ce atingeau ordinul sutelor de terrabytes. Începând cu anul 2010, Hadoop a fost adoptat pe scară largă de organizaţii atât în scopul de a stoca volume mari de date cât şi ca platformă de analiză a acestora. În prezent, Hadoop este folosit de numeroase companii pentru care volumul de date generat zilnic depăşeşte capacităţile de procesare şi stocare specifice sistemelor convenţionale: Adobe, AOL, Amazon.com, EBay, Facebook, Google, LinkedIn, Twitter, Yahoo.
Framework-ul Hadoop include mai multe module:
- Hadoop Common: utilitare de bază care oferă funcţionalităţile pentru celelalte module;
- Hadoop Distributed File System (HDFS): sistem de fişiere distribuite ce pune la dispoziţie un nivel de disponibilitate ridicat la datele utilizate de aplicaţii;
- Hadoop YARN: modul pentru planificarea sarcinilor şi gestiunea resurselor din cadrul unui cluster;
- Hadoop Map-Reduce: sistem bazat pe YARN pentru procesarea paralelă a unor seturi mari de date.
În plus, au fost dezvoltate mai multe produse care pot fi folosite împreună cu Hadoop / HDFS:
- HBase: bază de date distribuită, scalabilă care suportă stocarea informaţiilor structurate pentru tabele de dimensiuni mari; implementată sub forma unor perechi cheie-valoare, foloseşte de obicei sistemul de fişiere distribuit HDFS deşi poate fi folosit şi împreună cu sistemul de fişiere local;
- ZooKeeper: serviciu de coordonare performant pentru aplicaţii distribuite;
- Oozie: modulul pentru gestiunea şi planificarea fluxurilor, coordonând fluxurile Map-Reduce;
- Pig: limbaj de nivel înalt pentru procesarea fluxurilor de date şi mediu de execuţie pentru prelucrări paralele;
- Hive: depozit de date cu interfaţă SQL care oferă sumarizarea datelor şi interogări ad-hoc.
Există mai mulţi producători care pun la dispoziţie distribuţii Hadoop, al căror scop este oferirea unei configuraţii care rezolvă incompatibilităţile dintre diferite produse, prin rularea unor teste de integrare între acestea.
Produsele Hadoop integrate în cele mai multe dintre distribuţii sunt HDFS, MapReduce, HBase, Hive, Mahout, Oozie, Pig, Sqoop, Whirr, ZooKeeper, Flume. De asemenea, proiectul BigTop (dezvoltat de Apache) are rolul de a rula teste de interoperabilitate între componentele Hadoop oferind pachete Linux (RPM şi pachete Debian) pentru o instalare mai facilă.
Distribuţiile sunt realizate în mai multe formate, suportă un set de sisteme de operare şi pot include scripturi suplimentare pentru rularea mediului de lucru.
Între distribuţiile mai cunoscute se numără Cloudera Distribution for Hadoop (CDH), MapR Distribution, Hortonworks Data Platform (HDP), Apache BigTop Distribution, Greenplum HD Data Computing Appliance. Acestea dispun şi de propriile documentaţii (inclusiv wiki) şi oferă utilizatorilor maşini virtuale pe care sunt instalate distribuţiile în cauză.
Versiunile Hadoop întreţinute în prezent sunt 1.2.x, 2.6.x şi 0.23.x (similară cu 2.x, fără nivelul de disponibilitate ridicat pentru nodurile de nume). Îmbunătăţirile aduse în cadrul versiunii 2.6.x sunt:
- implementarea federaţiilor HDFS, ce utilizează mai multe noduri (spaţii) de nume independente (care nu necesită coordonare între ele); nodurile de date sunt utilizate ca spaţii de stocare pentru blocuri de către toate nodurile de nume. Pentru implementarea unei astfel de funcţionalităţi, este necesar ca fiecare nod de date să fie înregistrat de toate nodurile de nume din cluster. Acestea vor transmite rapoarte periodice cu privire la blocurile pe care le stochează, gestionând comenzile care provin de la nodurile de nume.
Din acest motiv, nodurile de date stochează blocuri aparţinând tuturor federaţiilor de blocuri (eng. block pool) din cadrul cluster-ului, fiecare dintre ele fiind gestionate separat de alte astfel de obiecte. O federaţie de blocuri este reprezentată de un set de blocuri ce aparţine unui singur spaţiu de nume. În acest mod, un spaţiu de nume poate genera identificatori pentru blocuri fără a fi necesară coordonarea cu alte spaţii de nume şi în acelaşi timp, erorile produse la nivelul unui nod de nume nu împiedică nodul de date de a lucra cu alte noduri de nume din cluster. Un spaţiu de nume împreună cu federaţia sa de blocuri poartă denumirea de volum al spaţiului de nume, acesta fiind o unitate de gestiune autonomă. Atunci când un nod (spaţiu) de nume este şters, şi federaţia sa de blocuri de la nivelul nodurilor de date este ştearsă. Actualizările de la nivelul volumului unui spaţiu de nume se realizează unitar, atunci când se produce actualizarea unui cluster. Toate nodurile din cadrul unui cluster partajează un identificator al clusterului, generat în mod automat la formatarea unui nod de nume. Un astfel de identificator ar trebui să fie folosit şi pentru formatarea altor noduri de nume în cadrul cluster-ului respectiv.
Beneficiile acestei îmbunătăţiri includ scalabilitatea spaţiului de nume (spaţiul de stocare al clusterului HDFS este caracterizat prin scalabilitate, astfel încât proiectele de dimensiuni mari sau cele care utilizează numeroase fişiere de dimensiuni mai mici pot beneficia de scalabilitatea spaţiului de nume prin adăugarea mai multor noduri de nume la cluster), performanţa (îmbunătăţirea ratei de transfer pentru operaţiile de citire / scriere în cadrul sistemului de fişiere prin adăugarea mai multor noduri de nume în cadrul clusterului) şi izolarea (categorii diferite de aplicaţii şi utilizatori pot fi izolate în cadrul unor spaţii de nume diferite). Într-o arhitectură care foloseşte un singur spaţiu de nume, într-un mediu concurent, o aplicaţie poate suprasolicita spaţiu de nume astfel încât performanţele celorlalte aplicaţii ce utilizează acelaşi spaţiu de nume vor fi afectate.
- implementarea MapReduce NextGen (YARN, MRv2) prin care se împart funcţionalităţile modulului de monitorizare a sarcinilor (eng. Job Tracker) – gestiunea resurselor şi planificarea sarcinilor în procese separate; în acest mod, se creează un modul global de gestiune al resurselor (Resource Manager) şi un modul de bază al aplicaţiei (Application Master) pentru fiecare aplicaţie în parte. O aplicaţie este formată fie dintr-o singură sarcină (din punctul de vedere al sarcinilor de tip MapReduce) sau conţine un graf aciclic de sarcini. Modulul de gestiune al resurselor împreună cu modulul de gestiune al nodurilor (Node Manager) formează cadrul de calcul al sarcinilor. Modulul de gestiune al resurselor este autoritatea care arbitrează modul de distribuire al resurselor pentru aplicaţiile din sistem. Modulul de gestiune pentru aplicaţii este de fapt o bibliotecă ce negociază resursele primite de la modulul de gestiune al resurselor colaborând cu modulele de gestiune a nodurilor pentru a executa şi pentru a monitoriza sarcinile.
Modulul pentru gestiunea resurselor cuprinde două componente de bază: modulul de planificare şi modulul de gestiune al aplicaţiilor.
Modulul de planificare este responsabil pentru alocarea resurselor către diferite aplicaţii, caracterizate prin constrângeri referitoare la capacitate, cozi. Acesta nu oferă însă nici o garanţie în privinţa repornirii sarcinilor eşuate datorită erorilor produse la nivel hardware sau software şi nici nu se ocupă cu monitorizarea stării aplicaţiilor. Planificarea se realizează în funcţie de cerinţele pentru resurse ale aplicaţiilor, specificate de noţiunea abstractă container care cuprinde elemente ca memorie, procesor, disc, reţea. În versiunile anterioare ale Hadoop, singurul criteriu luat în considerare de planificator era reprezentat de memorie. Modulul de planificare suportă extensii programabile (precum CapacityScheduler sau FairScheduler) responsabile cu partiţionarea clusterului între diferite aplicaţii. Extensia programabilă CapacityScheduler suportă cozi ierarhice pentru a permite partajarea resurselor clusterului în funcţie de previziuni.
Modulul de gestiune al aplicaţiilor se ocupă cu acceptarea sarcinilor transmise, negociind container-ul pentru executarea modulului de bază al aplicaţiei, oferind servicii pentru repornirea container-ului acestuia în situaţia în care s-au produs erori.
Modulul de gestiune al nodurilor este un cadru implementat pe fiecare maşină în parte, responsabil pentru containere, monitorizându-le utilizarea resurselor (procesor, memorie, disc, reţea) şi raportând aceste valori la modulul de gestiune al resurselor şi la modulul de planificare.
Modulul de bază al aplicaţiei are rolul de a negocia containerele de resurse corespunzătoare de la modulul de planificare, monitorizându-le starea.
Funcţionalităţile implementate de diferitele versiuni Hadoop, cu privire la HDFS şi MapReduce pot fi sintetizate astfel:
Funcţionalitate | 1.2.x | 0.23.x | 2.6.x |
---|---|---|---|
autentificare sigură | da | nu | da |
nume de configurare vechi | da | învechite | învechite |
nume de configurare noi | nu | da | da |
API MapReduce vechi | da | da | da |
API MapReduce nou | da (biblioteci lipsă) | da | da |
cadru de rulare MapReduce 1 (clasic) | da | da | nu |
cadru de rulare MapReduce 2 (YARN) | nu | nu | da |
federaţii HDFS | nu | nu | da |
nivel de disponibilitate HDFS ridicat | nu | nu | da |
Decizia de a utiliza Hadoop împreună cu HDFS şi MapReduce trebuie să se bazeze pe adecvarea unei anumite probleme la acest framework. Astfel, aplicaţiile dezvoltate folosind această tehnologie implică procesarea unui volum foarte mare de date care poate fi realizată în paralel, ceea ce presupune independenţa acestora unele de altele. Situaţia în care datele sunt dependente unele de altele poate fi rezolvată prin iterare (rularea mai multor sarcini MapReduce, în care datele de ieşire ale unora reprezintă datele de intrare pentru celelalte) sau prin partajare (folosind o bază de date distribuită şi nu variabile distribuite întrucât acestea există doar în contextul unei singure maşini virtuale, neputând fi accesate şi din contextul altor noduri sau clustere).
Configurarea Mediului de Lucru
Paradigma de Programare Map Reduce
MapReduce reprezintă un model de progamare destinat procesării de date pe un număr foarte mare de noduri, reprezentate de maşini disponibile în comerţ, fără performanţe deosebite (eng. commodity hardware). Este inspirat din programarea funcţională de unde sunt preluate funcţiile map şi reduce, putând fi implementat în limbaje de programare ca Java, C++, Python sau Ruby. Un astfel de model este util mai ales pentru prelucrarea unor seturi de date (semistructurate şi orientate pe înregistrări) de dimensiuni foarte mari, în care procesarea paralelă este singura soluţie pentru obţinerea unor rezultate într-un interval de timp acceptabil.
MapReduce se bazează pe împărţirea procesării în 2 etape: map şi reduce, fiecare primind ca date de intrare o pereche cheie-valoare (al căror tipuri poate fi stabilită de programator) şi întorcând ca rezultat tot o pereche cheie-valoare.
combine: (K2, list(V2)) → list(K2, V2)
De cele mai multe ori, funcţiile de combinare şi reducere se confundă.
Valorile care sunt procesate de programul MapReduce sunt partiţionate în funcţie de cheile care le caracterizează şi distribuite nodurilor care aplică funcţia de mapare, în urma căreia se generează o listă (intermediară) de valori, fiecare dintre ele având asociată şi o cheie. Acestea sunt sortate şi grupate în funcţie de cheie (toate valorile care au aceeaşi cheie sunt concatenate într-o singură listă), astfel încât funcţia de reducere primeşte mai puţine perechi cheie-valoare (pentru fiecare cheie unică există o listă de valori generate anterior) obţinându-se o listă de rezultate compusă din perechi chei-valoare.
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
Gestiunea Sarcinilor Map/Reduce în Implementări ale Framework-ului Hadoop
Framework-ul Hadoop plasează sarcinile (în urma partiţionării) pe nodurile care găzduiesc segmentele de date care trebuie să fie procesate de acea sarcină specifică (codul este mutat acolo unde se găsesc datele).
Fluxurile de intrare şi ieşire sunt coordonate de framework-ul Hadoop, acesta realizând şi sincronizarea (prin intermediul unei bariere) înainte de procesul de sortare şi grupare prin care datele intermediare sunt distribuite către alte maşini.
Erorile sunt considerate un eveniment probabil să se întâmple, astfel că sarcinile care se găsesc într-o astfel de situaţie vor fi executate din nou, însă pe alte maşini.
Execuţia unei sarcini MapReduce se realizează prin apelul metodei submit()
pe un obiect de tip Job
care abstractizează procesarea ce se doreşte a fi realizată.
waitForJobCompetion()
care transmite sarcina dacă aceasta nu a fost transmisă deja, aşteptând să fie terminată.
Implementarea aplicaţiei utilizând această paradigmă de programare depinde de parametrii de configurare specificaţi: mapred.job.tracker
este folosit pentru versiunile Hadoop mai vechi, folosind un mecanism MapReduce “clasic”, în timp ce mapreduce.framework.name
reprezintă proprietatea utilizată pentru versiunile Hadoop mai noi, care implementează YARN.
mapreduce.framework.name
sunt:local
(în caz că se doreşte ca sarcina să fie rulată local, în cadrul unei singure maşini virtuale Java);classic
(pentru implementarea MapReduce “clasică”, folosind proceseleJobTracker
şiTaskTrackers
);yarn
(pentru implmentarea ce utilizează YARN).
În cazul implementării MapReduce “clasice” sunt implicate mai multe entităţi:
- clientul, care transmite sarcina de tip MapReduce;
- un proces pentru monitorizarea sarcinii (eng. Job Tracker) care coordonează rularea acesteia;
JobTracker
.
- mai multe procese pentru monitorizarea părţilor în care a fost împărţită sarcina (eng. Task Tracker);
TaskTracker
.
- sistemul distribuit de fişiere (de obicei HDFS), utilizat pentru partajarea fişierelor între aceste entităţi.
Etapele urmate la crearea unei sarcini de tip MapReduce sunt următoarele:
1. metoda submit()
apelată pe obiectul de tip Job
creează o instanţă a unui obiect de tip JobSummiter
pentru care apelează metoda submitJobInternal()
; ulterior, metoda waitForJobCompletion()
verifică progresul sarcinii o dată pe secundă, raportând starea acesteia (în situaţia în care s-a modificat de la ultimul raport); când sarcina este încheiată cu succes se afişează statisticile sarcinii, altfel este afişată eroarea care a determinat eşecul sarcinii.
2. procesul de transmitere al sarcinii (reprezentat de obiectul instanţă al clasei JobSummitter
) solicită procesului de urmărire al sarcinii (JobTracker
) un nou identificator pentru sarcina respectivă (prin apelul metodei getNewJobId()
); ulterior, sunt verificate specificaţiile privind datele de ieşire şi datele de intrare pentru sarcina în cauză; dacă directorul unde trebuie plasate datele de ieşire nu a fost specificat sau există deja, respectiv nu pot fi determinate partiţiile pentru datele de intrare (datorită faptului că acestea nu au fost specificate sau nu există) sarcina nu este transmisă, generându-se în schimb o eroare; dacă specificaţiile privind operaţiile de intrare/ieşire sunt corecte, sunt determinate partiţiile care vor fi prelucrate de fiecare parte a sarcinii;
3. procesul de transmitere al sarcinii copiază din sistemul distribuit de fişiere resursele necesare pentru rularea sarcinii (inclusiv fişierul .jar care conţine codul corespunzător sarcinii, fişierul de configurare şi partiţiile datelor de intrare) în sistemul de fişiere al procesului de urmărire al sarcinii, într-un director având numele dat de identificatorul sarcinii;
mapred.submit.replication
a cărei valoare implicită este 10), astfel încât vor exista suficient de multe copii în cadrul clusterului ce vor putea fi accesate de procesele ce prelucrează porţiuni ale sarcinii atunci când se află în execuţie.
4. procesul de transmitere al sarcinii informează procesul de urmărire a sarcinii de faptul că aceasta poate fi executată, prin apelul metodei submitJob()
pe obiectul instanţă a clasei JobTracker
;
5. atunci când obiectul instanţă a clasei JobTracker
primeşte un apel al metodei submitJob()
, pune sarcina respectivă într-o coadă internă de unde îl va prelua planificatorul de sarcini pentru a-l iniţializa; procesul de iniţializare presupune crearea unui obiect reprezentând sarcina care va fi executată, conţinând partiţiile care vor fi rulate pe noduri diferite, precum şi informaţii referitoare la progresul acestora;
6. pentru a crea lista partiţiilor sarcinii, planificatorul de sarcini primeşte datele de intrare, defalcate pe secţiuni, aşa cum au fost împărţite de client în urma primirii acestora de la sistemul distribuit de fişiere; pentru fiecare secţiune este creată o partiţie a sarcinii (operaţie de tip map
), în timp ce numărul de operaţii de tip reduce
este determinat din proprietatea mapred.reduce.task
a obiectului Job
(fiind stabilit de metoda setNumReduceTasks()
) creându-se un număr corespunzător de procese care le vor executa; fiecare partiţie a sarcinii (de tip map sau reduce) va putea fi urmărită printr-un identificator unic; în plus, vor mai fi create încă două procese, pentru configurarea (eng. setup), respectiv eliberarea resurselor (eng. cleanup) asociate partiţiei sarcinii, ele fiind rulate înainte, respectiv după ce codul asociat partiţiei sarcinii este executat; pentru fiecare sarcină există un obiect de tip OutputCommitter
ce determină codul corespunzător fiecărei sarcini în parte;
OutputCommitter
are tipul FileOutputCommitter
; în cazul procesului de configurare, acesta va crea directorul în care vor fi plasate datele de ieşire precum şi directoarele de lucru (temporare), în timp ce pentru procesul de eliberare al resurselor, acesta va şterge directoarele de lucru (temporare), transmiţând în acelaşi timp rezultatul sarcinii, ceea ce pentru procesele care realizează operaţii pe fişiere se traduce în scrierea datelor de ieşire la locaţia corespunzătoare sarcinii respective (în cazul în care este activată execuţia speculativă, se asigură faptul că vor fi transmise numai rezultatele provenind de la sarcinile duplicate, celelalte fiind ignorate).
7. procesele de urmărire a partiţiilor în care este împărţită o sarcină rulează o buclă care transmite mesaje periodice (eng. heartbeat) procesului ce urmăreşte sarcina prin care acesta este anunţat asupra stării sale de funcţionare; informaţiile cuprinse în aceste mesaje sunt disponibilitatea de a rula o nouă partiţie a sarcinii, caz în care, ca răspuns la acest mesaj, procesul de urmărire a sarcinii îi va aloca una, după o selecţie corespunzătoare, potrivit algoritmului de planificare pe care îl implementează; iniţial este selectată sarcina de unde va fi aleasă o partiţie pentru a fi repartizată procesului de monitorizare corespunzător, algoritmul implicit bazându-se pe asocierea unor priorităţi pentru fiecare sarcină în parte, de unde vor fi alese apoi partiţiile; fiecare proces pentru monitorizarea partiţiei unei sarcini dispune de un număr fix de sarcini de tip map respectiv de tip reduce pe care le pot rula simultan; în distribuirea sarcinilor de tip map se are în vedere localizarea (în reţea) nodului pe care rulează procesul ce urmează să realizeze procesarea, alegându-se unele pentru care setul de date se găseşte cât mai aproape; o astfel de constrângere nu este luată în calcul pentru sarcinile de tip reduce;
8. după ce procesului de monitorizare a partiţiei sarcinii i-a fost alocată sarcina pe care trebuie să o ruleze, acesta îşi va copia arhiva .jar conţinând codul care va fi executat din contextul sistemului distribuit de fişiere în sistemul de fişiere local precum şi alte fişiere necesare din zona de memorie tampon distribuită; ulterior, creează un director temporar pentru sarcină, dezarhivând conţinutul fişierului .jar, acesta fiind rulat în contextul unui obiect de tipul TaskRunner
, instanţiat în acest scop;
9. obiectul de tipul TaskRunner
lansează o maşină virtuală Java nouă;
10. fiecare sarcină va fi rulată în contextul acestei maşini virtuale Java, astfel că orice erori sau funcţiile de tip map/reduce definite de utilizator nu vor afecta procesul de monitorizare a partiţiei sarcinii, implicând comportamente nedorite (încetarea funcţionării); fiecare partiţie a sarcinii comunică cu procesul părinte prin interfaţa ombilicală, informându-l de progresul său la fiecare câteva secunde până când aceasta este terminată.
Întrucât sarcinile de tip MapReduce durează o perioadă mai mare de timp, este important ca utilizator să aibă informaţii cu privire la progresul acestora. Astfel, fiecare sarcină şi partiţiile sale sunt caracterizate printr-o stare (în execuţie, terminată cu succes, eşuată din cauza unei erori), progresul operaţiilor de tip map şi reduce, valorile contoarelor asociate sarcinii în cauză precum şi un mesaj (descriere) asociat stării respective. Progresul, adică proporţia din sarcină care a fost executată deja (dacă poate fi determinată), este comunicată periodic clientului.
Un progres înregistrat de o partiţie a sarcinii este transmisă procesului care o monitorizează prin intermediul unui semafor (eng. flag). Acesta e verificat de un fir de execuţie dedicat la fiecare trei secunde şi în condiţiile în care valoarea sa o impune, procesul de monitorizare a partiţiei sarcinii este informat de progresul realizat. De asemenea, în mesajele transmise către procesul de monitorizare al sarcinii sunt transmise şi date cu privire la progresul înregistrat de toate partiţiile sarcinii monitorizate în mod curent.
Valorile referitoare la progres sunt agregate de procesul de monitorizare al sarcinii, rezultând o viziune de ansamblu asupra tuturor sarcinilor rulate la un anumit moment de timp şi asupra partiţiilor constituente. Ulterior obiectul de tip Job
primeşte cel mai recent statut interogând procesul de monitorizare al sarcinii în fiecare secundă.
getStatus()
a obiectului de tip Job
, obţinând o instanţă a unui obiect de tip JobStatus
, care conţine toate informaţiile cu privire la starea sarcinii.
Când procesul de monitorizare a sarcinii primeşte notificări provenind de la toate partiţiile sarcinii cu privire la încheierea acestora (inclusiv sarcina pentru eliberarea resurselor), va modifica statutul asociat acesteia ca fiind “terminată cu succes”, acesta fiind transmis şi obiectului de tip Job
, care informează la rândul său utilizatorul cu privire la acest fapt, părăsind metoda waitForCompletion()
. Toate statisticile şi contoarele cu privire la sarcină sunt afişate la consolă în acest moment. La final, procesul de monitorizare a sarcinii eliberează toate resursele utilizate temporar instruind şi procesele care gestionează partiţiile sarcinii să acţioneze în acelaşi sens.
job.end.notification.url
de către clienţii care doresc să primească astfel de informaţii prin intermediul unor apeluri inverse (eng. callback).
Întrucât pentru clustere de dimensiuni foarte mari (peste 4000 de noduri) sistemul MapReduce “clasic” are probleme de scalabilitate, a fost implementat mecanismul YARN (Yet Another Resource Negociator / YARN Application Resource Negociator) care rezolvă astfel de inconveniente, separând responsabilităţile procesului de monitorizare a sarcinilor (planificarea sarcinii cât şi monitorizarea progresului realizat de partiţiile asociate acesteia) în două procese independente:
- un proces pentru gestiunea resurselor în cadrul clusterului;
- un proces pentru gestiunea aplicaţiilor pe parcursul ciclului lor de viaţă.
Aceste procese negociază căror noduri (containere) le vor fi alocate anumite programe, respectându-se constrângerile cu privire la capacitatea lor de procesare şi referitoare la memoria disponibilă. Verificarea acestor restricţii este realizată de procesul de gestiune al nodului, ce asigură faptul că nu sunt utilizate mai multe resurse decât au fost alocate.
YARN este mai generic decât MapReduce, aceasta reprezentând doar un tip de aplicaţie ce poate fi rulată folosind acest mecanism, flexibilitatea sa constând în coexistenţa (în cadrul aceluiaşi cluster) al unor programe având tipuri diferite (de exemplu MPI sau un shell distribuit), ceea ce implică beneficii pentru gestiunea şi utilizarea resurselor respective. De asemenea, utilizatorii pot rula diferite versiuni ale MapReduce, astfel că actualizarea de la o versiune la alta se poate realiza foarte uşor.
Entităţile implicate la rularea unei aplicaţii de tip MapReduce folosind YARN sunt:
- clientul, care transmite sarcina de tip MapReduce;
- procesul pentru gestiunea resurselor (eng. Resource Manager) care coordonează alocarea resurselor de procesare pe cluster;
- procesele pentru gestiunea nodurilor (eng. Node Manager) care lansează în execuţie şi monitorizează containerele în cadrul maşinilor din cluster;
- procesul de gestiune a aplicaţiilor (eng. Application Master) care va coordona partiţiile sarcinilor;
- sistemul distribuit de fişiere (de obicei HDFS), utilizat pentru partajarea fişierelor între aceste entităţi.
Etapele urmate la crearea unei sarcini de tip MapReduce în cadrul YARN sunt următoarele:
1. transmiterea sarcinii se realizează folosind acelaşi API ca în cazul mecanismului MapReduce clasic, deşi există şi o implementare a ClientProtocol
care este activată atunci când proprietatea mapreduce.framework.name
are valoarea yarn
în care procesul de transmitere a sarcinii este destul de similar cu cel al mecanismului MapReduce clasic;
2. identificatorul sarcinii este transmis de la procesul de gestiune a resurselor (faţă de cazul mecanismului MapReduce clasic în care acesta era primit de la procesul de monitorizare a sarcinii);
3. clientul verifică specificaţiile sarcinii în privinţa locaţiei unde urmează să fie plasate rezultatele şi calculează partiţiile datelor de intrare, copiind resursele specifice sarcinii (fişierul .jar care conţine codul corespunzător sarcinii, fişierul de configurare dar şi partiţiile datelor de intrare) în sistemul distribuit de fişiere HDFS;
yarn.app.mapreduce.am.compute-splits-in-cluster
face ca partiţiile datelor de intrare să fie calculate în cadrul clusterului, ceea ce este util mai ales în cazul sarcinilor care au un număr mare de partiţii ale datelor de intrare.
4. sarcina este transmisă prin apelul metodei submitApplication()
în contextul procesului de gestiune a resurelor;
5. când procesul de gestiune al resurselor primeşte acest apel, transmite mai departe această cerere către procesul de planificare al sarcinilor care îi alocă un container astfel încât procesul specific aplicaţiei va fi lansat (de către procesul de gestiune al resurselor) în acest context, fiind supervizat de către procesul de gestiune a nodului respectiv;
6. procesul specific al aplicaţiei pentru sarcini de tip MapReduce
este de fapt un program Java având clasa principală MRAppMaster
; acesta iniţializează sarcina în cauză creând un număr de obiecte care vor monitoriza progresul sarcinii, primind rapoarte în acest sens de la partiţiile în care va fi împărţită sarcina;
7. procesul specific al aplicaţiei preia partiţiile datelor de intrare determinate de client din cadrul sistemului distribuit de fişiere, creând apoi o partiţie a sarcinii (aplicaţie de tip map) pentru fiecare dintre acestea dar şi un număr de procese de tip reduce (conform proprietăţii mapreduce.job.reduce
); ulterior, procesul specific al aplicaţiei decide modul în care va rula partiţiile care constituie sarcina de tip MapReduce: dacă aceasta este de dimensiuni mici, partiţiile vor fi rulate în aceeaşi maşină virtuală Java ca şi el însuşi - o astfel de sarcină se spune că rulează ca o super-partiţie; o sarcină este considerată ca fiind de dimensiuni mici dacă poate fi partiţionată în mai puţin de 10 procese de tip map şi un singur proces de tip reduce, iar dimensiunea fişierului de intrare este mai mică decât un bloc HDFS; înainte ca orice partiţie a sarcinii să fie rulată, trebuie apelată metoda de configurare (o instanţă a obiectului de tip OutputCommitter
corespunzător sarcinii) pentru a se crea directorul în care vor fi plasate rezultatele acesteia;
mapreduce.job.ubertask.enable
care va avea valoarea false
.
mapreduce.job.ubertask.maxmaps
, respectivmapreduce.job.ubertask.maxreduces
, care indică numărul de procese;mapreduce.job.ubertask.maxbytes
, care specifică dimensiunea unui bloc.
8. în cazul în care sarcina nu întruneşte condiţiile pentru a rula ca super-partiţie, procesul specific al aplicaţiei solicită containere pentru toate procesele de tip map şi reduce din cadrul sarcinii de la modulul de gestiune a resurselor; cererile sunt incluse în cadrul rapoartelor periodice, conţinând informaţii cu privire la locaţia unde se găsesc informaţiile corespunzătoare fiecărei partiţii de tip map (nodurile şi rastelurile pe care se găsesc acestea), acestea fiind folosite de procesul de planificare în luarea deciziilor, încercându-se plasarea sarcinilor aferente în cadrul aceleiaşi maşini ca şi datele sau, în situaţia în care nu este posibil, cel puţin pe acelaşi rastel;
mapreduce.map.memory.mb
, respectiv mapreduce.reduce.memory.mb
. Modul în care este alocată memoria este diferit de mecanismul MapReduce tradițional, în care procesele de monitorizare a partiţiilor sarcinii dispuneau de un număr fix de sarcini de tip map sau de tip reduce pe care le puteau rula în paralel, stabilit la momentul configurării clusterului; fiecare sarcină putea dispune de o valoare maximă a memoriei (de asemenea având o valoare fixă pentru un cluster), existând probleme atât dacă aceasta este subutilizată (întrucât alte partiţii ale sarcinii, aflate în aşteptare, nu pot utiliza memoria nefolosită), cât şi dacă este suprautilizată (conducând la eşuarea sarcinii, de vreme ce aceasta nu dispune de memorie suficientă pentru a putea fi terminată). O astfel de problemă este rezolvată în YARN prin faptul că aplicaţiile pot solicita o anumită valoare a memoriei, ce se află între o valoare minimă şi o valoare maximă (cu condiţia de a fi un multiplu al valorii minime care poate fi alocată). Fiecare proces de planificare are valori implicite de alocare a memoriei, valoarea minimă fiind de 1024 MB (stabilită de yarn.scheduler.capacity.minimum-allocation-mb
), iar valorea maximă de 10240 MB (stabilită de yarn.scheduler.capacity.maximum-allocation-mb
). Astfel, partiţiile sarcinilor pot solicita orice alocare de memorie cuprinsă între 1GB şi 10GB, însă ca multiplu de 1GB (procesul de planificare a sarcinii va realiza rotunjiri la cea mai apropiată valoare, dacă este necesar).
9. după ce unei partiţii a sarcinii i-a fost alocat un container de către procesul de planificare a modulului de gestiune a resurselor, procesul specific al aplicaţiei porneşte acest container, contactând procesul de gestiune a nodului;
10. partiţia sarcinii este executată de o aplicaţie Java a cărei clasă principală este YarnChild
; înainte de a rula partiţia sarcinii, va localiza resursele de care aceasta are nevoie (inclusiv configuraţia sarcinii şi arhiva .jar conţinând codul propriu-zis al sarcinii precum şi alte fişiere din cadrul zonei tampon de memorie aflate în cadrul sistemului distribuit de fişiere);
YarnChild
rulează într-o maşină virtuală Java dedicată, din acelaşi motiv pentru care în cazul mecanismului MapReduce “clasic” procesele de monitorizare a partiţiei sarcinii creează noi maşini virtuale Java pentru a rula aplicaţiile corespunzătoare – izolarea codului utilizator de firele de execuţie ale sistemului de operare care rulează în fundal, având asociate sarcini care durează foarte mult. Spre diferenţă de implementarea MapReduce “clasică”, YARN nu suportă reutilizarea maşinilor virtuale Java, astfel încât fiecare partiţie a sarcinii va rula într-o nouă maşină virtuală Java.
11. containerul rulează partiţia sarcinii (de tip map sau reduce).
În cazul Yarn, partiţia sarcinii îşi raportează starea şi progresul (inclusiv valoarea contoarelor) la procesul specific al aplicaţiei, care dispune astfel de o viziune de asamblu a sarcinii.
La rândul său, procesul specific al aplicației e interogat de client la fiecare secundă (sau în intervalul stabilit de proprietatea mapreduce.client.progressmonitor.pollinterval
). Totodată, interfaţa grafică a procesului de gestiune a resurselor afişează toate aplicaţiile care se află în execuţie, împreună cu legături către interfeţele grafice ale proceselor specifice aplicaţiilor respective, care conţin detalii despre sarcina MapReduce, inclusiv progresul acesteia.
În acelaşi mod în care clientul interoghează procesul specific al aplicaţiei pentru a determina progresul acesteia, acesta verifică (la fiecare cinci secunde) dacă sarcina a fost terminată, printr-un apel al metodei waitForCompletion()
pe obiectul de tip Job
. Există posibilitatea ca notificarea cu privire la terminare să se realizeze prin apeluri inverse folosind protocolul HTTP, însă acestea sunt iniţiate de procesul specific al aplicaţiei.
mapreduce.client.completion.pollinterval
, în fişierul de configurare.
La terminarea sarcinii, procesul specific al aplicaţiei şi containerele în care sunt rulate partiţiile acesteia eliberează resursele utilizate, printr-un apel al metodei corespunzătoare din cadrul obiectului OutputCommitter
. Informaţiile cu privire la sarcină sunt arhivate de către serverul ce reţine istoricul acestora, pentru a permite interogări ulterioare din partea clienţilor, în caz de nevoie.
Tratarea Erorilor
Unul dintre principalele avantaje ale Hadoop constă în posibilitatea de recuperare din eroare, situaţie întâlnită cu o frecvenţă destul de mare şi datorată greşelilor din codul sursă al utilizatorilor, blocajelor în cadrul proceselor şi defectării maşinilor pe care rulează acestea.
În cazul implementării MapReduce “clasice” defectele se pot produce la nivelul:
- unei partiţii a sarcinii;
- procesului ce o monitorizează;
- procesului care monitorizează sarcina la nivel global.
a) Cel mai frecvent, erorile de la nivelul unei partiţii a sarcinii se datorează greşelilor din codul sursă, generându-se o excepţie la rulare care este raportată procesului care o monitorizează, aceasta fiind înregistrată şi în cadrul jurnalelor. Procesul de monitorizare a partiţiei sarcinii marchează această încercare ca fiind eşuată, eliberând poziţia ocupată de aceasta pentru ca în cadrul acesteia să fie rulată o altă partiţie a sarcinii. În cazul în care maşina virtuală Java îşi încetează activitatea brusc (datorită unor probleme ale acesteia pentru setul de date al partiţiei respective a sarcinii), procesul de monitorizare va detecta faptul că partiţia sarcinii nu mai rulează şi marchează încercarea de execuţie ca fiind eşuată. În situaţia în care partiţiile sarcinilor sunt suspendate în execuţie, procesul de monitorizare detectează că nu a primit o actualizare a progresului pentru o perioadă de timp mai îndelungată, declanşând procesul de marcare a încercării de rulare a partiţiei sarcinii ca eşuată şi determinând închiderea maşinii virtuale Java care o rula.
mapred.task.timeout
a cărei valoare este exprimată în milisecunde. Indicarea unei valori nule pentru această proprietate dezactivează mecanismul de expirare, astfel că sarcinile a căror execuţie durează mai mult nu vor fi marcate niciodată ca fiind eşuate. Într-un astfel de caz, o partiţie a sarcinii suspendată în execuţie nu îşi va elibera niciodată poziţia, existând posibilitatea de a se înregistra încetinerea globală a cluster-ului. De aceea, acest comportament trebuie evitat, asigurându-se că partiţiile sarcinilor îşi raportează progresul.
Când procesul de monitorizare a partiţiei sarcinii este notificat de eşecul unei încercări (prin intermediul rapoartelor periodice), va replanifica execuţia ei, încercându-se să se evite situaţia în care rularea va fi realizată în contexul unui proces de monitorizare în care a eşuat anterior. Mai mult, în cazul în care o partiţie a sarcinii eşuează de mai mult de patru ori, ea nu va fi reexecutată. Implicit, dacă orice partiţie a sarcinii eşuează de mai mult de patru ori (sau de valoarea maximă configurată pentru numărul de încercări), întreaga sarcină va eşua. Pentru unele aplicaţii, nu este de dorit ca sacina să eşueze din cauza unor partiţii ale sale care au înregistrat erori, întrucât este posibil ca rezultatele furnizate de partiţiile terminate cu succes să poată fi folosite, motiv pentru care se poate configura proporția de partiţii ale sarcinii care pot să eşueze fără a declanşa eşuarea sarcinii. O partiţie a sarcinii poate fi oprită în mod forţat, însă astfel de încercări nu sunt contorizate în numărul maxim de lansare în execuţie, de vreme ce eroarea nu s-a produs din cauza partiţiei sarcinii. Şi utilizatorii au posibilitatea de a termina încercări ale partiţiilor sarcinilor (sau chiar sarcina în totalitate) folosind interfaţa grafică sau linia de comandă.
mapred.map.max.attempts
pentru partiţii ale sarcinii de tip map, respectiv de proprietatea mapred.reduce.max.attempts
pentru partiţii ale sarcinii de tip reduce.
mapred.max.map.failures.percent
, respectiv mapred.max.reduce.failures.percent
.
b) Erorile produse la nivelul procesului de monitorizare a partiţiei sarcinii reprezintă altă situaţie care determină eşecul sarcinii. Într-un astfel de caz, procesul de monitorizare a sarcinii va detecta că acesta nu mai funcţionează dacă mesajele periodice întârzie de a fi transmise mai mult de 10 minute, eliminându-l din lista sa, planificând partiţiile sarcinilor care au rulat pe acesta pe alte procese de monitorizare, chiar în situaţia în care au fost terminate cu succes, dacă aparţin unor sarcini incomplete (întrucât valorile de ieşire intermediare se găsesc în sistemul de fişiere al procesului de monitorizare defect astfel încât pot să nu fie accesibile). Un proces de monitorizare a partiţiei sarcinii poate fi inclus de asemenea într-o “listă neagră” de către procesul de urmărire a sarcinii, dacă mai mult de patru partiţii din cadrul aceleiaşi sarcini au eşuat rulând pe acesta. Unui astfel de proces nu i se vor mai repartiza partiţii ale sarcinii, continuând însă să comunice cu procesul de monitorizare a sarcinii. Există implementat un mecanism de expirare a erorilor (cu o rată de una pe zi), astfel încât procesele de monitorizare a partiţiei sarcinii pot fi scoase din cadrul “listei negre” doar prin faptul că rulează în continuare. În situaţia în care erorile au fost produse de o situaţie care poate fi remediată, acesta va fi eliminat din “lista neagră” după ce este repornit şi reintegrat în cadrul clusterului.
mapred.task.tracker.expiry.interval
, care specifică o valoare exprimată în milisecunde.
mapred.max.tracker.blacklists
, acesta fiind un prag mai mare decât media de erori înregistrate de procesele de monitorizare pentru partiţiile sarcinii din cadrul unui cluster.
c) Producerea unei erori la nivelul procesului de monitorizare a sarcinii reprezintă situaţia cea mai gravă care poate conduce la eşecul rulării sarcinii. Fiind singurul punct vulnerabil din cadrul arhitecturii, Hadoop nu dispune de un mecanism de recuperare dintr-o astfel de situaţie, astfel încât toate sarcinile care rulează la un moment dat vor eşua. După repornirea procesului de monitorizare a sarcinii, toate sarcinile aflate în execuţie la momentul producerii erorii respective vor trebui să fie retransmise.
mapred.jobtracker.restart.recover
), însă au fost raportate anumite probleme în funcţionarea sa, astfel încât această opţiune nu ar trebui folosită.
În cazul mecanismului YARN, la rularea unor aplicaţii MapReduce se pot produce erori la nivelul:
- partiţiei sarcinii;
- procesului specific al aplicaţiei;
- procesului de gestiune a nodului;
- procesului de gestiune al resurselor.
a) Erorile produse pentru o partiţie a sarcinii sunt tratate într-un mod asemănător cu cazul MapReduce “clasic”. Excepţiile la rulare precum şi terminarea bruscă a maşinii virtuale Java sunt propagate până la procesul specific al aplicaţiei, şi încercarea de rulare a partiţiei sarcinii este marcată ca fiind eşuată. De asemenea, procesele a căror execuţie durează prea mult (care sunt blocate) sunt detectate prin absenţa transmiterii unui mesaj periodic (ping) prin interfaţa ombilicală. Se pot folosi aceleaşi proprietăţi de configurare pentru a indica eşecul definitiv al unei partiţii a sarcinii sau al întregii aplicaţii.
mapreduce.task.timeout
.
mapreduce.map.maxattempts
pentru sarcini de tip map şi mapreduce.reduce.maxattempts
pentru sarcini de tip reduce. De asemenea, o sarcină în ansamblu va fi marcată ca fiind eşuată în situaţia în care eşuează mai mult de un anumit procent de partiţii ale sale de tip map (indicat de mapreduce.map.failures.maxpercent
) sau un anumit procent de partiţii ale sale de tip reduce (indicat de mapreduce.reduce.failures.maxpercent
).
b) Şi în cazul aplicaţiilor care eşuează, sunt realizate mai multe încercări. Acestea sunt detectate în situaţia în care procesul specific al aplicaţiei nu mai transmite mesaje periodice către procesul de gestiune a resurselor, situaţie în care va fi creată o nouă instanţă a procesului specific al aplicaţiei, însă într-un alt container, supervizat de un proces de gestiune a nodului. Partiţiile sarcinii care au fost deja rulate pot fi recuperate în acest caz, astfel că nu este necesară execuţia lor din nou. Întrucât clientul solicită procesului specific al aplicaţiei rapoarte cu privire la progresul înregistrat, în situaţia în care acesta eşuează şi nu mai transmite nici o informaţie, clientul va interoga procesul de gestiune al resurselor pentru a primi adresa noii instanţe a procesului specific al aplicaţiei (aşa cum îi fusese transmisă şi la iniţializarea sarcinii, când a fost transmisă).
yarn.resourcemanager.am.max-retries
.
yarn.app.mapreduce.am.job.recovery.enable
), astfel că procesul specific al aplicaţiei va rula din nou toate partiţiile sarcinilor, indiferent de starea anterioară.
c) Dacă se înregistrează o eroare la nivelul procesului de gestiune a nodului, acesta va înceta să mai transmită mesaje periodice către procesul de gestiune a resurselor, care îl va elimina din lista sa de noduri disponibile. Orice partiţie a sarcinii sau proces specific al aplicaţiei care rulează pe nodul marcat ca fiind eşuat vor fi recuperate. Procesele de gestiune a nodurilor pot fi incluse pe “lista neagră” în situaţia în care numărul de erori al aplicaţiilor rulate în contextul său este mare. Acestă operaţie este realizată de procesul specific al aplicaţiei (care va planifica partiţiile sarcinilor pe alte noduri) în cazul în care este depăşit un anumit prag minim de partiţii ale sarcinilor care înregistrează erori.
yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms
(având implicit valoarea de 600000 milisecunde = 10 minute) determină perioada de timp (trecută de la ultimul mesaj periodic) pe care o aşteaptă procesul de gestiune a resurselor înainte de a considera procesul de gestiune a nodului ca fiind eşuat.
mapreduce.job.maxtaskfailures.per.tracker
.
d) Erorile produse la nivelul procesului de gestiune a resurselor produc situaţii destul de delicate întrucât în absenţa acestuia nu pot fi lansate nici sarcini şi nici containere ale partiţiilor acestora. Totuşi, el a fost proiectat astfel încât să se poată realiza recuperarea din eroare prin utilizarea unui mecanism ce utilizează puncte de control, astfel încât starea sa este reţinută pe disc. După ce procesul de gestiune al resurselor se opreşte din cauza unei erori, o nouă instanţă va fi lansată de un administrator, aceasta putând relua execuţia prin intermediul stării încărcate de pe disc, conţinând procesele de gestiune ale nodurilor din sistem precum şi aplicaţiile care rulează.
yarn.resourcemanager.store.class
a cărei valoare implicită este manager.recovery.MemStore
(starea este reţinută în memorie astfel încât nu este disponibilă pentru procese din alte maşini virtuale), însă vor fi realizate şi implementări care vor folosi ZooKeeper pentru a realiza recuperarea din eroare în cazul problemelor întâmpinate la nivelul procesului de gestiune a resurselor.
Politici pentru Planificarea Sarcinilor
În privinţa planificării sarcinilor, iniţial Hadoop avea o abordare prin care sarcinile erau rulate în ordinea în care erau transmise de utilizatori, fiecare dintre acestea folosind întregul cluster. Utilizarea unui cluster partajat ar fi oferit oportunitatea folosirii unor resurse de dimensiuni mari pentru mai mulţi utilizatori, însă se punea problema distribuirii echitabile a acestora. Ulterior, s-a introdus posibilitatea specificării unei priorităţi pentru o sarcină astfel încât procesul de planificare a sarcinilor o va alege pe cea cu prioritatea cea mai mare cu toate că nu este implementat dreptul de preempţiune pentru ele. Astfel, în cazul mecanismului de planificare FIFO, o sarcină având o prioritate mai mare poate fi blocată de o sarcină cu prioritate inferioară ei, având o durată de execuţie semnificativă, în situaţia în care a fost pornită anterior.
mapred.job.priority
sau prin metoda setJobPriority()
pe obiectul JobClient
(specificând una din valorile VERY_HIGH
, HIGH
, NORMAL
, LOW
sau VERY_LOW
).
Curent, MapReduce implementează mai multe mecanisme de planificare. Implicit în MapReduce “clasic” este algoritmul bazat pe cozi de procese, însă există şi procese de planificare care să ruleze într-un mediu concurent, denumite mecanismul de planificare echitabilă (eng. Fair Scheduler) şi mecanismul folosind planificare pe bază de capacitate (eng. Capacity Scheduler).
Prin intermediul mecanismului de planificare echitabilă se doreşte să se dea fiecărui utilizator o şansă egală pentru a utiliza capacitatea clusterului de-a lungul timpului.
lib
al Hadoop şi specificând pentru proprietatea mapred.jobtracker.taskScheduler
valoarea org.apache.hadoop.mapred.FairScheduler
. Acesta va funcţiona fără alte configurări, însă acestea sunt disponibile inclusiv prin intermediul unei interfeţe web, pentru a utiliza toate facilităţile de care dispune.
În situaţia în care rulează o singură sarcină, acesteia îi va fi alocat întregul cluster. Pe măsură ce sunt transmise mai multe sarcini, sunt distribuite resurse pentru partiţiile acestora, astfel încât fiecare să aibă acces, în mod echitabil, la aceleaşi mijloace ca şi ceilalţi utilizatori. Astfel, o sarcină de dimensiuni mai mici va fi terminată într-un timp optim în timp ce concomitent este rulată o sarcină a cărei execuţie durează mai mult timp, aceasta înregistrând, la rândul ei, anumite progrese.
Sarcinile fac parte dintr-un anumit grup, în mod implicit fiecărui utilizator fiindu-i repartizat un astfel de grup. În acest fel, un utilizator care transmite mai multe sarcini decât altul nu va primi mai multe resurse în cadrul clusterului. Totodată, există posibilitatea de a crea anumite grupuri care să aibă garantat un număr minim de resurse (partiţii ale sarcinii de tip map sau de tip reduce care vor rula), stabilind ponderi pentru fiecare dintre acestea. Acest mecanism implementează dreptul de preemţiune, astfel încât în situaţia în care un grup nu şi-a primit resursele necesare pentru o perioadă de timp, procesul de planificare va termina partiţii ale sarcinilor din grupuri care şi-au depăşit capacitatea, pentru a oferi acces la resursele cuvenite grupului care a primit mai puţine resurse. De asemenea, în cadrul aceluiaşi grup, sarcinile transmise de fiecare utilizator vor partaja în mod echitabil resursele de care dispune acesta.
Abordarea din cazul mecanismului folosind planificare pe bază de capacitate utilizează un număr de cozi pentru fiecare cluster, fiecare având alocată o anumită capacitate. Prin această caracteristică, mecanismul folosind planificare pe bază de capacitate se deosebeşte de mecanismul de planificare echitabilă, permiţând ca pentru fiecare utilizator să se specifice o anumită pondere din totalul clusterului care va fi folosită exclusiv de acesta. Ele pot fi organizate în mod ierarhic, astfel încât între ele se stabilesc relaţii de tip părinte-copil, fiind asemănătoare grupurilor. Fiecare coadă este asociată unui utilizator şi implementează în cadrul său un mecanism de planificare de tip FIFO , folosind priorităţi. Astfel, se permite utilizatorilor să îşi simuleze propriul cluster de tip MapReduce, în care aplicaţiile pe care le transmit vor fi executate în ordinea specificată şi în funcţie de priorităţile indicate.
Sistemul Distribuit de Fișiere HDFS
Sistemul de Gestiune pentru Baze de Date Distribuite HBase
API-ul Java pentru dezvoltarea de aplicaţii distribuite folosind framework-ul Hadoop
Dezvoltarea unei aplicaţii MapReduce folosind Hadoop presupune configurarea unui obiect de tip Job
(prin specificarea formatului pe care îl au datele de intrare şi datele de ieşire şi prin indicarea claselor care implementează sarcinile de tip Map, respectiv Reduce - opţional, se poate specifica şi o clasă de tip Combiner), prin implementarea claselor care realizează funcţionalitatea claselor de tip Map, respectiv Reduce urmată de lansarea în execuţie a aplicaţiei.
Clasa Job
înglobează toate informaţiile despre o sarcină, controlând totodată execuţia acesteia.
Job job = Job.getInstance(getConf(), "JobName");
O sarcină este împachetată în cadrul unei arhive .jar
care este distribuită în mod automat de sistemul Hadoop. Pentru a şti care este sarcina de transmis, trebuie apelată metoda setJarByClass()
, primind ca argument chiar arhiva .jar
în care se găseşte sarcina curentă. Sistemul Hadoop va localiza prin aceasta fişierul care conţine clasa indicată.
job.setJarByClass(getClass());
În continuare, trebuie specificat formatul datelor de intrare respectiv al datelor de ieşire, acestea putând fi fişiere, directoare, texte sau tabele dintr-o bază de date.
Datele de intrare sunt indicate de o implementare a clasei InputFormat
, responsabilă de crearea partiţiilor şi a sistemului de preluare a înregistrărilor.
job.setInputFormatClass(CustomInputFormat.class);
InputFormat
, va fi specificată o clasă definită de utilizator care defineşte metoda createRecordReader(InputSplit, TaskAttemptContext)
întorcând obiectul (derivat din clasa RecordReader
) care gestionează preluarea înregistrărilor din sursa respectivă.
După ce sunt preluate înregistrările din sursa de date, acestea sunt partiţionate şi fiecare secvenţă este transmisă unui proces de tip Mapper spre a fi prelucrată în continuare.
Hadoop foloseşte propriul mecanism de serializare pentru transmiterea datelor prin infrastructura de comunicaţie, fiind optimizat pentru cazul reţelelor de calculatoare. Serializarea este folosită pentru interacțiunea cu fișiere și cu baze de date. În pachetul org.apache.hadoop.io
sunt definite clasele IntWritable
(pentru Integer
), Text
(pentru String
) şi altele, însă implementarea propriilor clase pentru tipuri de date serializabile este destul de facilă.
Datele de ieşire sunt specificate de o implementare a clasei OutputFormat
, indicându-se şi locaţia la care sarcinile de tip reduce îşi vor plasa rezultatele. Fiecare sarcină de tip reduce dispune de propriile date de ieşire, de tip text, sub forma unor perechi (cheie, valoare).
job.setOutputFormatClass(CustomOutputFormat.class);
Pentru fiecare sarcină trebuie specificat formatul pentru cheia şi valoarea datelor de ieşire, pentru procesele de tip map cât şi pentru procesele de tip reduce. Sunt folosite în acest scop metodele setMapOutputKeyClass()
şi setMapOutputValueClass()
, care pot primi ca parametrii tipuri predefinite de date sau tipuri de date definite de utilizator.
În cazul în care datele de intrare sau datele de ieşire au tipurile TextInputFormat
respectiv TextOutputFormat
, locaţia de la care pot fi preluate informaţiile va fi indicată prin metodele FileInputFormat.setInputPaths()
, respectiv FileOutputFormat.setOutputPath()
care vor primi ca parametru obiectul de tip Job
şi un obiect de tip Path
care conţine calea din sistemul de fişiere local sau din cadrul sistemului distribuit de fişiere HDFS.
Ulterior, trebuie indicate clasele de tip Mapper şi Reducer (având de regulă aceeaşi cheie pentru datele de ieşire):
job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class);
În continuare, sarcina este lansată în execuţie prin apelarea metodei waitForCompletion()
ce primeşte un parametru care indică dacă sunt afişate valori la consolă. Metoda transmite sarcina, aşteptând terminarea acesteia. Rezultatul pe care îl întoarce este true
în caz că sarcina a fost terminată cu succes şi false
în caz contrar.
Clasa de tip Map
este derivată din clasa org.apache.hadoop.mapreduce.Mapper
, fiind parametrizată cu 4 valori de tip JavaGenerics având semnificaţia tipurilor de date pe care le au cheia şi valoarea datelor de intrare, respectiv cheia şi valoarea datelor de ieşire. Sarcina programatorului constă în a defini metoda map()
care primeşte ca parametru o cheie şi o valoare (cu tipurile aferente celor specificate la parametrizarea clasei) corespunzătoare datelor de intrare şi un obiect de tip Context
folosit pentru datele de ieşire, dar şi pentru specificarea unor contoare definite de utilizator.
- MyMapper.java
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Text outputKey = new Text(); Text outputValue = new Text(); // ... context.write(outputKey, outputValue); } }
Analog, clasa de tip Reduce
este derivată din org.apache.hadoop.mapreduce.Reducer
, având acelaşi tip de parametrizare cu menţiunea faptului că tipurile specificate pentru datele de intrare (pentru cheie şi pentru valoare) ale acestuia trebuie să fie identice cu tipurile specificate pentru datele de ieşire ale clasei de tip Map
, întrucât perechile (cheie, valoare) produse de aceasta vor fi grupate astfel că pentru fiecare cheie va exista un set de una sau mai multe valori. Acest proces este cunoscut sub denumirea de amestecare şi sortare (eng. shuffle and sort). Intrările pentru clasa de tip Reduce
sunt sortate în funcţie de cheie, fiecare set de valori fiind prelucrat în metoda reduce()
care primeşte ca argumente o cheie şi un set de valori, dar şi obiectul de tip Context
. Un set de valori este reprezentat sub forma Iterable<V>
. Prelucrările în cadrul metodei reduce()
sunt realizate prin parcurgerea acestei liste şi realizarea unor agregări asupra acestor valori pentru obţinerea rezultatului.
- MyReducer.java
public class MyReducer extends Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Interable<Text> values, Context context) throws IOException, InterruptedException { Text outputKey = new Text(); Text outputValue = new Text(); for (Text value: values) { // ... } context.write(outputKey, outputValue); } }
Opţional, se poate defini şi o clasă Combiner, spre a combina informaţiile pentru fiecare sarcină de tip Map
, astfel încât cantitatea datelor procesate de sarcina Reduce
să fie limitată. Aceste clase nu sunt însă garantate să ruleze, fiind folosite mai mult pentru optimizări şi nu în cazul operaţiilor critice pentru logica aplicaţiei.
Informaţiile oferite la rularea unei sarcini sunt identificatorul acesteia (folosit pentru a monitoriza şi a gestiona sarcina), numărul de partiţii generate, progresul şi valoarea contoarelor exprimând statistici referitoare la operaţiile care au fost realizate în cadrul rulării sarcinii.
Întrucât HBase este scris în limbajul de programare Java, el oferă un API pentru acces programatic la informaţiile conţinute. Aceasta reprezintă metoda cea mai rapidă de a accesa informaţiile din cadrul unei baze de date distribuite de tip HBase. Sunt suportate operaţii de tip CRUD, dar şi unele operaţii administrative.
Accesul la o bază de date HBase presupune crearea unui obiect de tip Configuration
, în care sunt încărcate proprietăţi specifice, indicate de utilizator în fişierele de configurare, construirea unui obiect de tip HTable
(pentru care trebuie să se specifice atât obiectul de configurare cât şi denumirea tabelului), realizarea diferitelor operaţii (put()
, get()
, scan()
, delete()
) şi închiderea instanţei care implică eliberarea tuturor resurselor folosite şi a zonelor de memorie tampon.
Clasa Configuration
extinde clasa de configurare Hadoop cu care păstrează compatibilitatea, la crearea unui astfel de obiect fiind încărcate din classpath documentele hbase-default.xml
şi hbase-site.xml
. Valorile proprietăţilor din hbase-site.xml
suprascriu valorile proprietăţilor conţinute în hbase-default.xml
.
Configuration conf = HBaseConfiguration.create();
Unele proprietăţi pot fi suprascrise manual, însă un astfel de comportament nu este de regulă necesar şi nici de dorit.
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
Un obiect de tip Configuration
trebuie partajat cât de mult posibil întrucât obiectele de tip HTable
create cu aceeaşi configurație vor avea aceeaşi conexiune la ZooKeeper şi HBaseMaster, de vreme ce o conexiune este reţinută intern printr-un obiect de tip asociere care foloseşte drept cheie instanţe ale obiectului de tip Configuration
. Când se reutilizează un obiect de tip Configuration
pentru mai multe instanţe HTable
, trebuie apelat close()
ca HConnectionManager
să elimine instanţa din lista tabelelor ce folosesc conexiunea respectivă. Când toate tabelele unei conexiuni sunt închise, este terminată şi conexiunea corespunzătoare. O conexiune e reprezentată de clasa HConnection
fiind gestionată de clasa HConnectionManager
.
Clasa org.apache.hadoop.client.HTable
reprezintă interfaţa clientului către o singură tabelă HBase, oferind acces la operaţii CRUD, fiind proiectată pentru a fi utilizată cu uşurinţă. Operaţiile care modifică datele sunt atomice pentru fiecare înregistrare în parte, neexistând conceptul de tranzacţie (care să implice mai mult de o înregistrare). Astfel, există o consistenţă de 100% per înregistrare, clientul realizând operaţii de citire sau de scriere asupra acesteia sau aşteptând eliberarea accesului. Atomicitatea se păstrează indiferent de numărul de coloane utilizate în cadrul cererii.
HTable
poate fi utilizat pentru grupuri de comenzi (spre a se asigura performanţa), însă aceste operaţii nu sunt atomice.
Crearea unui obiect HTable
este destul de costisitoare întrucât este scanat întregul catalog .META.
, verificându-se dacă tabela există şi este activată. Astfel, aceasta trebuie apelată o singură dată în cadrul unui fir de execuţie, fiind refolosit cât de mult posibil. Operaţiile realizate cu obiecte HTable
nu sunt garantate să funcţioneze corect într-un mediu distribuit.
HTable myTable = new HTable(conf, "TableName");
HTable
la un moment dat, ar trebui utilizat un obiect de tip HTablePool
pentru că reutilizează instanţele acestor clase.
Pentru a se scrie date într-o tabelă, se foloseşte un obiect de tip Put
, care este instanţiat cu identificatorul (cheia de rând) a înregistrării care se doreşte transmisă, în format binar.
Put myPut = new Put(Bytes.toBytes("myRow"));
Un astfel de obiect implementează metoda add()
, prin care se indică coordonatele celulei la care se doreşte să se scrie valoarea în cauză.
Put.add(family, column, value); Put.add(family, column, value, timestamp, value);
Toţi parametrii sunt în format binar, fiind reponsabilitatea programatorului conversia acestora.
Transferul propriu-zis al datelor se realizează la apelul metodei put()
, pe instanţa HTable
:
myTable.put(myPut);
Pentru a se citi date dintr-o tabelă, se foloseşte un obiect de tip Get
, care este instanţiat cu identificatorul (cheia de rând) a înregistrării care se doreşte obţinută, în format binar.
Get myGet = new Get(Bytes.toBytes("myRow"));
Un astfel de obiect implementează metoda add()
, prin care se indică coordonatele celulei la care se doreşte să se scrie valoarea în cauză. Metoda add()
permite specificarea mai multor familii de coloane şi a mai multor coloane.
Get.addFamily(family); Get.addColumn(family, column);
Este foarte important să fie indicate exact datele necesare în cadrul aplicaţiei, altfel fiind returnat un rând întreg.
Filtrarea se poate realiza prin specificarea unui interval de timp, respectiv al unui număr de versiuni (prin metodele setTimeRange()
şi setMaxVersions()
).
Transferul propriu-zis al datelor se realizează la apelul metodei get()
, pe instanţa HTable
:
Result myResult = myTable.get(myGet);
Obţinerea rezultatelor dintr-un obiect de tip Result
se face prin metoda getValue()
, specificându-se denumirea familiei de coloane şi a coloanei. Alte metode sunt getRow()
pentru obţinerea identificatorului unei înregistrări, isEmpty()
pentru a verifica dacă rezultatul este vid, size()
pentru a obţine numărul de celule şi containsColumn(family:column)
pentru a verifica existenţa unei coloane.
Un obiect HTable
trebuie întotdeauna închis dacă nu mai este necesar sau atunci când s-a produs o excepţie (fiind plasat de regulă pe ramura catch
, operaţiile pe tabela respectivă fiind plasate pe ramura try
):
myTable.close();
Activitate de Laborator
0. Să se cloneze în directorul de pe discul local conținutul depozitului la distanță de la https://www.github.com/aipi2015/Laborator08. În urma acestei operații, directorul Laborator08 va trebui să conțină subdirectorul labtasks
, fișierele README.md
și LICENSE
.
student@aipi2015:~$ git clone https://www.github.com/aipi2015/Laborator08.git
1. Să se pornească maşina virtuală Ubuntu 14.04, autentificarea făcându-se folosind credențialele aipi2015 (ca nume de utilizator) şi StudentAipi2015 (ca parolă).
2. Într-un terminal, să se pornească sistemul de fişiere distribuite HDFS şi sistemul de gestiune al resurselor YARN.
student@aipi2015:~$ start-dfs.sh student@aipi2015:~$ start-yarn.sh
3. Într-un terminal, să se pornească o instanţă a sistemului de gestiune pentru baze distribuite HBase.
student@aipi2015:~$ start-hbase.sh
student@aipi2015:~$ jps
În execuție trebuie să se afle procesele Java NameNode, SecondaryNameNode, DataNode (ale sistemului distribuit de fișiere HDFS), ResourceManager, NodeManager (ale sistemului de gestiune a resurselor YARN), respectiv HRegionServer, HMaster și HQuorumPeer (ale sistemului de gestiune pentru baze de date distribuite HBase).
4. Să se ruleze script-urile HBase pentru instalarea bazei de date. Vor trebui rulate scripturile Laborator08_DDLc.rb
şi Laborator08_DML.rb
, localizate în directorul scripts
.
Va fi apelat utilitarul hbase
(cu parametrul shell
), indicându-se căile absolute către scripturile care trebuie rulate.
student@aipi2015:~/scripts$ hbase shell Laborator08_DDLc.rb student@aipi2015:~/scripts$ hbase shell Laborator08_DML.rb
Laborator08_DDLd.rb
pentru dezinstalarea bazei de date, în cazul în care o astfel de operație este necesară.
Structura bazei de date conține următoarele tabele, familii de coloane și atribute:
Tabela | Familia de Coloane | Atributul |
---|---|---|
country | identification | name, code |
detail | description | |
publishing_house | identification | name, registered_number |
detail | description | |
address | postal_address, zip_code, country_id | |
miscellanea | internet_address | |
collection | identification | name, description |
referrence | publishing_house_id | |
book | identification | title, subtitle |
detail | description | |
version | edition, printing_year | |
group | collection_id | |
format | identification | value |
detail | description | |
language | identification | name, code |
detail | description | |
book_presentation | identification | isbn |
referrence | book_id, format_id, language_id | |
inventory | stockpile, price | |
writer | appellation | first_name, last_name |
biography | content | |
author | referrence | book_id, writer_id |
category | identification | name |
detail | description | |
category_content | referrence | book_id, category_id |
supply_order_hdeader | identification | identification_number, issue_date |
situation | state | |
producer | publishing_house_id | |
supply_order_line | referrence | supply_order_header_id |
content | book_presentation_id, quantity | |
user | appellation | first_name, last_name |
identification | personal_identifier | |
contact | address, phone_number, email | |
category | type | |
authentication | username, password | |
invoice_header | identification | identification_number, issue_date |
situation | state | |
consumer | user_id | |
invoice_line | referrence | invoice_header_id |
content | book_presentation_id, quantity | |
statistics | expense | value |
5. Se doreşte afişarea unei liste conţinând toţi utilizatorii (identificaţi prin nume şi prenume) precum şi sumele pe care aceştia le-au cheltuit prin achiziţia de cărţi din librăria virtuală.
În cazul unui sistem de gestiune pentru baze de date relaţionale, o astfel de problemă are o soluţie foarte simplă ce constă într-o operaţie de joncţiune între tabelele book_presentation
, user
, invoice_header
şi invoice_line
. Pentru un volum foarte mare de informaţii, o astfel de operaţie poate dura o perioadă de timp inacceptabilă, motiv pentru care se doreşte ca procesarea să se realizeze în paralel, pe mai multe maşini, fiecare gestionând o parte dintre înregistrările tabelelor respective.
Un sistem MapReduce poate împărţi datele de intrare (tabela invoice_line
pentru situaţia de faţă) în mai multe regiuni în mod automat, fiecare dintre acestea fiind prelucrate de un proces separat. Astfel, pentru fiecare detaliu din factură, reprezentând un produs al facturii se determină preţul total, obţinut prin înmulţirea preţului formatului de prezentare a cărţii (obţinut printr-o interogare a tabelei book_presentation
) cu cantitatea precizată. Pentru o factură dată prin identificatorul său, se asociază preţul total al fiecărui produs pe care îl conţine. Ulterior, se creează o listă corespunzătoare fiecărei facturi conţinând toate preţurile totale ale produselor aferente. Acestea vor fi însumate, obţinându-se valoarea facturii.
MAP | → | REDUCE |
---|---|---|
(factura1, preţ_produs1) | → | (factura1, (preţ_produs1, preţ_produs2, …, preţ_produsm1)) = (factura1, preţ_produs1 + preţ_produs2 + … + preţ_produsm1) |
(factura1, preţ_produs2) | ||
… | ||
(factura1,preţ_produsm1) | ||
(factura2, preţ_produs1) | (factura2, (preţ_produs1, preţ_produs2, …, preţ_produsm1)) = (factura2, preţ_produs1 + preţ_produs2 + … + preţ_produsm2) |
|
(factura2, preţ_produs2) | ||
… | ||
(factura2,preţ_produsm2) | ||
… | … | |
(facturan, preţ_produs1) | (facturan, (preţ_produs1, preţ_produs2, …, preţ_produsmn)) = (facturan, preţ_produs1 + preţ_produs2 + … + preţ_produsmn) |
|
(facturan, preţ_produs2) | ||
… | ||
(facturan,preţ_produsmn) |
Cum fiecare factură are asociat un identificator al utilizatorului care a achitat-o, acesta va conduce la obţinerea, prin interogarea bazei de date user, a numelui şi prenumelui clientului respectiv.
Pentru a calcula totalurile pentru fiecare utilizator (un utilizator poate avea mai multe facturi asociate), din datele generate până acum, se vor realiza grupări în funcţie de utilizatori, astfel încât pentru fiecare dintre aceştia vor fi asociate una sau mai multe sume de bani corespunzătoare facturilor. Printr-un mecanism similar, acestea vor fi însumate în vederea determinării cheltuielilor totale, pentru fiecare utilizator în parte.
MAP | → | REDUCE |
---|---|---|
(utilizator, preţ_factura1) | → | (utilizator, (preţ_factura1, preţ_factura2, …, preţ_facturan)) = (utilizator, preţ_factura1 + preţ_factura2 + … + preţ_facturan) |
(utilizator, preţ_factura2) | ||
… | ||
(utilizator, preţ_facturan) |
Se observă faptul că structura coloanelor marcate este echivalentă, astfel încât aplicaţia va consta din 2 sarcini, din care una va avea atât partiţie de tip map cât şi partiţie de tip reduce, iar una va avea numai partiţie de tip reduce. De asemenea, se observă faptul că datele de ieşire ale unei sarcini reprezintă datele de intrare pentru cealaltă sarcină.
În aplicaţia BookStore
, cele două sarcini se numesc invoiceValue
şi userExpenses
, fiind definite (prin partiţiile map şi reduce corespunzătoare) în clasa cu acelaşi nume.
a) în clasa Map
a sarcinii invoiceValue
(clasa InvoiceValueMapper
din pachetul ro.pub.cs.aipi.lab08.mapper
), să se determine preţul formatului de prezentare a cărţii analizate în mod curent (care se găseşte în detaliul de factură repartizat procesului curent);
<spoiler|Indicații de rezolvare>
Se va crea o instanţă a clasei Table
, pe baza unui obiect de tip Connection
(obținut prin intermediul unei fabrici de conexiuni, ConnectionFactory
), pentru care se apelează metoda createConnection()
, aceasta primind ca argument denumirea tabelei respective (TableName.valueOf(…)
). Ulterior, se creează un obiect de tip Get
în care se va specifica atributul ce se doreşte obţinut (prin metoda addColumn()
, indicându-se atât familia coloanei cât şi denumirea propriu-zisă a coloanei), din aplicarea acestuia generându-se un obiect Result
pentru care se apelează metoda getValue()
în scopul de a se obţine valoarea dorită.
close()
.
</spoiler>
b) în clasa Reduce
a sarcinii invoiceValue
(clasa InvoiceValueReducer
din pachetul ro.pub.cs.aipi.lab08.reducer
), să se calculeze preţul total pentru o factură, prin agregarea tuturor sumelor parţiale specificate în detaliile de factură;
c) în clasa Reduce
a sarcinii invoiceValue
(clasa InvoiceValueReducer
din pachetul ro.pub.cs.aipi.lab08.reducer
), să se obţină, prin interogarea tabelei invoice_header
, identificatorul utilizatorului căruia i-a fost emisă factura analizată în mod curent;
d) în clasa Reduce
a sarcinii invoiceValue
(clasa InvoiceValueReducer
din pachetul din pachetul ro.pub.cs.aipi.lab08.reducer
), să se obţină, prin interogarea tabelei user
, numele şi prenumele clientului având identificatorul determinat anterior;
e) în clasa BookStore
din pachetul ro.pub.cs.aipi.lab08.main
, pentru sarcina userExpenses
, să se indice valorile claselor corespunzătoare cheii şi valorii datelor de ieşire;
<spoiler|Indicații de Rezolvare>
Întrucât datele de ieşire sunt reprezentate de tuplul (nume&prenume, valoare), tipurile de date corespunzătoare vor fi Text
, respectiv DoubleWritable
.
</spoiler>
f) în clasa Reduce
a sarcinii userExpenses
(clasa UserExpensesFileReducer
din pachetul ro.pub.cs.aipi.lab08.reducer
), să determine valoarea totală a facturilor achitate de client şi să se scrie această valoare în context (folosind metoda write()
, cheia de rând fiind reprezentată de numele şi prenumele utilizatorului);
De remarcat faptul că pentru citirea datelor de intrare în cazul sarcinii userExpenses
, a fost folosită o clasă specială, definită de utilizator (CustomFileInputFormat
), în care datele au forma (cheie, valoare), iar separatorul dintre aceste entităţi este TAB-ul. Întrucât cheia este reprezentată de numele şi prenumele utilizatorilor, iar valoarea de suma unei facturi, tipurile de date procesate în clasa CustomFileInputFormatRecordReader
vor fi Text
, respectiv DoubleWritable
. O astfel de soluţie a fost adoptată datorită faptului că nu au fost definite tipuri de fişiere de intrare care să aibă această structură.
6. Să se rezolve problema anterioară, stocând rezultatele nu într-un fişier ci în tabela statistics
a bazei de date HBase, care are o singură familie de coloane, expense
, în care o posibilă coloană ar putea fi valoare.
a) În clasa BookStore
din pachetul ro.pub.cs.aipi.lab08.main
, se va apela metoda statică initTableReducerJob()
a clasei TableMapReduceUtil
ce primeşte ca parametrii denumirea tabelei unde se doresc a fie redirectate datele de ieşire, numele clasei care implementează sarcina de tip Reduce
şi numele obiectului de tip Job
. De asemenea, va trebui indicat şi numărul sarcinilor de tip Reduce
.
b) În clasa UserExpensesTableReducer
din pachetul ro.pub.cs.aipi.lab08.reducer
, ulterior determinării sumei totale achitate de client, să se creeze un obiect de tip Put
având drept cheie numele şi prenumele utilizatorului, adăugându-se (prin metoda addColumn()
), ca valoare a celulei identificată de coordonatele expense:value
, suma tuturor facturilor, corespunzând produselor achiziţionate.
7. Să se verifice rezultatele obținute.
<spoiler|Indicații de Rezolvare>
a) în situația în care rezultatele au fost stocate în sistemul de fișiere HDFS, se poate folosi plugin-ul pentru mediul integrat de execuție Eclipse, care conferă un navigator pentru acest sistem de fișiere;
b) în situația în care rezultatele au fost stocate în tabela statistics
din cadrul sistemului de gestiune pentru baze de date distribuite HBase
, se poate verifica conținutul acesteia:
student@aipi2015:~$ hbase shell hbase> scan 'statistics'
</spoiler>
8. Înainte de a opri maşina virtuală, să se oprească instanţa sistemului de gestiune pentru baze de date distribuite HBase precum şi a sistemului de gestiune a resurselor YARN, respectiv al sistemului de fişiere distribuite HDFS.
student@aipi2015:~$ stop-hbase.sh student@aipi2015:~$ stop-yarn.sh student@aipi2015:~$ stop-dfs.sh
Resurse
Hadoop Documentation
Tom WHITE, Hadoop – The Definitive Guide, 3rd Edition, O’Reilly, 2012
Hadoop Tutorial: Developing Big-Data Applications with Apache Hadoop
The Apache HBaseTM Reference Guide
Lars GEORGE, HBase: The Definitive Guide, O’Reilly, 2011