Transformare cumulată de stare în Apache Spark Streaming



Această postare pe blog discută despre transformările de stare în Spark Streaming. Aflați totul despre urmărirea cumulativă și îmbunătățirea abilităților pentru o carieră Hadoop Spark.

Contribuție de Prithviraj Bose

În blogul meu anterior am discutat despre transformările de stare folosind conceptul de fereastră al Apache Spark Streaming. O puteți citi Aici .





În această postare voi discuta despre operațiuni cumulative de stare în Apache Spark Streaming. Dacă sunteți nou în Spark Streaming, vă recomand cu tărie să citiți blogul meu anterior pentru a înțelege cum funcționează ferestrele.

Tipuri de transformări de stare în fluxul de scântei (continuare ...)

> Urmărire cumulativă

Folosisem reduceByKeyAndWindow (...) API pentru a urmări stările tastelor, totuși ferestrele prezintă limitări pentru anumite cazuri de utilizare. Ce se întâmplă dacă vrem să acumulăm stările tastelor peste tot, decât să le limităm la o fereastră de timp? În acest caz, ar trebui să folosim updateStateByKey (...) FOC.



Acest API a fost introdus în Spark 1.3.0 și a fost foarte popular. Cu toate acestea, acest API are unele performanțe generale, performanța acestuia se degradează pe măsură ce dimensiunea stărilor crește în timp. Am scris un eșantion pentru a arăta utilizarea acestui API. Puteți găsi codul Aici .

Spark 1.6.0 a introdus un nou API mapWithState (...) care rezolvă cheltuielile generale ale performanței reprezentate de updateStateByKey (...) . În acest blog voi discuta acest API special folosind un exemplu de program pe care l-am scris. Puteți găsi codul Aici .

Înainte de a mă scufunda într-o prezentare a codului, să economisim câteva cuvinte despre checkpointing. Pentru orice transformare de stare, punctul de control este obligatoriu. Checkpointing-ul este un mecanism de restabilire a stării tastelor în cazul în care programul driverului eșuează. Când driverul repornește, starea tastelor este restaurată din fișierele punctelor de control. Locațiile punctelor de control sunt de obicei HDFS sau Amazon S3 sau orice stocare de încredere. În timp ce testați codul, puteți stoca și în sistemul de fișiere local.



În programul eșantion, ascultăm fluxul de text socket pe host = localhost și port = 9999. Acesta simbolizează fluxul de intrare în (cuvinte, numărul de apariții) și urmărește numărul de cuvinte folosind API-ul 1.6.0 mapWithState (...) . În plus, tastele fără actualizări sunt eliminate cu ajutorul StateSpec.timeout API. Suntem puncte de control în HDFS și frecvența punctelor de control este la fiecare 20 de secunde.

Să creăm mai întâi o sesiune Spark Streaming,

Spark-streaming-session

Creăm un checkpointDir în HDFS și apoi apelați metoda obiect getOrCreate (...) . getOrCreate API verifică checkpointDir pentru a vedea dacă există stări anterioare de restaurat, dacă există, atunci recreează sesiunea Spark Streaming și actualizează stările cheilor din datele stocate în fișiere înainte de a continua cu date noi. În caz contrar, se creează o nouă sesiune Spark Streaming.

gestionarea excepțiilor în procedura stocată oracle

getOrCreate preia numele directorului punctului de control și o funcție (pe care am numit-o createFunc ) a cărei semnătură ar trebui să fie () => StreamingContext .

Să examinăm codul din interior createFunc .

Linia # 2: Creăm un context de streaming cu numele lucrării la „TestMapWithStateJob” și intervalul de lot = 5 secunde.

Linia # 5: Setați directorul punctului de control.

Linia # 8: Setați specificațiile de stare folosind clasa org.apache.streaming.StateSpec obiect. Mai întâi setăm funcția care va urmări starea, apoi stabilim numărul de partiții pentru DStream-urile rezultate care urmează să fie generate în timpul transformărilor ulterioare. În cele din urmă, setăm timpul de expirare (la 30 de secunde) în cazul în care dacă nu se primește nicio actualizare pentru o cheie în 30 de secunde, atunci starea cheii va fi eliminată.

Linia 12 #: Configurați fluxul de socket, aplatizați datele lotului primite, creați o pereche cheie-valoare, apelați mapWithState , setați intervalul punctului de control la 20s și, în cele din urmă, imprimați rezultatele.

Cadrul Spark îl numește pe th e createFunc pentru fiecare tastă cu valoarea anterioară și starea curentă. Calculăm suma și actualizăm starea cu suma cumulativă și în cele din urmă returnăm suma pentru cheie.

Surse Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Ai o întrebare pentru noi? Vă rugăm să o menționați în secțiunea de comentarii și vă vom răspunde.

Postări asemănatoare:

Începeți cu Apache Spark & ​​Scala

Transformări de stare cu Windowing în Spark Streaming