Tutorial Spark Streaming - Analiza sentimentelor folosind Apache Spark



Acest blog Spark Streaming vă va prezenta Spark Streaming, caracteristicile și componentele sale. Include un proiect de analiză a sentimentelor folosind Twitter.

Spark Streaming este o extensie a API-ului Spark de bază, care permite procesarea scalabilă, cu randament ridicat, tolerant la erori a fluxurilor de date live. Spark Streaming poate fi utilizat pentru a transmite în flux date în direct și procesarea se poate întâmpla în timp real. Baza de utilizatori din ce în ce mai mare a Spark Streaming este formată din nume de uz casnic precum Uber, Netflix și Pinterest.

Când vine vorba de analiza datelor în timp real, Spark Streaming oferă o singură platformă pentru a ingera date pentru procesare rapidă și live și dovedește îndemânarea ta la fel.Prin acest blog, vă voi prezenta acest nou domeniu interesant al Spark Streaming și vom trece printr-un caz de utilizare complet, Analiza Sentimentului Twitter folosind Spark Streaming.





Următoarele sunt subiectele care vor fi tratate în acest blog:

  1. Ce este streamingul?
  2. De ce Spark Streaming?
  3. Prezentare generală a fluxului Spark
  4. Caracteristici Spark Streaming
  5. Fundamentele Spark Streaming
    5.1 Context de streaming
    5.2 DStream
    5.3 Caching / Persistență
    5.4 Acumulatoare, variabile de difuzare și puncte de control
  6. Utilizare caz - Analiza sentimentelor Twitter

Ce este streamingul?

Fluxul de date este o tehnică pentru transferul de date, astfel încât să poată fi procesate ca un flux constant și continuu. Tehnologiile de streaming devin din ce în ce mai importante odată cu creșterea internetului.



Ce este Streaming - Spark Streaming - EdurekaFigura: Ce este streamingul?

De ce Spark Streaming?

Putem folosi Spark Streaming pentru a transmite în timp real date din diverse surse, cum ar fi Twitter, bursă și sisteme geografice și pentru a efectua analize puternice pentru a ajuta companiile.

Figura: De ce Spark Streaming?



Prezentare generală a fluxului Spark

Spark Streaming este utilizat pentru procesarea datelor în timp real. Este un plus util la API-ul Spark de bază. Spark Streaming permite procesarea fluxurilor de date în timp real și tolerante la erori.

Figura: Fluxuri în Spark Streaming

Unitatea de flux fundamentală este DStreamcare este practic o serie de RDD-uri pentru procesarea datelor în timp real.

Caracteristici Spark Streaming

  1. Scalare: Spark Streaming se poate scala cu ușurință la sute de noduri.
  2. Viteză: Este unproduce latență scăzută.
  3. Toleranță la erori: Spark are capacitatea de a erecuperarea eficientă a eșecurilor.
  4. Integrare: Spark se integrează cu procesarea în serie și în timp real.
  5. Analiza afacerii: Spark Streaming este upentru a urmări comportamentul clienților care poate fi utilizat în analiza afacerii.

Flux de lucru Spark Streaming

Fluxul de lucru Spark Streaming are patru etape la nivel înalt. Primul este să transmiteți în flux date din diverse surse. Aceste surse pot fi streaming de surse de date precum Akka, Kafka, Flume, AWS sau Parquet pentru streaming în timp real. Al doilea tip de surse include HBase, MySQL, PostgreSQL, Elastic Search, Mongo DB și Cassandra pentru streaming static / batch. Odată ce acest lucru se întâmplă, Spark poate fi utilizat pentru a efectua învățarea automată a datelor prin intermediul API-ului său MLlib. În plus, Spark SQL este utilizat pentru a efectua operațiuni suplimentare asupra acestor date. În cele din urmă, ieșirea de streaming poate fi stocată în diferite sisteme de stocare a datelor, cum ar fi HBase, Cassandra, MemSQL, Kafka, Elastic Search, HDFS și sistemul de fișiere local.

Figura: Prezentare generală a fluxului Spark

Fundamentele Spark Streaming

  1. Context de streaming
  2. DStream
  3. Caching
  4. Acumulatoare, variabile de difuzare și puncte de control

Context de streaming

Context de streaming consumă un flux de date în Spark. Înregistrează un Intrare DStream a produce o Receptor obiect. Este punctul principal de intrare pentru funcționalitatea Spark. Spark oferă o serie de implementări implicite ale unor surse precum Twitter, Akka Actor și ZeroMQ care sunt accesibile din context.

Un obiect StreamingContext poate fi creat dintr-un obiect SparkContext. Un SparkContext reprezintă conexiunea la un cluster Spark și poate fi utilizat pentru a crea RDD-uri, acumulatori și variabile de difuzare pe acel cluster.

import org.apache.spark._ import org.apache.spark.streaming._ var ssc = new StreamingContext (sc, Secunde (1))

DStream

Flux discretizat (DStream) este abstractizarea de bază oferită de Spark Streaming. Este un flux continuu de date. Acesta este primit de la o sursă de date sau un flux de date procesat generat prin transformarea fluxului de intrare.

Figura: Extragerea cuvintelor dintr-un DStream de intrare

Intern, un DStream este reprezentat de o serie continuă de RDD-uri și fiecare RDD conține date dintr-un anumit interval.

Intrare DStreams: Intrare DStreams sunt DStreams care reprezintă fluxul de date de intrare primite din surse de streaming.

Figura: Receptorul trimite date pe intrarea DStream unde fiecare lot conține RDD-uri

Fiecare intrare DStream este asociată cu un obiect Receiver care primește datele de la o sursă și le stochează în memoria Spark pentru procesare.

Transformări pe DStreams:

Orice operație aplicată pe un DStream se traduce prin operații pe RDD-urile subiacente. Transformările permit modificarea datelor din DStream de intrare similar cu RDD-urile. DStreams acceptă multe dintre transformările disponibile pe RDD-urile Spark normale.

Figura: DStream Transformations

Următoarele sunt câteva dintre transformările populare pe DStreams:

Hartă( func )Hartă( func ) returnează un nou DStream prin trecerea fiecărui element al sursei DStream printr-o funcție func.
flatMap ( func )flatMap ( func ) este similar cu harta ( func ), dar fiecare element de intrare poate fi mapat la 0 sau mai multe elemente de ieșire și returnează un nou DStream trecând fiecare element sursă printr-o funcție func.
filtru( func )filtru( func ) returnează un nou DStream selectând numai înregistrările sursei DStream pe care func revine adevărat.
reduce( func )reduce( func ) returnează un nou DStream de RDD-uri cu un singur element prin agregarea elementelor din fiecare RDD al DStream sursă folosind o funcție func .
a se grupa cu( func )a se grupa cu( func ) returnează noul RDD care, în principiu, este alcătuit cu o cheie și o listă corespunzătoare de articole din acel grup.

Ieșire DStreams:

ce este cadrul în seleniu

Operațiile de ieșire permit transmiterea datelor DStream către sisteme externe precum baze de date sau sisteme de fișiere. Operațiile de ieșire declanșează executarea efectivă a tuturor transformărilor DStream.

Figura: Operații de ieșire pe DStreams

Caching

DStreams permiteți dezvoltatorilor să memoreze în cache / să păstreze datele fluxului în memorie. Acest lucru este util dacă datele din DStream vor fi calculate de mai multe ori. Acest lucru se poate face folosind persista() metoda pe un DStream.

Figura: Memorarea în cache a 2 noduri

Pentru fluxurile de intrare care primesc date prin rețea (cum ar fi Kafka, Flume, Sockets etc.),nivelul de persistență implicit este setat pentru a reproduce datele pe două noduri pentru toleranță la erori.

Acumulatoare, variabile de difuzare și puncte de control

Acumulatoare: Acumulatoare sunt variabile care se adaugă doar printr-o operație asociativă și comutativă. Acestea sunt utilizate pentru a implementa contoare sau sume. Urmărirea acumulatorilor în interfața de utilizare poate fi utilă pentru înțelegerea progresului etapelor de rulare. Spark suportă în mod nativ acumulatori numerici. Putem crea acumulatori cu nume sau fără nume.

Variabile difuzate: Variabile de difuzare permite programatorului să păstreze o variabilă de numai citire memorată în cache pe fiecare mașină, mai degrabă decât să trimită o copie a acesteia cu sarcini. Ele pot fi utilizate pentru a da fiecărui nod o copie a unui set de date de intrare mare într-o manieră eficientă. Spark încearcă, de asemenea, să distribuie variabilele de difuzare utilizând algoritmi de difuzare eficienți pentru a reduce costurile de comunicare.

Puncte de control: Puncte de control sunt similare punctelor de control din jocuri. Îl fac să ruleze 24/7 și îl fac rezistent la eșecuri care nu au legătură cu logica aplicației.


Figura:
Caracteristicile punctelor de control

Utilizare caz - Analiza sentimentelor Twitter

Acum, că am înțeles conceptele de bază ale Spark Streaming, să rezolvăm o problemă din viața reală folosind Spark Streaming.

Declarație problemă: Pentru a proiecta un sistem de analiză a sentimentelor Twitter în care populăm sentimentele în timp real pentru gestionarea crizelor, ajustarea serviciilor și marketingul țintă.

Aplicații ale analizei sentimentelor:

  • Preziceți succesul unui film
  • Preziceți succesul campaniei politice
  • Decideți dacă investiți într-o anumită companie
  • Publicitate direcționată
  • Examinați produsele și serviciile

Implementarea Spark Streaming:

Găsiți mai jos codul Pseudo:

// Importați pachetele necesare în programul Spark import org.apache.spark.streaming. {Secunde, StreamingContext} import org.apache.spark.SparkContext._ ... import java.io.File object twitterSentiment {def main (args : Array [String]) {if (args.length<4) { System.err.println('Usage: TwitterPopularTags ' + ' []') System.exit(1) } StreamingExamples.setStreamingLogLevels() //Passing our Twitter keys and tokens as arguments for authorization val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) val filters = args.takeRight(args.length - 4) // Set the system properties so that Twitter4j library used by twitter stream // Use them to generate OAuth credentials System.setProperty('twitter4j.oauth.consumerKey', consumerKey) ... System.setProperty('twitter4j.oauth.accessTokenSecret', accessTokenSecret) val sparkConf = new SparkConf().setAppName('twitterSentiment').setMaster('local[2]') val ssc = new Streaming Context val stream = TwitterUtils.createStream(ssc, None, filters) //Input DStream transformation using flatMap val tags = stream.flatMap { status =>Obțineți text din Hashtags} // Transformarea RDD utilizând sortBy și apoi mapați funcțiile tags.countByValue () .foreachRDD {rdd => val now = Obțineți ora curentă a fiecărui Tweet rdd .sortBy (_._ 2) .map (x => (x, acum)) // Salvarea rezultatelor noastre la ~ / twitter / directory .saveAsTextFile (s '~ / twitter / $ now')} // Transformarea DStream folosind funcțiile de filtrare și hartă val tweets = stream.filter {t => tag-uri val = t. Split On Spaces .filter (_. StartsWith ('#')). Conversie la minuscule tags.exists {x => true}} val data = tweets.map {status => val sentiment = SentimentAnalysisUtils.detectSentiment (status.getText) val tagss = status.getHashtagEntities.map (_. GetText.toLowerCase) (status.getText, sentiment.toString, tagss.toString ())} data.print () // Salvarea rezultatelor noastre la ~ / cu nume de fișiere începând ca twitters data.saveAsTextFiles ('~ / twitters', '20000') ssc. start () ssc.awaitTermination ()}}

Rezultate:

Următoarele sunt rezultatele afișate în Eclipse IDE în timpul rulării programului Twitter Sentiment Streaming.

Figura: Analiza sentimentului de ieșire în Eclipse IDE

După cum putem vedea în captura de ecran, toate tweet-urile sunt clasificate în pozitive, neutre și negative în funcție de sentimentul conținutului tweet-urilor.

Rezultatul Sentimentelor Tweet-urilor este stocat în dosare și fișiere în funcție de momentul în care au fost create. Această ieșire poate fi stocată pe sistemul de fișiere local sau HDFS, după caz. Directorul de ieșire arată astfel:

Figura: Dosare de ieșire din folderul nostru de proiect „twitter”

Aici, în directorul twitter, putem găsi numele de utilizator ale utilizatorilor Twitter împreună cu marcajul de timp pentru fiecare tweet așa cum se arată mai jos:

Figura: Fișier de ieșire care conține nume de utilizator Twitter cu marcaj de timp

Acum, că avem numele de utilizator și marca de timp Twitter, să ne uităm la Sentimente și tweets stocate în directorul principal. Aici, fiecare tweet este urmat de emoția sentimentului. Acest sentiment care este stocat este utilizat în continuare pentru a analiza o vastă multitudine de perspective de către companii.

Figura: Fișier de ieșire care conține tweets cu sentimente

Cod de ajustare:

Acum, permiteți-ne să modificăm puțin codul nostru pentru a obține sentimente pentru anumite hashtag-uri (subiecte). În prezent, Donald Trump, președintele Statelor Unite, este în tendințe pe canalele de știri și pe rețelele de socializare online. Să ne uităm la sentimentele asociate cu cuvântul cheie „ Atu ‘.

Figura: Efectuarea analizei sentimentelor pe tweet-uri cu cuvântul cheie „Trump”

clasa scanerului în exemplu java

Mergi mai departe:

Așa cum am văzut din demonstrația noastră de analiză a sentimentelor, putem extrage sentimente din anumite subiecte la fel cum am făcut pentru „Trump”. În mod similar, Sentiment Analytics poate fi utilizat în gestionarea crizelor, reglarea serviciilor și marketingul țintă de către companii din întreaga lume.

Companiile care utilizează Spark Streaming pentru analiza sentimentelor au aplicat aceeași abordare pentru a realiza următoarele:

  1. Îmbunătățirea experienței clienților
  2. Obținerea unui avantaj competitiv
  3. Câștigarea inteligenței de afaceri
  4. Revitalizarea unui brand în pierdere

Cu aceasta, am ajuns la sfârșitul acestui lucru Tutorial Spark Streaming blog. Până acum, trebuie să fi dobândit o înțelegere solidă a ceea ce este Spark Streaming. Cazul de utilizare Twitter Sentiment Analysis vă va oferi încrederea necesară pentru a lucra la orice proiecte viitoare pe care le întâlniți în Spark Streaming și Apache Spark. Practica este cheia stăpânirii oricărui subiect și sper că acest blog v-a creat suficient interes pentru a explora mai departe Apache Spark.

Vă recomandăm următorul tutorial YouTube Spark Streaming de la Edureka pentru a începe:

Spark Streaming | Exemplu de analiză a sentimentelor Twitter | Edureka

Această serie video din Spark Tutorial oferă un fundal complet al componentelor, împreună cu cazuri de utilizare din viața reală, cum ar fi Analiza Sentimentului Twitter , Analiza predicției jocului NBA , Sistem de detectare a cutremurelor , Analiza datelor de zbor și Sisteme de recomandare a filmelor . Am conceput personal cazurile de utilizare astfel încât să oferim o experiență generală tuturor celor care execută codul.

Ai o întrebare pentru noi? Vă rugăm să o menționați în secțiunea de comentarii și vă vom contacta cel mai devreme. Dacă doriți să învățați Spark și să construiți o carieră în domeniul Spark și să vă dezvoltați expertiza pentru a efectua prelucrări de date la scară largă utilizând RDD, Spark Streaming, SparkSQL, MLlib, GraphX ​​și Scala cu cazuri de utilizare Real Life, consultați interactiv pe net Aici, care vine cu suport 24 * 7 pentru a vă ghida pe toată perioada de învățare.