RDD folosind Spark: Blocul de construcție al Apache Spark



Acest blog despre RDD folosind Spark vă va oferi o cunoaștere detaliată și cuprinzătoare a RDD, care este unitatea fundamentală a Spark și cât de util este.

, Cuvântul în sine este suficient pentru a genera o scânteie în mintea fiecărui inginer Hadoop. LA n memorie instrument de procesare care este extrem de rapid în calculul cluster. Comparativ cu MapReduce, partajarea datelor în memorie face RDD-uri 10-100x Mai repede decât partajarea de rețea și disc și toate acestea sunt posibile datorită RDD-urilor (seturi de date distribuite rezistente). Punctele cheie pe care le concentrăm astăzi în acest RDD folosind articolul Spark sunt:

Aveți nevoie de RDD-uri?

De ce avem nevoie de RDD? -RDD folosind Spark





Lumea evoluează odată cu și Știința datelor din cauza avansării în . Algoritmi bazat pe Regresie , , și care merge mai departe Distribuit Calcul iterativ acțiune moda care include Reutilizarea și partajarea datelor între mai multe unități de calcul.

Traditionalul tehnicile aveau nevoie de o stocare intermediară stabilă și distribuită, cum ar fi HDFS cuprinzând calcule repetitive cu replicări de date și serializare a datelor, ceea ce a făcut procesul mult mai lent. Găsirea unei soluții nu a fost niciodată ușoară.



diferența dintre extinde și unelte

Aici e locul RDD-uri (Seturi de date distribuite rezistente) ajunge la imaginea de ansamblu.

RDD Sunt ușor de utilizat și nu se creează fără efort, deoarece datele sunt importate din surse de date și aruncate în RDD-uri. Mai mult, operațiile sunt aplicate pentru a le procesa. Ei sunt colecție distribuită de memorie cu permisiuni ca Numai în citire și cel mai important, sunt Tolerant la erori .



Dacă există partiție de date de RDD este pierdut , poate fi regenerat prin aplicarea aceluiași transformare operație pe acea partiție pierdută în descendență , mai degrabă decât să procesăm toate datele de la zero. Acest tip de abordare în scenarii în timp real poate face ca miracolele să se întâmple în situații de pierdere a datelor sau când un sistem este defect.

Ce sunt RDD-urile?

RDD sau ( Set de date distribuite rezistent ) este un element fundamental structură de date în Spark. Termenul Rezistent definește capacitatea care generează automat datele sau datele întorcându-se înapoi la starea inițială când apare o calamitate neașteptată cu o probabilitate de pierdere a datelor.

Datele scrise în RDD-uri sunt partiționat și stocate în mai multe noduri executabile . Dacă un nod de executare eșuează în timpul rulării, apoi obține instantaneu back-up-ul de pe următorul nod executabil . Acesta este motivul pentru care RDD-urile sunt considerate ca un tip avansat de structuri de date în comparație cu alte structuri de date tradiționale. RDD-urile pot stoca date structurate, nestructurate și semi-structurate.

Să mergem mai departe cu RDD-ul nostru folosind blogul Spark și să aflăm despre caracteristicile unice ale RDD-urilor, care îi oferă un avantaj față de alte tipuri de structuri de date.

Caracteristici ale RDD

  • In memoria (RAM) Calcule : Conceptul de calcul în memorie duce prelucrarea datelor într-o etapă mai rapidă și eficientă, în general performanţă a sistemului este actualizat.
  • L evaluarea sa : Termenul de evaluare leneș spune transformări sunt aplicate datelor din RDD, dar ieșirea nu este generată. În schimb, transformările aplicate sunt logat.
  • Persistenţă : RDD-urile rezultate sunt întotdeauna reutilizabil.
  • Operații cu granulație grosieră : Utilizatorul poate aplica transformări tuturor elementelor din seturile de date prin Hartă, filtru sau a se grupa cu operațiuni.
  • Tolerant la erori : Dacă există o pierdere de date, sistemul poate întoarce-te înapoi la a sa starea inițială prin utilizarea jurnalului transformări .
  • Imuabilitate : Datele definite, recuperate sau create nu pot fi schimbat odată ce este conectat la sistem. În cazul în care trebuie să accesați și să modificați RDD-ul existent, trebuie să creați un RDD nou prin aplicarea unui set de Transformare funcționează pe RDD curent sau precedent.
  • Partiționare : Este unitate crucială de paralelism în Spark RDD. În mod implicit, numărul de partiții create se bazează pe sursa dvs. de date. Puteți chiar decide numărul de partiții pe care doriți să le faceți folosind partiție personalizată funcții.

Crearea RDD folosind Spark

RDD-urile pot fi create în trei moduri:

  1. Citirea datelor din colecții paralelizate
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Punerea în aplicare transformare pe RDD-urile anterioare
cuvinte val = spark.sparkContext.parallelize (Seq („Spark”, „este”, „a”, „foarte”, „puternic”, „limbă”)) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Citirea datelor din stocare externă sau căi de fișiere precum HDFS sau HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operații efectuate pe RDD-uri:

Există în principal două tipuri de operații care sunt efectuate pe RDD-uri, și anume:

  • Transformări
  • Acțiuni

Transformări : operațiuni aplicăm pe RDD-uri la filtru, acces și modifica datele din RDD părinte pentru a genera un RDD succesiv se numește transformare . Noul RDD returnează un pointer către RDD-ul anterior, asigurând dependența dintre ele.

Transformările sunt Evaluări leneșe, cu alte cuvinte, operațiile aplicate pe RDD pe care le lucrați vor fi înregistrate, dar nu executat. Sistemul generează un rezultat sau o excepție după declanșarea Acțiune .

Putem împărți transformările în două tipuri după cum urmează:

  • Transformări înguste
  • Transformări largi

Transformări înguste Aplicăm transformări înguste pe a partiție unică din RDD părinte pentru a genera un RDD nou, deoarece datele necesare procesării RDD sunt disponibile pe o singură partiție a ASD părinte . Exemplele pentru transformări înguste sunt:

  • Hartă()
  • filtru()
  • flatMap ()
  • partiție ()
  • mapPartitions ()

Transformări largi: Aplicăm transformarea largă pe partiții multiple pentru a genera un nou RDD. Datele necesare procesării RDD sunt disponibile pe mai multe partiții ale ASD părinte . Exemplele pentru transformări ample sunt:

  • reduceBy ()
  • uniune()

Acțiuni : Acțiunile instruiesc Apache Spark să aplice calcul și treceți rezultatul sau o excepție înapoi la driverul RDD. Puține dintre acțiuni includ:

  • colectarea()
  • numara()
  • lua()
  • primul()

Să aplicăm practic operațiunile pe RDD-uri:

IPL (Indian Premier League) este un turneu de cricket cu un nivel ridicat. Așadar, haideți să punem mâna pe setul de date IPL și să executăm RDD-ul nostru folosind Spark.

  • In primul rand, să descărcăm datele CSV de potrivire a IPL. După descărcare, începe să arate ca un fișier EXCEL cu rânduri și coloane.

În pasul următor, aprindem scânteia și încărcăm fișierul matches.csv din locația sa, în cazul meuCSVlocația fișierului este „/User/edureka_566977/test/matches.csv”

Acum să începem cu Transformare prima parte:

  • Hartă():

Folosim Transformarea hărții pentru a aplica o operație de transformare specifică pe fiecare element al unui RDD. Aici creăm un RDD după numele CKfile unde stocăm fișierele noastreCSVfişier. Vom crea un alt RDD numit State to stocați detaliile orașului .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filtru():

Transformarea filtrului, numele în sine descrie utilizarea sa. Folosim această operațiune de transformare pentru a filtra datele selective dintr-o colecție de date date. Aplicăm operarea filtrului aici pentru a obține înregistrările meciurilor IPL ale anului 2017 și păstrați-l în fil RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Aplicăm FlatMap este o operațiune de transformare pentru fiecare dintre elementele unui RDD pentru a crea un nou RDD. Este similar cu transformarea Hărții. aici aplicămFlatmapla scuipă meciurile orașului Hyderabad și stocați datele înfilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • partiție ():

Fiecare dată pe care o scriem într-un RDD este împărțită într-un anumit număr de partiții. Folosim această transformare pentru a găsi numărul de partiții datele sunt de fapt împărțite în.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Considerăm MapPatitions ca o alternativă a Map () șipentru fiecare() împreună. Folosim mapPartitions aici pentru a găsi număr de rânduri avem în fișierul nostru RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reduceBy ():

FolosimReduceBy() pe Perechi cheie-valoare . Am folosit această transformare pe a noastrăCSVfișier pentru a găsi playerul cu cel mai înalt Om al meciurilor .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • uniune():

Numele explică totul, folosim transformarea uniunii este pentru club două RDD-uri împreună . Aici creăm două RDD-uri și anume fil și fil2. fil RDD conține înregistrările meciurilor IPL 2017 și fil2 RDD conține înregistrările meciurilor IPL 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Să începem cu Acțiune partea în care afișăm rezultatul real:

  • colectarea():

Colectarea este acțiunea la care obișnuim afișați conținutul în RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • numara():

Numaraeste o acțiune pe care o folosim pentru a număra numărul de înregistrări prezent în RDD.Aicifolosim această operațiune pentru a număra numărul total de înregistrări din fișierul nostru matches.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • lua():

Take este o operațiune de acțiune similară pentru a colecta, dar singura diferență este că poate imprima oricare numărul selectiv de rânduri conform cererii utilizatorului. Aici aplicăm următorul cod pentru a tipări primele zece rapoarte de frunte.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • primul():

First () este o operațiune de acțiune similară cu collect () și take ()aceastafolosit pentru a imprima raportul cel mai de sus s rezultatul Aici folosim prima () operație pentru a găsi numărul maxim de meciuri jucate într-un anumit oraș și obținem Mumbai ca rezultat.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => { Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Pentru a face procesul nostru de învățare RDD folosind Spark, și mai interesant, am venit cu un caz de utilizare interesant.

RDD folosind Spark: Pokemon Use Case

  • In primul rand, Permiteți-ne să descărcăm un fișier Pokemon.csv și să-l încărcăm în spark-shell așa cum am făcut în fișierul Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemonii sunt de fapt disponibili într-o mare varietate. Să găsim câteva soiuri.

  • Eliminarea schemei din fișierul Pokemon.csv

S-ar putea să nu avem nevoie de Schemă din fișierul Pokemon.csv. Prin urmare, îl eliminăm.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Găsirea numărului de partiții pokemon.csv este distribuit în.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Pokemon de apă

Găsirea numărul de pokemon de apă

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Foc Pokemon

Găsirea numărul de pokemon Fire

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • De asemenea, putem detecta populației a unui alt tip de pokemon folosind funcția de numărare
WaterRDD.count () FireRDD.count ()

  • Din moment ce îmi place jocul de strategie defensivă hai să găsim pokemonul cu apărare maximă.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Știm maximul valoarea puterii apărării dar nu știm care este pokemonul. deci, să găsim care este asta pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Comandând [Dublu] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Acum hai să sortăm pokemonul cu cel puțin Apărare
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Acum să vedem Pokemonul cu un strategie mai puțin defensivă.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokP .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Deci, odată cu aceasta, ajungem la sfârșitul acestui RDD folosind articolul Spark. Sper că am scos puțin lumină asupra cunoștințelor dvs. despre RDD-uri, caracteristicile lor și diferitele tipuri de operații care pot fi efectuate pe ele.

Acest articol se bazează pe este conceput pentru a vă pregăti pentru examenul de certificare Cloudera Hadoop și Spark Developer (CCA175). Veți obține cunoștințe aprofundate despre Apache Spark și Ecosistemul Spark, care include Spark RDD, Spark SQL, Spark MLlib și Spark Streaming. Veți obține cunoștințe cuprinzătoare despre limbajul de programare Scala, HDFS, Sqoop, Flume, Spark GraphX ​​și sistemul de mesagerie, cum ar fi Kafka.