Laborator 08

Proiectarea de aplicații distribuite folosind framework-ul Hadoop

Obiective

  • înțelegerea modelului implementat de Hadoop pentru procesarea distribuită a unui volum mare de date, folosind clustere formate din mașini fără performanțe distribuite, larg disponibile în comerț (eng. commodity hardware);
  • descrierea principalelor module integrate de framework-ul Hadoop;
  • cunoașterea produselor care funcționează împreună cu modulele Hadoop, funcționalitatea pe care o pun la dispoziție și oportunitatea folosirii lor;
  • configurarea framework-ului Hadoop (HDFS, YARN) și a bazei de date distribuite HBase pe un cluster format dintr-o singură mașină;
  • utilizarea paradigmei de programare Map/Reduce pentru procesarea distribuită a unui volum mare de date;
  • familiarizarea cu modul în care sunt gestionate sarcinile Map/Reduce în diferite implementări ale framework-ului Hadoop (Map/Reduce “clasic”, respectiv YARN);

Cuvinte Cheie

Hadoop, cluster, commodity hardware, Google File System, Hadoop Distributed File System, YARN, Map-Reduce, HBase, ZooKeeper, Oozie, Pig, Hive, HDFS federation, namenode, datanode, Job Tracker, Task Tracker, Resource Manager, Node Manager, Application Master

Materiale Ajutătoare

Framework-ul Hadoop - aspecte generale

Hadoop este un proiect open-source dezvoltat de Apache care îşi propune realizarea de procesări distribuite a unor seturi de date de dimensiuni mari, rulând pe mai multe clustere, folosind modele de programare simple. Proiectarea acestui framework a fost realizată astfel încât să fie scalabilă chiar şi în situaţia în care sarcinile sunt rulate pe mii de calculatoare, fiecare dintre acestea punând la dispoziţie o anumită capacitate de procesare şi de stocare.

Un cluster este un set de mai multe maşini, legate prin intermediul unei reţele de calculatoare, care se găsesc în aceeaşi locaţie fizică. Nu este necesar ca performanţele pe care le oferă acestea să fie foarte ridicate, scalarea realizându-se nu prin adăugarea de resurse la o maşină (de vreme ce legea lui Moore oricum nu poate acoperi creşterea cantităţii de date), ci prin creşterea numărului de maşini în cadrul unui cluster, respectiv prin creşterea numărului de clustere, framework-ul Hadoop fiind proiectat pentru a gestiona astfel de modificări. Mai mult, spre diferenţă de modelul “tradițional” de procesare distribuită, nu se face o separare între noduri de procesare şi noduri de stocare (întrucât pot apărea blocaje) ci un nod implementează ambele funcţionalităţi, fiind procesate datele stocate local.
Spre diferenţă de sistemele Grid şi Cloud, Hadoop nu se bazează pe hardware-ul maşinilor care fac parte din cluster pentru a oferi un nivel de disponibilitate ridicat, ci are capabilitatea de a detecta (la nivel software) erorile care se produc, gestionând astfel de situaţii (în special prin replicarea datelor şi re-execuţia sarcinilor), garantând realizarea sarcinilor, inclusiv prin redirecţionarea lor pe alte maşini.

Un programator care proiectează aplicaţii distribuite folosind Hadoop nu trebuie să gestioneze evenimentele care se pot produce la nivelul unei maşini, concentrându-se pe dezvoltarea logicii aplicaţiei, în timp ce partiţionarea datelor şi distribuirea codului vor fi realizate în mod transparent prin intermediul framework-ului, interacţiunea dintre componente fiind definită în cadrul unor interfeţe standardizate.

Hadoop a fost creat de Doug Cutting, cel care realizase şi biblioteca pentru căutări în texte Apache Lucene. Ideea acestui proiect a plecat de la Apache Nutch, un motor de căutare web open-source, lansat în 2002, care nu era însă scalabil pentru miliardele de pagini ce ar fi trebuit indexate spre a oferi rezultate corecte. Pe baza unui articol din 2003 care descria sistemul de fişiere distribuite al celor de la Google (GFS), este lansat Nutch Distributed Filesystem (NDFS) ce rezolva atât problema spaţiului de stocare al fişierelor generate în urma proceselor de indexare a paginilor Internet cât şi sarcinile administrative de gestionare a nodurilor de stocare. Un model similar este urmat şi în cazul Map-Reduce: dacă în 2004 Google publica articolul prin care era prezentat acest concept, implementarea sa în Nutch devenea deja funcţională în 2005. În 2006, odată cu mutarea lui Doug Cutting la Yahoo!, apare noţiunea de Hadoop iar în 2008 acesta era deja folosit pentru motorul de căutare web al Yahoo! folosind un cluster din 10.000 noduri. Apache acordă o importanţă deosebită proiectului, ce devine o prioritate a organizaţiei, după ce acesta devenise cel mai rapid sistem care sortase 1 TB de date (209 secunde, pe un cluster format din 910 noduri). În cadrul aceluiaşi an, recordul avea să fie depăşit de încă două ori: implementarea MapReduce de la Google sortase 1TB de date în 68 de secunde, în timp ce sistemul celor de la Yahoo! reuşise aceeaşi performanţă în numai 62 de secunde. Investiţiile în cercetare destinate acestui proiect devin din ce în ce mai mari, achiziţionându-se clustere ce depăşesc 1000 de noduri pe care sunt încărcate seturi de date ce atingeau ordinul sutelor de terrabytes. Începând cu anul 2010, Hadoop a fost adoptat pe scară largă de organizaţii atât în scopul de a stoca volume mari de date cât şi ca platformă de analiză a acestora. În prezent, Hadoop este folosit de numeroase companii pentru care volumul de date generat zilnic depăşeşte capacităţile de procesare şi stocare specifice sistemelor convenţionale: Adobe, AOL, Amazon.com, EBay, Facebook, Google, LinkedIn, Twitter, Yahoo.

Framework-ul Hadoop include mai multe module:

  • Hadoop Common: utilitare de bază care oferă funcţionalităţile pentru celelalte module;
  • Hadoop Distributed File System (HDFS): sistem de fişiere distribuite ce pune la dispoziţie un nivel de disponibilitate ridicat la datele utilizate de aplicaţii;
  • Hadoop YARN: modul pentru planificarea sarcinilor şi gestiunea resurselor din cadrul unui cluster;
  • Hadoop Map-Reduce: sistem bazat pe YARN pentru procesarea paralelă a unor seturi mari de date.

În plus, au fost dezvoltate mai multe produse care pot fi folosite împreună cu Hadoop / HDFS:

  • HBase: bază de date distribuită, scalabilă care suportă stocarea informaţiilor structurate pentru tabele de dimensiuni mari; implementată sub forma unor perechi cheie-valoare, foloseşte de obicei sistemul de fişiere distribuit HDFS deşi poate fi folosit şi împreună cu sistemul de fişiere local;
  • ZooKeeper: serviciu de coordonare performant pentru aplicaţii distribuite;
  • Oozie: modulul pentru gestiunea şi planificarea fluxurilor, coordonând fluxurile Map-Reduce;
  • Pig: limbaj de nivel înalt pentru procesarea fluxurilor de date şi mediu de execuţie pentru prelucrări paralele;
  • Hive: depozit de date cu interfaţă SQL care oferă sumarizarea datelor şi interogări ad-hoc.

Există mai mulţi producători care pun la dispoziţie distribuţii Hadoop, al căror scop este oferirea unei configuraţii care rezolvă incompatibilităţile dintre diferite produse, prin rularea unor teste de integrare între acestea.

Produsele Hadoop integrate în cele mai multe dintre distribuţii sunt HDFS, MapReduce, HBase, Hive, Mahout, Oozie, Pig, Sqoop, Whirr, ZooKeeper, Flume. De asemenea, proiectul BigTop (dezvoltat de Apache) are rolul de a rula teste de interoperabilitate între componentele Hadoop oferind pachete Linux (RPM şi pachete Debian) pentru o instalare mai facilă.

Distribuţiile sunt realizate în mai multe formate, suportă un set de sisteme de operare şi pot include scripturi suplimentare pentru rularea mediului de lucru.

De regulă, este vorba despre sisteme de operare Linux (RedHat Enterprise, CentOS, Oracle Linux, Ubuntu, SUSE Linux Enterprise Server).

Între distribuţiile mai cunoscute se numără Cloudera Distribution for Hadoop (CDH), MapR Distribution, Hortonworks Data Platform (HDP), Apache BigTop Distribution, Greenplum HD Data Computing Appliance. Acestea dispun şi de propriile documentaţii (inclusiv wiki) şi oferă utilizatorilor maşini virtuale pe care sunt instalate distribuţiile în cauză.

Versiunile Hadoop întreţinute în prezent sunt 1.2.x, 2.6.x şi 0.23.x (similară cu 2.x, fără nivelul de disponibilitate ridicat pentru nodurile de nume). Îmbunătăţirile aduse în cadrul versiunii 2.6.x sunt:

  • implementarea federaţiilor HDFS, ce utilizează mai multe noduri (spaţii) de nume independente (care nu necesită coordonare între ele); nodurile de date sunt utilizate ca spaţii de stocare pentru blocuri de către toate nodurile de nume. Pentru implementarea unei astfel de funcţionalităţi, este necesar ca fiecare nod de date să fie înregistrat de toate nodurile de nume din cluster. Acestea vor transmite rapoarte periodice cu privire la blocurile pe care le stochează, gestionând comenzile care provin de la nodurile de nume.



    Din acest motiv, nodurile de date stochează blocuri aparţinând tuturor federaţiilor de blocuri (eng. block pool) din cadrul cluster-ului, fiecare dintre ele fiind gestionate separat de alte astfel de obiecte. O federaţie de blocuri este reprezentată de un set de blocuri ce aparţine unui singur spaţiu de nume. În acest mod, un spaţiu de nume poate genera identificatori pentru blocuri fără a fi necesară coordonarea cu alte spaţii de nume şi în acelaşi timp, erorile produse la nivelul unui nod de nume nu împiedică nodul de date de a lucra cu alte noduri de nume din cluster. Un spaţiu de nume împreună cu federaţia sa de blocuri poartă denumirea de volum al spaţiului de nume, acesta fiind o unitate de gestiune autonomă. Atunci când un nod (spaţiu) de nume este şters, şi federaţia sa de blocuri de la nivelul nodurilor de date este ştearsă. Actualizările de la nivelul volumului unui spaţiu de nume se realizează unitar, atunci când se produce actualizarea unui cluster. Toate nodurile din cadrul unui cluster partajează un identificator al clusterului, generat în mod automat la formatarea unui nod de nume. Un astfel de identificator ar trebui să fie folosit şi pentru formatarea altor noduri de nume în cadrul cluster-ului respectiv.

    Beneficiile acestei îmbunătăţiri includ scalabilitatea spaţiului de nume (spaţiul de stocare al clusterului HDFS este caracterizat prin scalabilitate, astfel încât proiectele de dimensiuni mari sau cele care utilizează numeroase fişiere de dimensiuni mai mici pot beneficia de scalabilitatea spaţiului de nume prin adăugarea mai multor noduri de nume la cluster), performanţa (îmbunătăţirea ratei de transfer pentru operaţiile de citire / scriere în cadrul sistemului de fişiere prin adăugarea mai multor noduri de nume în cadrul clusterului) şi izolarea (categorii diferite de aplicaţii şi utilizatori pot fi izolate în cadrul unor spaţii de nume diferite). Într-o arhitectură care foloseşte un singur spaţiu de nume, într-un mediu concurent, o aplicaţie poate suprasolicita spaţiu de nume astfel încât performanţele celorlalte aplicaţii ce utilizează acelaşi spaţiu de nume vor fi afectate.
  • implementarea MapReduce NextGen (YARN, MRv2) prin care se împart funcţionalităţile modulului de monitorizare a sarcinilor (eng. Job Tracker) – gestiunea resurselor şi planificarea sarcinilor în procese separate; în acest mod, se creează un modul global de gestiune al resurselor (Resource Manager) şi un modul de bază al aplicaţiei (Application Master) pentru fiecare aplicaţie în parte. O aplicaţie este formată fie dintr-o singură sarcină (din punctul de vedere al sarcinilor de tip MapReduce) sau conţine un graf aciclic de sarcini. Modulul de gestiune al resurselor împreună cu modulul de gestiune al nodurilor (Node Manager) formează cadrul de calcul al sarcinilor. Modulul de gestiune al resurselor este autoritatea care arbitrează modul de distribuire al resurselor pentru aplicaţiile din sistem. Modulul de gestiune pentru aplicaţii este de fapt o bibliotecă ce negociază resursele primite de la modulul de gestiune al resurselor colaborând cu modulele de gestiune a nodurilor pentru a executa şi pentru a monitoriza sarcinile.



    Modulul pentru gestiunea resurselor cuprinde două componente de bază: modulul de planificare şi modulul de gestiune al aplicaţiilor.

    Modulul de planificare este responsabil pentru alocarea resurselor către diferite aplicaţii, caracterizate prin constrângeri referitoare la capacitate, cozi. Acesta nu oferă însă nici o garanţie în privinţa repornirii sarcinilor eşuate datorită erorilor produse la nivel hardware sau software şi nici nu se ocupă cu monitorizarea stării aplicaţiilor. Planificarea se realizează în funcţie de cerinţele pentru resurse ale aplicaţiilor, specificate de noţiunea abstractă container care cuprinde elemente ca memorie, procesor, disc, reţea. În versiunile anterioare ale Hadoop, singurul criteriu luat în considerare de planificator era reprezentat de memorie. Modulul de planificare suportă extensii programabile (precum CapacityScheduler sau FairScheduler) responsabile cu partiţionarea clusterului între diferite aplicaţii. Extensia programabilă CapacityScheduler suportă cozi ierarhice pentru a permite partajarea resurselor clusterului în funcţie de previziuni.

    Modulul de gestiune al aplicaţiilor se ocupă cu acceptarea sarcinilor transmise, negociind container-ul pentru executarea modulului de bază al aplicaţiei, oferind servicii pentru repornirea container-ului acestuia în situaţia în care s-au produs erori.

    Modulul de gestiune al nodurilor este un cadru implementat pe fiecare maşină în parte, responsabil pentru containere, monitorizându-le utilizarea resurselor (procesor, memorie, disc, reţea) şi raportând aceste valori la modulul de gestiune al resurselor şi la modulul de planificare.

    Modulul de bază al aplicaţiei are rolul de a negocia containerele de resurse corespunzătoare de la modulul de planificare, monitorizându-le starea.
MapReduce v2 menţine compatibilitatea cu versiunile anterioare, astfel încât acestea pot rula şi în noul cadru de execuţie după ce sunt recompilate.

Funcţionalităţile implementate de diferitele versiuni Hadoop, cu privire la HDFS şi MapReduce pot fi sintetizate astfel:

Funcţionalitate 1.2.x 0.23.x 2.6.x
autentificare sigură da nu da
nume de configurare vechi da învechite învechite
nume de configurare noi nu da da
API MapReduce vechi da da da
API MapReduce nou da (biblioteci lipsă) da da
cadru de rulare MapReduce 1 (clasic) da da nu
cadru de rulare MapReduce 2 (YARN) nu nu da
federaţii HDFS nu nu da
nivel de disponibilitate HDFS ridicat nu nu da

Decizia de a utiliza Hadoop împreună cu HDFS şi MapReduce trebuie să se bazeze pe adecvarea unei anumite probleme la acest framework. Astfel, aplicaţiile dezvoltate folosind această tehnologie implică procesarea unui volum foarte mare de date care poate fi realizată în paralel, ceea ce presupune independenţa acestora unele de altele. Situaţia în care datele sunt dependente unele de altele poate fi rezolvată prin iterare (rularea mai multor sarcini MapReduce, în care datele de ieşire ale unora reprezintă datele de intrare pentru celelalte) sau prin partajare (folosind o bază de date distribuită şi nu variabile distribuite întrucât acestea există doar în contextul unei singure maşini virtuale, neputând fi accesate şi din contextul altor noduri sau clustere).

Configurarea Mediului de Lucru

Paradigma de Programare Map Reduce

MapReduce reprezintă un model de progamare destinat procesării de date pe un număr foarte mare de noduri, reprezentate de maşini disponibile în comerţ, fără performanţe deosebite (eng. commodity hardware). Este inspirat din programarea funcţională de unde sunt preluate funcţiile map şi reduce, putând fi implementat în limbaje de programare ca Java, C++, Python sau Ruby. Un astfel de model este util mai ales pentru prelucrarea unor seturi de date (semistructurate şi orientate pe înregistrări) de dimensiuni foarte mari, în care procesarea paralelă este singura soluţie pentru obţinerea unor rezultate într-un interval de timp acceptabil.

MapReduce se bazează pe împărţirea procesării în 2 etape: map şi reduce, fiecare primind ca date de intrare o pereche cheie-valoare (al căror tipuri poate fi stabilită de programator) şi întorcând ca rezultat tot o pereche cheie-valoare.

Alternativ, se mai poate defini şi o funcţie de combinare (fiind de fapt o implementare a funcţiei de reducere), cu excepţia faptului că tipurile de intrare sunt aceleaşi ca ale funcţiei de reducere iar tipurile de ieşire sunt aceleaşi ca ale funcţiei de mapare.

combine: (K2, list(V2)) → list(K2, V2)

De cele mai multe ori, funcţiile de combinare şi reducere se confundă.

Valorile care sunt procesate de programul MapReduce sunt partiţionate în funcţie de cheile care le caracterizează şi distribuite nodurilor care aplică funcţia de mapare, în urma căreia se generează o listă (intermediară) de valori, fiecare dintre ele având asociată şi o cheie. Acestea sunt sortate şi grupate în funcţie de cheie (toate valorile care au aceeaşi cheie sunt concatenate într-o singură listă), astfel încât funcţia de reducere primeşte mai puţine perechi cheie-valoare (pentru fiecare cheie unică există o listă de valori generate anterior) obţinându-se o listă de rezultate compusă din perechi chei-valoare.

map: (K1, V1) → list(K2, V2)

reduce: (K2, list(V2)) → list(K3, V3)

Gestiunea Sarcinilor Map/Reduce în Implementări ale Framework-ului Hadoop

Framework-ul Hadoop plasează sarcinile (în urma partiţionării) pe nodurile care găzduiesc segmentele de date care trebuie să fie procesate de acea sarcină specifică (codul este mutat acolo unde se găsesc datele).

Fluxurile de intrare şi ieşire sunt coordonate de framework-ul Hadoop, acesta realizând şi sincronizarea (prin intermediul unei bariere) înainte de procesul de sortare şi grupare prin care datele intermediare sunt distribuite către alte maşini.

Erorile sunt considerate un eveniment probabil să se întâmple, astfel că sarcinile care se găsesc într-o astfel de situaţie vor fi executate din nou, însă pe alte maşini.

Execuţia unei sarcini MapReduce se realizează prin apelul metodei submit() pe un obiect de tip Job care abstractizează procesarea ce se doreşte a fi realizată.

Alternativ, poate fi apelată metoda waitForJobCompetion() care transmite sarcina dacă aceasta nu a fost transmisă deja, aşteptând să fie terminată.

Implementarea aplicaţiei utilizând această paradigmă de programare depinde de parametrii de configurare specificaţi: mapred.job.tracker este folosit pentru versiunile Hadoop mai vechi, folosind un mecanism MapReduce “clasic”, în timp ce mapreduce.framework.name reprezintă proprietatea utilizată pentru versiunile Hadoop mai noi, care implementează YARN.

Valorile posibile pentru proprietatea mapreduce.framework.name sunt:
  • local (în caz că se doreşte ca sarcina să fie rulată local, în cadrul unei singure maşini virtuale Java);
  • classic (pentru implementarea MapReduce “clasică”, folosind procesele JobTracker şi TaskTrackers);
  • yarn (pentru implmentarea ce utilizează YARN).

În cazul implementării MapReduce “clasice” sunt implicate mai multe entităţi:

  • clientul, care transmite sarcina de tip MapReduce;
  • un proces pentru monitorizarea sarcinii (eng. Job Tracker) care coordonează rularea acesteia;
Procesul de monitorizare a sarcinii este o aplicaţie Java a cărei clasă principală este JobTracker.
  • mai multe procese pentru monitorizarea părţilor în care a fost împărţită sarcina (eng. Task Tracker);
Procesul de monitorizare a unei părţi a sarcinii este tot o aplicaţie Java a cărei clasă principală este TaskTracker.
  • sistemul distribuit de fişiere (de obicei HDFS), utilizat pentru partajarea fişierelor între aceste entităţi.

Etapele urmate la crearea unei sarcini de tip MapReduce sunt următoarele:

1. metoda submit() apelată pe obiectul de tip Job creează o instanţă a unui obiect de tip JobSummiter pentru care apelează metoda submitJobInternal(); ulterior, metoda waitForJobCompletion() verifică progresul sarcinii o dată pe secundă, raportând starea acesteia (în situaţia în care s-a modificat de la ultimul raport); când sarcina este încheiată cu succes se afişează statisticile sarcinii, altfel este afişată eroarea care a determinat eşecul sarcinii.

2. procesul de transmitere al sarcinii (reprezentat de obiectul instanţă al clasei JobSummitter) solicită procesului de urmărire al sarcinii (JobTracker) un nou identificator pentru sarcina respectivă (prin apelul metodei getNewJobId()); ulterior, sunt verificate specificaţiile privind datele de ieşire şi datele de intrare pentru sarcina în cauză; dacă directorul unde trebuie plasate datele de ieşire nu a fost specificat sau există deja, respectiv nu pot fi determinate partiţiile pentru datele de intrare (datorită faptului că acestea nu au fost specificate sau nu există) sarcina nu este transmisă, generându-se în schimb o eroare; dacă specificaţiile privind operaţiile de intrare/ieşire sunt corecte, sunt determinate partiţiile care vor fi prelucrate de fiecare parte a sarcinii;

3. procesul de transmitere al sarcinii copiază din sistemul distribuit de fişiere resursele necesare pentru rularea sarcinii (inclusiv fişierul .jar care conţine codul corespunzător sarcinii, fişierul de configurare şi partiţiile datelor de intrare) în sistemul de fişiere al procesului de urmărire al sarcinii, într-un director având numele dat de identificatorul sarcinii;

Arhiva .jar conţinând codul corespunzător sarcinii este copiat folosind un factor de replicare foarte mare (indicat de proprietatea mapred.submit.replication a cărei valoare implicită este 10), astfel încât vor exista suficient de multe copii în cadrul clusterului ce vor putea fi accesate de procesele ce prelucrează porţiuni ale sarcinii atunci când se află în execuţie.

4. procesul de transmitere al sarcinii informează procesul de urmărire a sarcinii de faptul că aceasta poate fi executată, prin apelul metodei submitJob() pe obiectul instanţă a clasei JobTracker;

5. atunci când obiectul instanţă a clasei JobTracker primeşte un apel al metodei submitJob(), pune sarcina respectivă într-o coadă internă de unde îl va prelua planificatorul de sarcini pentru a-l iniţializa; procesul de iniţializare presupune crearea unui obiect reprezentând sarcina care va fi executată, conţinând partiţiile care vor fi rulate pe noduri diferite, precum şi informaţii referitoare la progresul acestora;

6. pentru a crea lista partiţiilor sarcinii, planificatorul de sarcini primeşte datele de intrare, defalcate pe secţiuni, aşa cum au fost împărţite de client în urma primirii acestora de la sistemul distribuit de fişiere; pentru fiecare secţiune este creată o partiţie a sarcinii (operaţie de tip map), în timp ce numărul de operaţii de tip reduce este determinat din proprietatea mapred.reduce.task a obiectului Job (fiind stabilit de metoda setNumReduceTasks()) creându-se un număr corespunzător de procese care le vor executa; fiecare partiţie a sarcinii (de tip map sau reduce) va putea fi urmărită printr-un identificator unic; în plus, vor mai fi create încă două procese, pentru configurarea (eng. setup), respectiv eliberarea resurselor (eng. cleanup) asociate partiţiei sarcinii, ele fiind rulate înainte, respectiv după ce codul asociat partiţiei sarcinii este executat; pentru fiecare sarcină există un obiect de tip OutputCommitter ce determină codul corespunzător fiecărei sarcini în parte;

În mod implicit, obiect OutputCommitter are tipul FileOutputCommitter; în cazul procesului de configurare, acesta va crea directorul în care vor fi plasate datele de ieşire precum şi directoarele de lucru (temporare), în timp ce pentru procesul de eliberare al resurselor, acesta va şterge directoarele de lucru (temporare), transmiţând în acelaşi timp rezultatul sarcinii, ceea ce pentru procesele care realizează operaţii pe fişiere se traduce în scrierea datelor de ieşire la locaţia corespunzătoare sarcinii respective (în cazul în care este activată execuţia speculativă, se asigură faptul că vor fi transmise numai rezultatele provenind de la sarcinile duplicate, celelalte fiind ignorate).

7. procesele de urmărire a partiţiilor în care este împărţită o sarcină rulează o buclă care transmite mesaje periodice (eng. heartbeat) procesului ce urmăreşte sarcina prin care acesta este anunţat asupra stării sale de funcţionare; informaţiile cuprinse în aceste mesaje sunt disponibilitatea de a rula o nouă partiţie a sarcinii, caz în care, ca răspuns la acest mesaj, procesul de urmărire a sarcinii îi va aloca una, după o selecţie corespunzătoare, potrivit algoritmului de planificare pe care îl implementează; iniţial este selectată sarcina de unde va fi aleasă o partiţie pentru a fi repartizată procesului de monitorizare corespunzător, algoritmul implicit bazându-se pe asocierea unor priorităţi pentru fiecare sarcină în parte, de unde vor fi alese apoi partiţiile; fiecare proces pentru monitorizarea partiţiei unei sarcini dispune de un număr fix de sarcini de tip map respectiv de tip reduce pe care le pot rula simultan; în distribuirea sarcinilor de tip map se are în vedere localizarea (în reţea) nodului pe care rulează procesul ce urmează să realizeze procesarea, alegându-se unele pentru care setul de date se găseşte cât mai aproape; o astfel de constrângere nu este luată în calcul pentru sarcinile de tip reduce;

Poziţiile corespunzătoare sarcinilor de tip map vor fi completate înaintea poziţiilor aferente sarcinilor de tip reduce (cu alte cuvinte, pentru un proces de urmărire a partiţiilor unei sarcini, atâta timp cât există poziţii libere pentru sarcini de tip map, nu vor fi completate poziţiile alocate sarcinilor de tip reduce).
Valoarea numărului de sarcini care pot fi rulate în paralel este dată de capacitatea de procesare a maşinii pe care rulează procesul respectiv cât şi de memoria respectivă.
Ideal este ca datele să se găsească cât mai aproape de nodul care urmează să realizeze procesarea, pe acelaşi maşină sau pe acelaşi rastel. Nu întotdeauna o astfel de constrângere poate fi satisfăcută, într-o astfel de situaţie înregistrându-se o creştere a traficului utilizat în reţeaua de calculatoare.

8. după ce procesului de monitorizare a partiţiei sarcinii i-a fost alocată sarcina pe care trebuie să o ruleze, acesta îşi va copia arhiva .jar conţinând codul care va fi executat din contextul sistemului distribuit de fişiere în sistemul de fişiere local precum şi alte fişiere necesare din zona de memorie tampon distribuită; ulterior, creează un director temporar pentru sarcină, dezarhivând conţinutul fişierului .jar, acesta fiind rulat în contextul unui obiect de tipul TaskRunner, instanţiat în acest scop;

9. obiectul de tipul TaskRunner lansează o maşină virtuală Java nouă;

10. fiecare sarcină va fi rulată în contextul acestei maşini virtuale Java, astfel că orice erori sau funcţiile de tip map/reduce definite de utilizator nu vor afecta procesul de monitorizare a partiţiei sarcinii, implicând comportamente nedorite (încetarea funcţionării); fiecare partiţie a sarcinii comunică cu procesul părinte prin interfaţa ombilicală, informându-l de progresul său la fiecare câteva secunde până când aceasta este terminată.

Există posibilitatea ca aceeași maşină virtuală Java să fie refolosită între mai multe partiţii ale sarcinii.

Întrucât sarcinile de tip MapReduce durează o perioadă mai mare de timp, este important ca utilizator să aibă informaţii cu privire la progresul acestora. Astfel, fiecare sarcină şi partiţiile sale sunt caracterizate printr-o stare (în execuţie, terminată cu succes, eşuată din cauza unei erori), progresul operaţiilor de tip map şi reduce, valorile contoarelor asociate sarcinii în cauză precum şi un mesaj (descriere) asociat stării respective. Progresul, adică proporţia din sarcină care a fost executată deja (dacă poate fi determinată), este comunicată periodic clientului.

Un progres înregistrat de o partiţie a sarcinii este transmisă procesului care o monitorizează prin intermediul unui semafor (eng. flag). Acesta e verificat de un fir de execuţie dedicat la fiecare trei secunde şi în condiţiile în care valoarea sa o impune, procesul de monitorizare a partiţiei sarcinii este informat de progresul realizat. De asemenea, în mesajele transmise către procesul de monitorizare al sarcinii sunt transmise şi date cu privire la progresul înregistrat de toate partiţiile sarcinii monitorizate în mod curent.

De regulă, astfel de mesaje de semnalizare sunt transmise o dată la fiecare cinci secunde, însă este posibil ca pentru clustere de dimensiuni mai mari, această perioadă să fie mai mare de vreme ce mesajele ocupă lăţime de bandă în cadrul reţelei de calculatoare care conectează nodurile.

Valorile referitoare la progres sunt agregate de procesul de monitorizare al sarcinii, rezultând o viziune de ansamblu asupra tuturor sarcinilor rulate la un anumit moment de timp şi asupra partiţiilor constituente. Ulterior obiectul de tip Job primeşte cel mai recent statut interogând procesul de monitorizare al sarcinii în fiecare secundă.

Clienţii pot folosi și metoda getStatus() a obiectului de tip Job, obţinând o instanţă a unui obiect de tip JobStatus, care conţine toate informaţiile cu privire la starea sarcinii.

Când procesul de monitorizare a sarcinii primeşte notificări provenind de la toate partiţiile sarcinii cu privire la încheierea acestora (inclusiv sarcina pentru eliberarea resurselor), va modifica statutul asociat acesteia ca fiind “terminată cu succes”, acesta fiind transmis şi obiectului de tip Job, care informează la rândul său utilizatorul cu privire la acest fapt, părăsind metoda waitForCompletion(). Toate statisticile şi contoarele cu privire la sarcină sunt afişate la consolă în acest moment. La final, procesul de monitorizare a sarcinii eliberează toate resursele utilizate temporar instruind şi procesele care gestionează partiţiile sarcinii să acţioneze în acelaşi sens.

Procesul de monitorizare a sarcinii poate să transmită o notificare cu privire la starea acesteia şi prin HTTP, în situaţia în care este configurată proprietatea job.end.notification.url de către clienţii care doresc să primească astfel de informaţii prin intermediul unor apeluri inverse (eng. callback).

Întrucât pentru clustere de dimensiuni foarte mari (peste 4000 de noduri) sistemul MapReduce “clasic” are probleme de scalabilitate, a fost implementat mecanismul YARN (Yet Another Resource Negociator / YARN Application Resource Negociator) care rezolvă astfel de inconveniente, separând responsabilităţile procesului de monitorizare a sarcinilor (planificarea sarcinii cât şi monitorizarea progresului realizat de partiţiile asociate acesteia) în două procese independente:

  • un proces pentru gestiunea resurselor în cadrul clusterului;
  • un proces pentru gestiunea aplicaţiilor pe parcursul ciclului lor de viaţă.
Pentru fiecare instanţă a unei aplicaţii există un proces de gestiune dedicat, care rulează atâta vreme cât aceasta se află în execuţie.

Aceste procese negociază căror noduri (containere) le vor fi alocate anumite programe, respectându-se constrângerile cu privire la capacitatea lor de procesare şi referitoare la memoria disponibilă. Verificarea acestor restricţii este realizată de procesul de gestiune al nodului, ce asigură faptul că nu sunt utilizate mai multe resurse decât au fost alocate.

Planificarea sarcinii se referă la asocierea partiţiilor asociate acestora unor anumite procese folosind nişte mecanisme de prioritate.
Monitorizarea progresului realizat de partiţiile asociate unei sarcini implică păstrarea evidenţei acestora, cu repornirea sarcinilor care nu au generat erori sau a celor care progresează într-un ritm necorespunzător şi determinarea statisticilor cu privire la progresul lor, atâta timp cât se află în execuţie.

YARN este mai generic decât MapReduce, aceasta reprezentând doar un tip de aplicaţie ce poate fi rulată folosind acest mecanism, flexibilitatea sa constând în coexistenţa (în cadrul aceluiaşi cluster) al unor programe având tipuri diferite (de exemplu MPI sau un shell distribuit), ceea ce implică beneficii pentru gestiunea şi utilizarea resurselor respective. De asemenea, utilizatorii pot rula diferite versiuni ale MapReduce, astfel că actualizarea de la o versiune la alta se poate realiza foarte uşor.

Entităţile implicate la rularea unei aplicaţii de tip MapReduce folosind YARN sunt:

  • clientul, care transmite sarcina de tip MapReduce;
  • procesul pentru gestiunea resurselor (eng. Resource Manager) care coordonează alocarea resurselor de procesare pe cluster;
  • procesele pentru gestiunea nodurilor (eng. Node Manager) care lansează în execuţie şi monitorizează containerele în cadrul maşinilor din cluster;
  • procesul de gestiune a aplicaţiilor (eng. Application Master) care va coordona partiţiile sarcinilor;
Atât procesul de gestiune a aplicaţiilor cât şi partiţiile sarcinii MapReduce rulează în cadrul unor containere care sunt alocate de procesul de gestiune al resurselor, fiind monitorizate de procesele de gestiune a nodurilor.
  • sistemul distribuit de fişiere (de obicei HDFS), utilizat pentru partajarea fişierelor între aceste entităţi.

Etapele urmate la crearea unei sarcini de tip MapReduce în cadrul YARN sunt următoarele:

1. transmiterea sarcinii se realizează folosind acelaşi API ca în cazul mecanismului MapReduce clasic, deşi există şi o implementare a ClientProtocol care este activată atunci când proprietatea mapreduce.framework.name are valoarea yarn în care procesul de transmitere a sarcinii este destul de similar cu cel al mecanismului MapReduce clasic;

2. identificatorul sarcinii este transmis de la procesul de gestiune a resurselor (faţă de cazul mecanismului MapReduce clasic în care acesta era primit de la procesul de monitorizare a sarcinii);

În terminologia mecanismului YARN, pentru o sarcină se foloseşte conceptul de aplicaţie.

3. clientul verifică specificaţiile sarcinii în privinţa locaţiei unde urmează să fie plasate rezultatele şi calculează partiţiile datelor de intrare, copiind resursele specifice sarcinii (fişierul .jar care conţine codul corespunzător sarcinii, fişierul de configurare dar şi partiţiile datelor de intrare) în sistemul distribuit de fişiere HDFS;

Opţiunea yarn.app.mapreduce.am.compute-splits-in-cluster face ca partiţiile datelor de intrare să fie calculate în cadrul clusterului, ceea ce este util mai ales în cazul sarcinilor care au un număr mare de partiţii ale datelor de intrare.

4. sarcina este transmisă prin apelul metodei submitApplication() în contextul procesului de gestiune a resurelor;

5. când procesul de gestiune al resurselor primeşte acest apel, transmite mai departe această cerere către procesul de planificare al sarcinilor care îi alocă un container astfel încât procesul specific aplicaţiei va fi lansat (de către procesul de gestiune al resurselor) în acest context, fiind supervizat de către procesul de gestiune a nodului respectiv;

6. procesul specific al aplicaţiei pentru sarcini de tip MapReduce este de fapt un program Java având clasa principală MRAppMaster; acesta iniţializează sarcina în cauză creând un număr de obiecte care vor monitoriza progresul sarcinii, primind rapoarte în acest sens de la partiţiile în care va fi împărţită sarcina;

7. procesul specific al aplicaţiei preia partiţiile datelor de intrare determinate de client din cadrul sistemului distribuit de fişiere, creând apoi o partiţie a sarcinii (aplicaţie de tip map) pentru fiecare dintre acestea dar şi un număr de procese de tip reduce (conform proprietăţii mapreduce.job.reduce); ulterior, procesul specific al aplicaţiei decide modul în care va rula partiţiile care constituie sarcina de tip MapReduce: dacă aceasta este de dimensiuni mici, partiţiile vor fi rulate în aceeaşi maşină virtuală Java ca şi el însuşi - o astfel de sarcină se spune că rulează ca o super-partiţie; o sarcină este considerată ca fiind de dimensiuni mici dacă poate fi partiţionată în mai puţin de 10 procese de tip map şi un singur proces de tip reduce, iar dimensiunea fişierului de intrare este mai mică decât un bloc HDFS; înainte ca orice partiţie a sarcinii să fie rulată, trebuie apelată metoda de configurare (o instanţă a obiectului de tip OutputCommitter corespunzător sarcinii) pentru a se crea directorul în care vor fi plasate rezultatele acesteia;

Decizia cu privire la rularea partițiilor sarcinii în aceeași mașină virtuală cu procesul specific al aplicației este luată ţinându-se cont de faptul că supraîncărcarea generată de alocarea şi rularea sarcinilor în alte containere depăşeşte beneficiul obţinut din rularea acestora în paralel (comparativ cu rularea lor secvenţială, în cadrul aceluiaşi nod). Un astfel de comportament este diferit de mecanismul MapReduce clasic, în care sarcinile de dimensiuni mici nu sunt rulate niciodată în contextul unui singur proces de monitorizare a partiţiilor sarcinii.
Posibilitatea de a rula o sarcină ca o super-partiție poate fi dezactivată prin proprietatea mapreduce.job.ubertask.enable care va avea valoarea false.
Caracteristicile unei sarcini de dimensiuni mici pot fi specificate prin intermediul proprietăţilor:
  • mapreduce.job.ubertask.maxmaps, respectiv mapreduce.job.ubertask.maxreduces, care indică numărul de procese;
  • mapreduce.job.ubertask.maxbytes, care specifică dimensiunea unui bloc.
Spre diferenţă de implementarea MapReduce clasică în care metoda de configurare este apelată într-un proces special rulat de procesul de monitorizare a partiţiilor sarcinilor, în cazul implementării YARN, aceasta este apelată direct de către procesul specific al aplicaţiei.

8. în cazul în care sarcina nu întruneşte condiţiile pentru a rula ca super-partiţie, procesul specific al aplicaţiei solicită containere pentru toate procesele de tip map şi reduce din cadrul sarcinii de la modulul de gestiune a resurselor; cererile sunt incluse în cadrul rapoartelor periodice, conţinând informaţii cu privire la locaţia unde se găsesc informaţiile corespunzătoare fiecărei partiţii de tip map (nodurile şi rastelurile pe care se găsesc acestea), acestea fiind folosite de procesul de planificare în luarea deciziilor, încercându-se plasarea sarcinilor aferente în cadrul aceleiaşi maşini ca şi datele sau, în situaţia în care nu este posibil, cel puţin pe acelaşi rastel;

În cadrul cererilor pentru containere se specifică de asemenea informaţii cu privire la memoria necesară; implicit, atât proceselor de tip map cât şi proceselor de tip reduce li se alocă 1024 MB de memorie, însă această valoare poate fi configurată prin intermediul proprietăţilor mapreduce.map.memory.mb, respectiv mapreduce.reduce.memory.mb. Modul în care este alocată memoria este diferit de mecanismul MapReduce tradițional, în care procesele de monitorizare a partiţiilor sarcinii dispuneau de un număr fix de sarcini de tip map sau de tip reduce pe care le puteau rula în paralel, stabilit la momentul configurării clusterului; fiecare sarcină putea dispune de o valoare maximă a memoriei (de asemenea având o valoare fixă pentru un cluster), existând probleme atât dacă aceasta este subutilizată (întrucât alte partiţii ale sarcinii, aflate în aşteptare, nu pot utiliza memoria nefolosită), cât şi dacă este suprautilizată (conducând la eşuarea sarcinii, de vreme ce aceasta nu dispune de memorie suficientă pentru a putea fi terminată). O astfel de problemă este rezolvată în YARN prin faptul că aplicaţiile pot solicita o anumită valoare a memoriei, ce se află între o valoare minimă şi o valoare maximă (cu condiţia de a fi un multiplu al valorii minime care poate fi alocată). Fiecare proces de planificare are valori implicite de alocare a memoriei, valoarea minimă fiind de 1024 MB (stabilită de yarn.scheduler.capacity.minimum-allocation-mb), iar valorea maximă de 10240 MB (stabilită de yarn.scheduler.capacity.maximum-allocation-mb). Astfel, partiţiile sarcinilor pot solicita orice alocare de memorie cuprinsă între 1GB şi 10GB, însă ca multiplu de 1GB (procesul de planificare a sarcinii va realiza rotunjiri la cea mai apropiată valoare, dacă este necesar).

9. după ce unei partiţii a sarcinii i-a fost alocat un container de către procesul de planificare a modulului de gestiune a resurselor, procesul specific al aplicaţiei porneşte acest container, contactând procesul de gestiune a nodului;

10. partiţia sarcinii este executată de o aplicaţie Java a cărei clasă principală este YarnChild; înainte de a rula partiţia sarcinii, va localiza resursele de care aceasta are nevoie (inclusiv configuraţia sarcinii şi arhiva .jar conţinând codul propriu-zis al sarcinii precum şi alte fişiere din cadrul zonei tampon de memorie aflate în cadrul sistemului distribuit de fişiere);

Aplicaţia YarnChild rulează într-o maşină virtuală Java dedicată, din acelaşi motiv pentru care în cazul mecanismului MapReduce “clasic” procesele de monitorizare a partiţiei sarcinii creează noi maşini virtuale Java pentru a rula aplicaţiile corespunzătoare – izolarea codului utilizator de firele de execuţie ale sistemului de operare care rulează în fundal, având asociate sarcini care durează foarte mult. Spre diferenţă de implementarea MapReduce “clasică”, YARN nu suportă reutilizarea maşinilor virtuale Java, astfel încât fiecare partiţie a sarcinii va rula într-o nouă maşină virtuală Java.

11. containerul rulează partiţia sarcinii (de tip map sau reduce).

În cazul Yarn, partiţia sarcinii îşi raportează starea şi progresul (inclusiv valoarea contoarelor) la procesul specific al aplicaţiei, care dispune astfel de o viziune de asamblu a sarcinii.

Astfel de rapoarte sunt transmise la fiecare trei secunde, prin intermediul interfeţei ombilicale. Procesul este mai simplu decât în cazul implementării MapReduce “clasice” în care actualizările cu privire la progres sunt transmise prin procesele de monitorizare a partiţiilor sarcinilor către procesul de monitorizare a sarcinii care deţine viziunea de ansamblu asupra acesteia.

La rândul său, procesul specific al aplicației e interogat de client la fiecare secundă (sau în intervalul stabilit de proprietatea mapreduce.client.progressmonitor.pollinterval). Totodată, interfaţa grafică a procesului de gestiune a resurselor afişează toate aplicaţiile care se află în execuţie, împreună cu legături către interfeţele grafice ale proceselor specifice aplicaţiilor respective, care conţin detalii despre sarcina MapReduce, inclusiv progresul acesteia.

În cazul implementării MapReduce “clasice”, lista sarcinilor aflate în execuţie precum şi progresul lor putea fi consultat prin intermediul interfeţei grafice puse la dispoziţie de procesul de monitorizare a sarcinilor.

În acelaşi mod în care clientul interoghează procesul specific al aplicaţiei pentru a determina progresul acesteia, acesta verifică (la fiecare cinci secunde) dacă sarcina a fost terminată, printr-un apel al metodei waitForCompletion() pe obiectul de tip Job. Există posibilitatea ca notificarea cu privire la terminare să se realizeze prin apeluri inverse folosind protocolul HTTP, însă acestea sunt iniţiate de procesul specific al aplicaţiei.

Intervalul la care este verificată terminarea sarcinii poate fi stabilit prin intermediul proprietăţii mapreduce.client.completion.pollinterval, în fişierul de configurare.

La terminarea sarcinii, procesul specific al aplicaţiei şi containerele în care sunt rulate partiţiile acesteia eliberează resursele utilizate, printr-un apel al metodei corespunzătoare din cadrul obiectului OutputCommitter. Informaţiile cu privire la sarcină sunt arhivate de către serverul ce reţine istoricul acestora, pentru a permite interogări ulterioare din partea clienţilor, în caz de nevoie.

Tratarea Erorilor

Unul dintre principalele avantaje ale Hadoop constă în posibilitatea de recuperare din eroare, situaţie întâlnită cu o frecvenţă destul de mare şi datorată greşelilor din codul sursă al utilizatorilor, blocajelor în cadrul proceselor şi defectării maşinilor pe care rulează acestea.

În cazul implementării MapReduce “clasice” defectele se pot produce la nivelul:

  1. unei partiţii a sarcinii;
  2. procesului ce o monitorizează;
  3. procesului care monitorizează sarcina la nivel global.

a) Cel mai frecvent, erorile de la nivelul unei partiţii a sarcinii se datorează greşelilor din codul sursă, generându-se o excepţie la rulare care este raportată procesului care o monitorizează, aceasta fiind înregistrată şi în cadrul jurnalelor. Procesul de monitorizare a partiţiei sarcinii marchează această încercare ca fiind eşuată, eliberând poziţia ocupată de aceasta pentru ca în cadrul acesteia să fie rulată o altă partiţie a sarcinii. În cazul în care maşina virtuală Java îşi încetează activitatea brusc (datorită unor probleme ale acesteia pentru setul de date al partiţiei respective a sarcinii), procesul de monitorizare va detecta faptul că partiţia sarcinii nu mai rulează şi marchează încercarea de execuţie ca fiind eşuată. În situaţia în care partiţiile sarcinilor sunt suspendate în execuţie, procesul de monitorizare detectează că nu a primit o actualizare a progresului pentru o perioadă de timp mai îndelungată, declanşând procesul de marcare a încercării de rulare a partiţiei sarcinii ca eşuată şi determinând închiderea maşinii virtuale Java care o rula.

Perioada de timp după care partiţiile sarcinilor sunt considerate eşuate e de obicei 10 minute, putând fi configurată (pentru fiecare sarcină în parte sau pentru fiecare cluster în parte) prin intermediul proprietăţii mapred.task.timeout a cărei valoare este exprimată în milisecunde. Indicarea unei valori nule pentru această proprietate dezactivează mecanismul de expirare, astfel că sarcinile a căror execuţie durează mai mult nu vor fi marcate niciodată ca fiind eşuate. Într-un astfel de caz, o partiţie a sarcinii suspendată în execuţie nu îşi va elibera niciodată poziţia, existând posibilitatea de a se înregistra încetinerea globală a cluster-ului. De aceea, acest comportament trebuie evitat, asigurându-se că partiţiile sarcinilor îşi raportează progresul.

Când procesul de monitorizare a partiţiei sarcinii este notificat de eşecul unei încercări (prin intermediul rapoartelor periodice), va replanifica execuţia ei, încercându-se să se evite situaţia în care rularea va fi realizată în contexul unui proces de monitorizare în care a eşuat anterior. Mai mult, în cazul în care o partiţie a sarcinii eşuează de mai mult de patru ori, ea nu va fi reexecutată. Implicit, dacă orice partiţie a sarcinii eşuează de mai mult de patru ori (sau de valoarea maximă configurată pentru numărul de încercări), întreaga sarcină va eşua. Pentru unele aplicaţii, nu este de dorit ca sacina să eşueze din cauza unor partiţii ale sale care au înregistrat erori, întrucât este posibil ca rezultatele furnizate de partiţiile terminate cu succes să poată fi folosite, motiv pentru care se poate configura proporția de partiţii ale sarcinii care pot să eşueze fără a declanşa eşuarea sarcinii. O partiţie a sarcinii poate fi oprită în mod forţat, însă astfel de încercări nu sunt contorizate în numărul maxim de lansare în execuţie, de vreme ce eroarea nu s-a produs din cauza partiţiei sarcinii. Şi utilizatorii au posibilitatea de a termina încercări ale partiţiilor sarcinilor (sau chiar sarcina în totalitate) folosind interfaţa grafică sau linia de comandă.

Numărul maxim de încercări de a executa o sarcină este dat de proprietatea mapred.map.max.attempts pentru partiţii ale sarcinii de tip map, respectiv de proprietatea mapred.reduce.max.attempts pentru partiţii ale sarcinii de tip reduce.
Procentul de partiţii de tip map respectiv de tip reduce care pot eșua sunt controlate independent, folosind proprietăţile mapred.max.map.failures.percent, respectiv mapred.max.reduce.failures.percent.
O încercare a partiţiei sarcinii poate fi oprită datorită faptului că este un duplicat speculativ sau datorită faptului că procesul său de monitorizare a eşuat, astfel încât procesul de monitorizare a sarcinilor va termina în mod forţat toate partiţiile ce rulează în contextul său.

b) Erorile produse la nivelul procesului de monitorizare a partiţiei sarcinii reprezintă altă situaţie care determină eşecul sarcinii. Într-un astfel de caz, procesul de monitorizare a sarcinii va detecta că acesta nu mai funcţionează dacă mesajele periodice întârzie de a fi transmise mai mult de 10 minute, eliminându-l din lista sa, planificând partiţiile sarcinilor care au rulat pe acesta pe alte procese de monitorizare, chiar în situaţia în care au fost terminate cu succes, dacă aparţin unor sarcini incomplete (întrucât valorile de ieşire intermediare se găsesc în sistemul de fişiere al procesului de monitorizare defect astfel încât pot să nu fie accesibile). Un proces de monitorizare a partiţiei sarcinii poate fi inclus de asemenea într-o “listă neagră” de către procesul de urmărire a sarcinii, dacă mai mult de patru partiţii din cadrul aceleiaşi sarcini au eşuat rulând pe acesta. Unui astfel de proces nu i se vor mai repartiza partiţii ale sarcinii, continuând însă să comunice cu procesul de monitorizare a sarcinii. Există implementat un mecanism de expirare a erorilor (cu o rată de una pe zi), astfel încât procesele de monitorizare a partiţiei sarcinii pot fi scoase din cadrul “listei negre” doar prin faptul că rulează în continuare. În situaţia în care erorile au fost produse de o situaţie care poate fi remediată, acesta va fi eliminat din “lista neagră” după ce este repornit şi reintegrat în cadrul clusterului.

Defecţiunile ce se pot înregistra la un proces de monitorizare a partiției sarcinii vizează fie terminarea (sau blocarea sa), fie funcţionarea sa foarte lentă, ceea ce determină ca mesajele periodice să nu mai fie transmise deloc sau să fie transmise mai rar.
Valoarea ce indică perioada de timp maximă în care se așteaptă mesajele periodice înainte ca procesul de monitorizare a sarcinii să constate o disfuncționalitate la nivelul procesului de monitorizare a pertiției sarcinii poate fi configurată prin intermediul proprietăţii mapred.task.tracker.expiry.interval, care specifică o valoare exprimată în milisecunde.
Valoarea care indică numărul de partiții al unei sarcini care au înregistrat erori în cadrul aceluiași proces de monitorizare (fiind incluse pe “lista neagră”) poate fi modificată prin intermediul proprietăţii mapred.max.tracker.blacklists, acesta fiind un prag mai mare decât media de erori înregistrate de procesele de monitorizare pentru partiţiile sarcinii din cadrul unui cluster.

c) Producerea unei erori la nivelul procesului de monitorizare a sarcinii reprezintă situaţia cea mai gravă care poate conduce la eşecul rulării sarcinii. Fiind singurul punct vulnerabil din cadrul arhitecturii, Hadoop nu dispune de un mecanism de recuperare dintr-o astfel de situaţie, astfel încât toate sarcinile care rulează la un moment dat vor eşua. După repornirea procesului de monitorizare a sarcinii, toate sarcinile aflate în execuţie la momentul producerii erorii respective vor trebui să fie retransmise.

Probabilitatea producerii unei astfel de situații este destul de mică (de vreme ce probabilitatea defectării unei anumite maşini din cadrul clusterului este mică), această problemă fiind rezolvată de implementarea YARN, al cărei scop a fost eliminarea punctelor de vulnerabilitate identificate în mecanismul MapReduce.
Există şi posibilitatea de a încerca recuperarea sarcinilor care rulau la momentul opririi procesului de monitorizare (prin proprietatea mapred.jobtracker.restart.recover), însă au fost raportate anumite probleme în funcţionarea sa, astfel încât această opţiune nu ar trebui folosită.

În cazul mecanismului YARN, la rularea unor aplicaţii MapReduce se pot produce erori la nivelul:

  1. partiţiei sarcinii;
  2. procesului specific al aplicaţiei;
  3. procesului de gestiune a nodului;
  4. procesului de gestiune al resurselor.

a) Erorile produse pentru o partiţie a sarcinii sunt tratate într-un mod asemănător cu cazul MapReduce “clasic”. Excepţiile la rulare precum şi terminarea bruscă a maşinii virtuale Java sunt propagate până la procesul specific al aplicaţiei, şi încercarea de rulare a partiţiei sarcinii este marcată ca fiind eşuată. De asemenea, procesele a căror execuţie durează prea mult (care sunt blocate) sunt detectate prin absenţa transmiterii unui mesaj periodic (ping) prin interfaţa ombilicală. Se pot folosi aceleaşi proprietăţi de configurare pentru a indica eşecul definitiv al unei partiţii a sarcinii sau al întregii aplicaţii.

Perioada la care este aşteptat un mesaj periodic (pinb) poate fi configurat prin intermediul proprietăţii mapreduce.task.timeout.
O partiţie a sarcinii este considerată eşuată după ce au fost raportate erori pentru un anumit număr de încercări: mapreduce.map.maxattempts pentru sarcini de tip map şi mapreduce.reduce.maxattempts pentru sarcini de tip reduce. De asemenea, o sarcină în ansamblu va fi marcată ca fiind eşuată în situaţia în care eşuează mai mult de un anumit procent de partiţii ale sale de tip map (indicat de mapreduce.map.failures.maxpercent) sau un anumit procent de partiţii ale sale de tip reduce (indicat de mapreduce.reduce.failures.maxpercent).

b) Şi în cazul aplicaţiilor care eşuează, sunt realizate mai multe încercări. Acestea sunt detectate în situaţia în care procesul specific al aplicaţiei nu mai transmite mesaje periodice către procesul de gestiune a resurselor, situaţie în care va fi creată o nouă instanţă a procesului specific al aplicaţiei, însă într-un alt container, supervizat de un proces de gestiune a nodului. Partiţiile sarcinii care au fost deja rulate pot fi recuperate în acest caz, astfel că nu este necesară execuţia lor din nou. Întrucât clientul solicită procesului specific al aplicaţiei rapoarte cu privire la progresul înregistrat, în situaţia în care acesta eşuează şi nu mai transmite nici o informaţie, clientul va interoga procesul de gestiune al resurselor pentru a primi adresa noii instanţe a procesului specific al aplicaţiei (aşa cum îi fusese transmisă şi la iniţializarea sarcinii, când a fost transmisă).

O aplicaţie este marcată ca fiind eşuată dacă se înregistrează erori o singură dată, însă această valoare poate fi modificată prin proprietatea yarn.resourcemanager.am.max-retries.
Recuperarea din eroare nu este activată în mod implicit (aceasta se face prin intermediul proprietăţii yarn.app.mapreduce.am.job.recovery.enable), astfel că procesul specific al aplicaţiei va rula din nou toate partiţiile sarcinilor, indiferent de starea anterioară.

c) Dacă se înregistrează o eroare la nivelul procesului de gestiune a nodului, acesta va înceta să mai transmită mesaje periodice către procesul de gestiune a resurselor, care îl va elimina din lista sa de noduri disponibile. Orice partiţie a sarcinii sau proces specific al aplicaţiei care rulează pe nodul marcat ca fiind eşuat vor fi recuperate. Procesele de gestiune a nodurilor pot fi incluse pe “lista neagră” în situaţia în care numărul de erori al aplicaţiilor rulate în contextul său este mare. Acestă operaţie este realizată de procesul specific al aplicaţiei (care va planifica partiţiile sarcinilor pe alte noduri) în cazul în care este depăşit un anumit prag minim de partiţii ale sarcinilor care înregistrează erori.

Proprietatea yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms (având implicit valoarea de 600000 milisecunde = 10 minute) determină perioada de timp (trecută de la ultimul mesaj periodic) pe care o aşteaptă procesul de gestiune a resurselor înainte de a considera procesul de gestiune a nodului ca fiind eşuat.
Întrucât nu procesul de gestiune al resurselor este cel care realizează marcarea unui nod ca făcând parte din “lista neagră”, este posibil ca partiţii ale noilor sarcini să fie planificate pe noduri care au fost marcate astfel de procesele specifice ale unor aplicaţii anterioare.
Implicit, un nod este inclus pe “lista neagră” dacă eşuează un număr mai mare de trei partiţii ale sarcinii rulate în contextul său, însă această valoare poate fi modificată prin intermediul proprietăţii mapreduce.job.maxtaskfailures.per.tracker.

d) Erorile produse la nivelul procesului de gestiune a resurselor produc situaţii destul de delicate întrucât în absenţa acestuia nu pot fi lansate nici sarcini şi nici containere ale partiţiilor acestora. Totuşi, el a fost proiectat astfel încât să se poată realiza recuperarea din eroare prin utilizarea unui mecanism ce utilizează puncte de control, astfel încât starea sa este reţinută pe disc. După ce procesul de gestiune al resurselor se opreşte din cauza unei erori, o nouă instanţă va fi lansată de un administrator, aceasta putând relua execuţia prin intermediul stării încărcate de pe disc, conţinând procesele de gestiune ale nodurilor din sistem precum şi aplicaţiile care rulează.

Mecanismul utilizat pentru stocarea pe disc de către procesul de gestiune a resurselor poate fi configurat prin proprietatea yarn.resourcemanager.store.class a cărei valoare implicită este manager.recovery.MemStore (starea este reţinută în memorie astfel încât nu este disponibilă pentru procese din alte maşini virtuale), însă vor fi realizate şi implementări care vor folosi ZooKeeper pentru a realiza recuperarea din eroare în cazul problemelor întâmpinate la nivelul procesului de gestiune a resurselor.
De remarcat faptul că partiţiile sarcinii nu sunt reţinute în starea procesului de gestiune a resurselor, de vreme ce sunt gestionate de procesul specific al aplicaţiei. Ca atare, informaţiile reţinute pe disc (descriind starea procesului de gestiune a resurselor) au o dimensiune mai redusă putând fi gestionate cu uşurinţă decât în cazul procesului de monitorizare a partiţiei sarcinii din cadrul implementării MapReduce “clasice”.

Politici pentru Planificarea Sarcinilor

În privinţa planificării sarcinilor, iniţial Hadoop avea o abordare prin care sarcinile erau rulate în ordinea în care erau transmise de utilizatori, fiecare dintre acestea folosind întregul cluster. Utilizarea unui cluster partajat ar fi oferit oportunitatea folosirii unor resurse de dimensiuni mari pentru mai mulţi utilizatori, însă se punea problema distribuirii echitabile a acestora. Ulterior, s-a introdus posibilitatea specificării unei priorităţi pentru o sarcină astfel încât procesul de planificare a sarcinilor o va alege pe cea cu prioritatea cea mai mare cu toate că nu este implementat dreptul de preempţiune pentru ele. Astfel, în cazul mecanismului de planificare FIFO, o sarcină având o prioritate mai mare poate fi blocată de o sarcină cu prioritate inferioară ei, având o durată de execuţie semnificativă, în situaţia în care a fost pornită anterior.

Specificarea priorității unei sarcini putea fi realizată prin intermediul proprietăţii mapred.job.priority sau prin metoda setJobPriority() pe obiectul JobClient (specificând una din valorile VERY_HIGH, HIGH, NORMAL, LOW sau VERY_LOW).

Curent, MapReduce implementează mai multe mecanisme de planificare. Implicit în MapReduce “clasic” este algoritmul bazat pe cozi de procese, însă există şi procese de planificare care să ruleze într-un mediu concurent, denumite mecanismul de planificare echitabilă (eng. Fair Scheduler) şi mecanismul folosind planificare pe bază de capacitate (eng. Capacity Scheduler).

Prin intermediul mecanismului de planificare echitabilă se doreşte să se dea fiecărui utilizator o şansă egală pentru a utiliza capacitatea clusterului de-a lungul timpului.

Utilizarea acestui mecanism de planificare se face prin plasarea .jar-ului corespunzător în directorul lib al Hadoop şi specificând pentru proprietatea mapred.jobtracker.taskScheduler valoarea org.apache.hadoop.mapred.FairScheduler. Acesta va funcţiona fără alte configurări, însă acestea sunt disponibile inclusiv prin intermediul unei interfeţe web, pentru a utiliza toate facilităţile de care dispune.

În situaţia în care rulează o singură sarcină, acesteia îi va fi alocat întregul cluster. Pe măsură ce sunt transmise mai multe sarcini, sunt distribuite resurse pentru partiţiile acestora, astfel încât fiecare să aibă acces, în mod echitabil, la aceleaşi mijloace ca şi ceilalţi utilizatori. Astfel, o sarcină de dimensiuni mai mici va fi terminată într-un timp optim în timp ce concomitent este rulată o sarcină a cărei execuţie durează mai mult timp, aceasta înregistrând, la rândul ei, anumite progrese.

Sarcinile fac parte dintr-un anumit grup, în mod implicit fiecărui utilizator fiindu-i repartizat un astfel de grup. În acest fel, un utilizator care transmite mai multe sarcini decât altul nu va primi mai multe resurse în cadrul clusterului. Totodată, există posibilitatea de a crea anumite grupuri care să aibă garantat un număr minim de resurse (partiţii ale sarcinii de tip map sau de tip reduce care vor rula), stabilind ponderi pentru fiecare dintre acestea. Acest mecanism implementează dreptul de preemţiune, astfel încât în situaţia în care un grup nu şi-a primit resursele necesare pentru o perioadă de timp, procesul de planificare va termina partiţii ale sarcinilor din grupuri care şi-au depăşit capacitatea, pentru a oferi acces la resursele cuvenite grupului care a primit mai puţine resurse. De asemenea, în cadrul aceluiaşi grup, sarcinile transmise de fiecare utilizator vor partaja în mod echitabil resursele de care dispune acesta.

Abordarea din cazul mecanismului folosind planificare pe bază de capacitate utilizează un număr de cozi pentru fiecare cluster, fiecare având alocată o anumită capacitate. Prin această caracteristică, mecanismul folosind planificare pe bază de capacitate se deosebeşte de mecanismul de planificare echitabilă, permiţând ca pentru fiecare utilizator să se specifice o anumită pondere din totalul clusterului care va fi folosită exclusiv de acesta. Ele pot fi organizate în mod ierarhic, astfel încât între ele se stabilesc relaţii de tip părinte-copil, fiind asemănătoare grupurilor. Fiecare coadă este asociată unui utilizator şi implementează în cadrul său un mecanism de planificare de tip FIFO , folosind priorităţi. Astfel, se permite utilizatorilor să îşi simuleze propriul cluster de tip MapReduce, în care aplicaţiile pe care le transmit vor fi executate în ordinea specificată şi în funcţie de priorităţile indicate.

Şi mecanismul de planificare echitabilă implementează posibilitatea ca în cadrul unui grup, sarcinile să fie planificare în funcţie de ordinea în care sunt transmise de către un utilizator, asemănându-se astfel cu mecanismul folosind planificare pe bază de capacitate.

Sistemul Distribuit de Fișiere HDFS

Sistemul de Gestiune pentru Baze de Date Distribuite HBase

API-ul Java pentru dezvoltarea de aplicaţii distribuite folosind framework-ul Hadoop

Dezvoltarea unei aplicaţii MapReduce folosind Hadoop presupune configurarea unui obiect de tip Job (prin specificarea formatului pe care îl au datele de intrare şi datele de ieşire şi prin indicarea claselor care implementează sarcinile de tip Map, respectiv Reduce - opţional, se poate specifica şi o clasă de tip Combiner), prin implementarea claselor care realizează funcţionalitatea claselor de tip Map, respectiv Reduce urmată de lansarea în execuţie a aplicaţiei.

Clasa Job înglobează toate informaţiile despre o sarcină, controlând totodată execuţia acesteia.

Job job = Job.getInstance(getConf(), "JobName");

O sarcină este împachetată în cadrul unei arhive .jar care este distribuită în mod automat de sistemul Hadoop. Pentru a şti care este sarcina de transmis, trebuie apelată metoda setJarByClass(), primind ca argument chiar arhiva .jar în care se găseşte sarcina curentă. Sistemul Hadoop va localiza prin aceasta fişierul care conţine clasa indicată.

job.setJarByClass(getClass());

În continuare, trebuie specificat formatul datelor de intrare respectiv al datelor de ieşire, acestea putând fi fişiere, directoare, texte sau tabele dintr-o bază de date.

Datele de intrare sunt indicate de o implementare a clasei InputFormat, responsabilă de crearea partiţiilor şi a sistemului de preluare a înregistrărilor.

job.setInputFormatClass(CustomInputFormat.class);
În cazul în care datele de intrare nu respectă formatul nici uneia dintre clasele care implementează interfaţa InputFormat, va fi specificată o clasă definită de utilizator care defineşte metoda createRecordReader(InputSplit, TaskAttemptContext) întorcând obiectul (derivat din clasa RecordReader) care gestionează preluarea înregistrărilor din sursa respectivă.

După ce sunt preluate înregistrările din sursa de date, acestea sunt partiţionate şi fiecare secvenţă este transmisă unui proces de tip Mapper spre a fi prelucrată în continuare.

Hadoop foloseşte propriul mecanism de serializare pentru transmiterea datelor prin infrastructura de comunicaţie, fiind optimizat pentru cazul reţelelor de calculatoare. Serializarea este folosită pentru interacțiunea cu fișiere și cu baze de date. În pachetul org.apache.hadoop.io sunt definite clasele IntWritable (pentru Integer), Text (pentru String) şi altele, însă implementarea propriilor clase pentru tipuri de date serializabile este destul de facilă.

Datele de ieşire sunt specificate de o implementare a clasei OutputFormat, indicându-se şi locaţia la care sarcinile de tip reduce îşi vor plasa rezultatele. Fiecare sarcină de tip reduce dispune de propriile date de ieşire, de tip text, sub forma unor perechi (cheie, valoare).

job.setOutputFormatClass(CustomOutputFormat.class);
Locaţia la care vor fi plasate datele de ieşire trebuie să nu existe, în caz contrar generându-se o excepţie care determină eşuarea globală a sarcinii.
În mod implicit, este definit un singur proces de tip reduce.

Pentru fiecare sarcină trebuie specificat formatul pentru cheia şi valoarea datelor de ieşire, pentru procesele de tip map cât şi pentru procesele de tip reduce. Sunt folosite în acest scop metodele setMapOutputKeyClass() şi setMapOutputValueClass(), care pot primi ca parametrii tipuri predefinite de date sau tipuri de date definite de utilizator.

În cazul în care datele de intrare sau datele de ieşire au tipurile TextInputFormat respectiv TextOutputFormat, locaţia de la care pot fi preluate informaţiile va fi indicată prin metodele FileInputFormat.setInputPaths(), respectiv FileOutputFormat.setOutputPath() care vor primi ca parametru obiectul de tip Job şi un obiect de tip Path care conţine calea din sistemul de fişiere local sau din cadrul sistemului distribuit de fişiere HDFS.

Ulterior, trebuie indicate clasele de tip Mapper şi Reducer (având de regulă aceeaşi cheie pentru datele de ieşire):

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

În continuare, sarcina este lansată în execuţie prin apelarea metodei waitForCompletion() ce primeşte un parametru care indică dacă sunt afişate valori la consolă. Metoda transmite sarcina, aşteptând terminarea acesteia. Rezultatul pe care îl întoarce este true în caz că sarcina a fost terminată cu succes şi false în caz contrar.

Clasa de tip Map este derivată din clasa org.apache.hadoop.mapreduce.Mapper, fiind parametrizată cu 4 valori de tip JavaGenerics având semnificaţia tipurilor de date pe care le au cheia şi valoarea datelor de intrare, respectiv cheia şi valoarea datelor de ieşire. Sarcina programatorului constă în a defini metoda map() care primeşte ca parametru o cheie şi o valoare (cu tipurile aferente celor specificate la parametrizarea clasei) corespunzătoare datelor de intrare şi un obiect de tip Context folosit pentru datele de ieşire, dar şi pentru specificarea unor contoare definite de utilizator.

MyMapper.java
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
 
  @Override
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    Text outputKey = new Text();
    Text outputValue = new Text();
    // ...
    context.write(outputKey, outputValue);
  }
}

Analog, clasa de tip Reduce este derivată din org.apache.hadoop.mapreduce.Reducer, având acelaşi tip de parametrizare cu menţiunea faptului că tipurile specificate pentru datele de intrare (pentru cheie şi pentru valoare) ale acestuia trebuie să fie identice cu tipurile specificate pentru datele de ieşire ale clasei de tip Map, întrucât perechile (cheie, valoare) produse de aceasta vor fi grupate astfel că pentru fiecare cheie va exista un set de una sau mai multe valori. Acest proces este cunoscut sub denumirea de amestecare şi sortare (eng. shuffle and sort). Intrările pentru clasa de tip Reduce sunt sortate în funcţie de cheie, fiecare set de valori fiind prelucrat în metoda reduce() care primeşte ca argumente o cheie şi un set de valori, dar şi obiectul de tip Context. Un set de valori este reprezentat sub forma Iterable<V>. Prelucrările în cadrul metodei reduce() sunt realizate prin parcurgerea acestei liste şi realizarea unor agregări asupra acestor valori pentru obţinerea rezultatului.

MyReducer.java
public class MyReducer extends Reducer<Text, Text, Text, Text> {
 
  @Override
  public void reduce(Text key, Interable<Text> values, Context context)
    throws IOException, InterruptedException {
    Text outputKey = new Text();
    Text outputValue = new Text();
    for (Text value: values) {
      // ...
    }
    context.write(outputKey, outputValue);
  }
}

Opţional, se poate defini şi o clasă Combiner, spre a combina informaţiile pentru fiecare sarcină de tip Map, astfel încât cantitatea datelor procesate de sarcina Reduce să fie limitată. Aceste clase nu sunt însă garantate să ruleze, fiind folosite mai mult pentru optimizări şi nu în cazul operaţiilor critice pentru logica aplicaţiei.

Informaţiile oferite la rularea unei sarcini sunt identificatorul acesteia (folosit pentru a monitoriza şi a gestiona sarcina), numărul de partiţii generate, progresul şi valoarea contoarelor exprimând statistici referitoare la operaţiile care au fost realizate în cadrul rulării sarcinii.

Întrucât HBase este scris în limbajul de programare Java, el oferă un API pentru acces programatic la informaţiile conţinute. Aceasta reprezintă metoda cea mai rapidă de a accesa informaţiile din cadrul unei baze de date distribuite de tip HBase. Sunt suportate operaţii de tip CRUD, dar şi unele operaţii administrative.

Accesul la o bază de date HBase presupune crearea unui obiect de tip Configuration, în care sunt încărcate proprietăţi specifice, indicate de utilizator în fişierele de configurare, construirea unui obiect de tip HTable (pentru care trebuie să se specifice atât obiectul de configurare cât şi denumirea tabelului), realizarea diferitelor operaţii (put(), get(), scan(), delete()) şi închiderea instanţei care implică eliberarea tuturor resurselor folosite şi a zonelor de memorie tampon.

Clasa Configuration extinde clasa de configurare Hadoop cu care păstrează compatibilitatea, la crearea unui astfel de obiect fiind încărcate din classpath documentele hbase-default.xml şi hbase-site.xml. Valorile proprietăţilor din hbase-site.xml suprascriu valorile proprietăţilor conţinute în hbase-default.xml.

Configuration conf = HBaseConfiguration.create();

Unele proprietăţi pot fi suprascrise manual, însă un astfel de comportament nu este de regulă necesar şi nici de dorit.

conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");

Un obiect de tip Configuration trebuie partajat cât de mult posibil întrucât obiectele de tip HTable create cu aceeaşi configurație vor avea aceeaşi conexiune la ZooKeeper şi HBaseMaster, de vreme ce o conexiune este reţinută intern printr-un obiect de tip asociere care foloseşte drept cheie instanţe ale obiectului de tip Configuration. Când se reutilizează un obiect de tip Configuration pentru mai multe instanţe HTable, trebuie apelat close() ca HConnectionManager să elimine instanţa din lista tabelelor ce folosesc conexiunea respectivă. Când toate tabelele unei conexiuni sunt închise, este terminată şi conexiunea corespunzătoare. O conexiune e reprezentată de clasa HConnection fiind gestionată de clasa HConnectionManager.

Clasa org.apache.hadoop.client.HTable reprezintă interfaţa clientului către o singură tabelă HBase, oferind acces la operaţii CRUD, fiind proiectată pentru a fi utilizată cu uşurinţă. Operaţiile care modifică datele sunt atomice pentru fiecare înregistrare în parte, neexistând conceptul de tranzacţie (care să implice mai mult de o înregistrare). Astfel, există o consistenţă de 100% per înregistrare, clientul realizând operaţii de citire sau de scriere asupra acesteia sau aşteptând eliberarea accesului. Atomicitatea se păstrează indiferent de numărul de coloane utilizate în cadrul cererii.

Un obiect HTable poate fi utilizat pentru grupuri de comenzi (spre a se asigura performanţa), însă aceste operaţii nu sunt atomice.
O astfel de abordare nu reprezintă o problemă în cazul în care există mai multe procese care realizează operaţii de citire, însă există o problemă în cazul în care mai multe procese încearcă să scrie aceeaşi înregistrare.

Crearea unui obiect HTable este destul de costisitoare întrucât este scanat întregul catalog .META., verificându-se dacă tabela există şi este activată. Astfel, aceasta trebuie apelată o singură dată în cadrul unui fir de execuţie, fiind refolosit cât de mult posibil. Operaţiile realizate cu obiecte HTable nu sunt garantate să funcţioneze corect într-un mediu distribuit.

HTable myTable = new HTable(conf, "TableName");
În cazul în care se folosesc prea multe instanţe HTable la un moment dat, ar trebui utilizat un obiect de tip HTablePool pentru că reutilizează instanţele acestor clase.

Pentru a se scrie date într-o tabelă, se foloseşte un obiect de tip Put, care este instanţiat cu identificatorul (cheia de rând) a înregistrării care se doreşte transmisă, în format binar.

Put myPut = new Put(Bytes.toBytes("myRow"));

Un astfel de obiect implementează metoda add(), prin care se indică coordonatele celulei la care se doreşte să se scrie valoarea în cauză.

Put.add(family, column, value);
Put.add(family, column, value, timestamp, value);

Toţi parametrii sunt în format binar, fiind reponsabilitatea programatorului conversia acestora.

Transferul propriu-zis al datelor se realizează la apelul metodei put(), pe instanţa HTable:

myTable.put(myPut);

Pentru a se citi date dintr-o tabelă, se foloseşte un obiect de tip Get, care este instanţiat cu identificatorul (cheia de rând) a înregistrării care se doreşte obţinută, în format binar.

Get myGet = new Get(Bytes.toBytes("myRow"));

Un astfel de obiect implementează metoda add(), prin care se indică coordonatele celulei la care se doreşte să se scrie valoarea în cauză. Metoda add() permite specificarea mai multor familii de coloane şi a mai multor coloane.

Get.addFamily(family);
Get.addColumn(family, column);

Este foarte important să fie indicate exact datele necesare în cadrul aplicaţiei, altfel fiind returnat un rând întreg.

Filtrarea se poate realiza prin specificarea unui interval de timp, respectiv al unui număr de versiuni (prin metodele setTimeRange() şi setMaxVersions()).

Transferul propriu-zis al datelor se realizează la apelul metodei get(), pe instanţa HTable:

Result myResult = myTable.get(myGet);

Obţinerea rezultatelor dintr-un obiect de tip Result se face prin metoda getValue(), specificându-se denumirea familiei de coloane şi a coloanei. Alte metode sunt getRow() pentru obţinerea identificatorului unei înregistrări, isEmpty() pentru a verifica dacă rezultatul este vid, size() pentru a obţine numărul de celule şi containsColumn(family:column) pentru a verifica existenţa unei coloane.

Un obiect HTable trebuie întotdeauna închis dacă nu mai este necesar sau atunci când s-a produs o excepţie (fiind plasat de regulă pe ramura catch, operaţiile pe tabela respectivă fiind plasate pe ramura try):

myTable.close();

Activitate de Laborator

0. Să se cloneze în directorul de pe discul local conținutul depozitului la distanță de la https://www.github.com/aipi2015/Laborator08. În urma acestei operații, directorul Laborator08 va trebui să conțină subdirectorul labtasks, fișierele README.md și LICENSE.

student@aipi2015:~$ git clone https://www.github.com/aipi2015/Laborator08.git
În cadrul acestui laborator, este necesar să se foloseasă masina virtuală Ubuntu 14.04 care conține instalate Hadoop 2.6.1, HBase 1.1.2 precum și un plugin pentru mediul integrat de dezvoltare Eclipse, prin intermediul căruia pot fi dezvoltate aplicațiile distribuite folosind aceste tehnologii.

1. Să se pornească maşina virtuală Ubuntu 14.04, autentificarea făcându-se folosind credențialele aipi2015 (ca nume de utilizator) şi StudentAipi2015 (ca parolă).

2. Într-un terminal, să se pornească sistemul de fişiere distribuite HDFS şi sistemul de gestiune al resurselor YARN.

student@aipi2015:~$ start-dfs.sh
student@aipi2015:~$ start-yarn.sh
Verificaţi faptul că au pornit corect prin inspectarea interfeţelor web de la adresele http://localhost:50070, respectiv http://localhost:8088.

3. Într-un terminal, să se pornească o instanţă a sistemului de gestiune pentru baze distribuite HBase.

student@aipi2015:~$ start-hbase.sh
Verificaţi faptul că rulează toate procesele Hadoop / HBase rulând comanda
student@aipi2015:~$ jps

În execuție trebuie să se afle procesele Java NameNode, SecondaryNameNode, DataNode (ale sistemului distribuit de fișiere HDFS), ResourceManager, NodeManager (ale sistemului de gestiune a resurselor YARN), respectiv HRegionServer, HMaster și HQuorumPeer (ale sistemului de gestiune pentru baze de date distribuite HBase).

4. Să se ruleze script-urile HBase pentru instalarea bazei de date. Vor trebui rulate scripturile Laborator08_DDLc.rb şi Laborator08_DML.rb, localizate în directorul scripts.

Va fi apelat utilitarul hbase (cu parametrul shell), indicându-se căile absolute către scripturile care trebuie rulate.

student@aipi2015:~/scripts$ hbase shell Laborator08_DDLc.rb
student@aipi2015:~/scripts$ hbase shell Laborator08_DML.rb
De asemenea, există un script Laborator08_DDLd.rb pentru dezinstalarea bazei de date, în cazul în care o astfel de operație este necesară.

Structura bazei de date conține următoarele tabele, familii de coloane și atribute:

Tabela Familia de Coloane Atributul
country identification name, code
detail description
publishing_house identification name, registered_number
detail description
address postal_address, zip_code, country_id
miscellanea internet_address
collection identification name, description
referrence publishing_house_id
book identification title, subtitle
detail description
version edition, printing_year
group collection_id
format identification value
detail description
language identification name, code
detail description
book_presentation identification isbn
referrence book_id, format_id, language_id
inventory stockpile, price
writer appellation first_name, last_name
biography content
author referrence book_id, writer_id
category identification name
detail description
category_content referrence book_id, category_id
supply_order_hdeader identification identification_number, issue_date
situation state
producer publishing_house_id
supply_order_line referrence supply_order_header_id
content book_presentation_id, quantity
user appellation first_name, last_name
identification personal_identifier
contact address, phone_number, email
category type
authentication username, password
invoice_header identification identification_number, issue_date
situation state
consumer user_id
invoice_line referrence invoice_header_id
content book_presentation_id, quantity
statistics expense value

5. Se doreşte afişarea unei liste conţinând toţi utilizatorii (identificaţi prin nume şi prenume) precum şi sumele pe care aceştia le-au cheltuit prin achiziţia de cărţi din librăria virtuală.

În cazul unui sistem de gestiune pentru baze de date relaţionale, o astfel de problemă are o soluţie foarte simplă ce constă într-o operaţie de joncţiune între tabelele book_presentation, user, invoice_header şi invoice_line. Pentru un volum foarte mare de informaţii, o astfel de operaţie poate dura o perioadă de timp inacceptabilă, motiv pentru care se doreşte ca procesarea să se realizeze în paralel, pe mai multe maşini, fiecare gestionând o parte dintre înregistrările tabelelor respective.

Un sistem MapReduce poate împărţi datele de intrare (tabela invoice_line pentru situaţia de faţă) în mai multe regiuni în mod automat, fiecare dintre acestea fiind prelucrate de un proces separat. Astfel, pentru fiecare detaliu din factură, reprezentând un produs al facturii se determină preţul total, obţinut prin înmulţirea preţului formatului de prezentare a cărţii (obţinut printr-o interogare a tabelei book_presentation) cu cantitatea precizată. Pentru o factură dată prin identificatorul său, se asociază preţul total al fiecărui produs pe care îl conţine. Ulterior, se creează o listă corespunzătoare fiecărei facturi conţinând toate preţurile totale ale produselor aferente. Acestea vor fi însumate, obţinându-se valoarea facturii.

MAP REDUCE
(factura1, preţ_produs1) (factura1, (preţ_produs1, preţ_produs2, …, preţ_produsm1))
=
(factura1, preţ_produs1 + preţ_produs2 + … + preţ_produsm1)
(factura1, preţ_produs2)
(factura1,preţ_produsm1)
(factura2, preţ_produs1) (factura2, (preţ_produs1, preţ_produs2, …, preţ_produsm1))
=
(factura2, preţ_produs1 + preţ_produs2 + … + preţ_produsm2)
(factura2, preţ_produs2)
(factura2,preţ_produsm2)
(facturan, preţ_produs1) (facturan, (preţ_produs1, preţ_produs2, …, preţ_produsmn))
=
(facturan, preţ_produs1 + preţ_produs2 + … + preţ_produsmn)
(facturan, preţ_produs2)
(facturan,preţ_produsmn)

Cum fiecare factură are asociat un identificator al utilizatorului care a achitat-o, acesta va conduce la obţinerea, prin interogarea bazei de date user, a numelui şi prenumelui clientului respectiv.

Pentru a calcula totalurile pentru fiecare utilizator (un utilizator poate avea mai multe facturi asociate), din datele generate până acum, se vor realiza grupări în funcţie de utilizatori, astfel încât pentru fiecare dintre aceştia vor fi asociate una sau mai multe sume de bani corespunzătoare facturilor. Printr-un mecanism similar, acestea vor fi însumate în vederea determinării cheltuielilor totale, pentru fiecare utilizator în parte.

MAP REDUCE
(utilizator, preţ_factura1) (utilizator, (preţ_factura1, preţ_factura2, …, preţ_facturan))
=
(utilizator, preţ_factura1 + preţ_factura2 + … + preţ_facturan)
(utilizator, preţ_factura2)
(utilizator, preţ_facturan)

Se observă faptul că structura coloanelor marcate este echivalentă, astfel încât aplicaţia va consta din 2 sarcini, din care una va avea atât partiţie de tip map cât şi partiţie de tip reduce, iar una va avea numai partiţie de tip reduce. De asemenea, se observă faptul că datele de ieşire ale unei sarcini reprezintă datele de intrare pentru cealaltă sarcină.

În aplicaţia BookStore, cele două sarcini se numesc invoiceValue şi userExpenses, fiind definite (prin partiţiile map şi reduce corespunzătoare) în clasa cu acelaşi nume.

a) în clasa Map a sarcinii invoiceValue (clasa InvoiceValueMapper din pachetul ro.pub.cs.aipi.lab08.mapper), să se determine preţul formatului de prezentare a cărţii analizate în mod curent (care se găseşte în detaliul de factură repartizat procesului curent);

<spoiler|Indicații de rezolvare> Se va crea o instanţă a clasei Table, pe baza unui obiect de tip Connection (obținut prin intermediul unei fabrici de conexiuni, ConnectionFactory), pentru care se apelează metoda createConnection(), aceasta primind ca argument denumirea tabelei respective (TableName.valueOf(…)). Ulterior, se creează un obiect de tip Get în care se va specifica atributul ce se doreşte obţinut (prin metoda addColumn(), indicându-se atât familia coloanei cât şi denumirea propriu-zisă a coloanei), din aplicarea acestuia generându-se un obiect Result pentru care se apelează metoda getValue() în scopul de a se obţine valoarea dorită.

Nu uitați să eliberați resursele asociate atât tabelei HBase pe care se realizează interogarea cât și a conexiunii pe baza căreia aceasta a fost obținută, prin apelarea metodei close().

</spoiler>

b) în clasa Reduce a sarcinii invoiceValue (clasa InvoiceValueReducer din pachetul ro.pub.cs.aipi.lab08.reducer), să se calculeze preţul total pentru o factură, prin agregarea tuturor sumelor parţiale specificate în detaliile de factură;

c) în clasa Reduce a sarcinii invoiceValue (clasa InvoiceValueReducer din pachetul ro.pub.cs.aipi.lab08.reducer), să se obţină, prin interogarea tabelei invoice_header, identificatorul utilizatorului căruia i-a fost emisă factura analizată în mod curent;

d) în clasa Reduce a sarcinii invoiceValue (clasa InvoiceValueReducer din pachetul din pachetul ro.pub.cs.aipi.lab08.reducer), să se obţină, prin interogarea tabelei user, numele şi prenumele clientului având identificatorul determinat anterior;

e) în clasa BookStore din pachetul ro.pub.cs.aipi.lab08.main, pentru sarcina userExpenses, să se indice valorile claselor corespunzătoare cheii şi valorii datelor de ieşire;

<spoiler|Indicații de Rezolvare> Întrucât datele de ieşire sunt reprezentate de tuplul (nume&prenume, valoare), tipurile de date corespunzătoare vor fi Text, respectiv DoubleWritable. </spoiler>

f) în clasa Reduce a sarcinii userExpenses (clasa UserExpensesFileReducer din pachetul ro.pub.cs.aipi.lab08.reducer), să determine valoarea totală a facturilor achitate de client şi să se scrie această valoare în context (folosind metoda write(), cheia de rând fiind reprezentată de numele şi prenumele utilizatorului);

De remarcat faptul că pentru citirea datelor de intrare în cazul sarcinii userExpenses, a fost folosită o clasă specială, definită de utilizator (CustomFileInputFormat), în care datele au forma (cheie, valoare), iar separatorul dintre aceste entităţi este TAB-ul. Întrucât cheia este reprezentată de numele şi prenumele utilizatorilor, iar valoarea de suma unei facturi, tipurile de date procesate în clasa CustomFileInputFormatRecordReader vor fi Text, respectiv DoubleWritable. O astfel de soluţie a fost adoptată datorită faptului că nu au fost definite tipuri de fişiere de intrare care să aibă această structură.

6. Să se rezolve problema anterioară, stocând rezultatele nu într-un fişier ci în tabela statistics a bazei de date HBase, care are o singură familie de coloane, expense, în care o posibilă coloană ar putea fi valoare.

a) În clasa BookStore din pachetul ro.pub.cs.aipi.lab08.main, se va apela metoda statică initTableReducerJob() a clasei TableMapReduceUtil ce primeşte ca parametrii denumirea tabelei unde se doresc a fie redirectate datele de ieşire, numele clasei care implementează sarcina de tip Reduce şi numele obiectului de tip Job. De asemenea, va trebui indicat şi numărul sarcinilor de tip Reduce.

b) În clasa UserExpensesTableReducer din pachetul ro.pub.cs.aipi.lab08.reducer, ulterior determinării sumei totale achitate de client, să se creeze un obiect de tip Put având drept cheie numele şi prenumele utilizatorului, adăugându-se (prin metoda addColumn()), ca valoare a celulei identificată de coordonatele expense:value, suma tuturor facturilor, corespunzând produselor achiziţionate.

7. Să se verifice rezultatele obținute.

<spoiler|Indicații de Rezolvare> a) în situația în care rezultatele au fost stocate în sistemul de fișiere HDFS, se poate folosi plugin-ul pentru mediul integrat de execuție Eclipse, care conferă un navigator pentru acest sistem de fișiere;
b) în situația în care rezultatele au fost stocate în tabela statistics din cadrul sistemului de gestiune pentru baze de date distribuite HBase, se poate verifica conținutul acesteia:

student@aipi2015:~$ hbase shell
hbase> scan 'statistics'

</spoiler>

8. Înainte de a opri maşina virtuală, să se oprească instanţa sistemului de gestiune pentru baze de date distribuite HBase precum şi a sistemului de gestiune a resurselor YARN, respectiv al sistemului de fişiere distribuite HDFS.

student@aipi2015:~$ stop-hbase.sh
student@aipi2015:~$ stop-yarn.sh
student@aipi2015:~$ stop-dfs.sh

Resurse

Soluții

laboratoare/laborator08.txt · Last modified: 2015/12/11 15:19 by Andrei Roșu-Cojocaru
CC Attribution-Share Alike 4.0 International
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0