DBInputFormat pentru a transfera date din SQL în baza de date NoSQL



Obiectivul acestui blog este de a învăța cum să transferați date din bazele de date SQL în HDFS, cum să transferați date din bazele de date SQL în bazele de date NoSQL.

În acest blog vom explora capacitățile și posibilitățile uneia dintre cele mai importante componente ale tehnologiei Hadoop, adică MapReduce.

Astăzi, companiile adoptă cadrul Hadoop ca primă alegere pentru stocarea datelor, datorită capacităților sale de a gestiona datele mari în mod eficient. Dar știm, de asemenea, că datele sunt versatile și există în diverse structuri și formate. Pentru a controla o varietate atât de mare de date și diferitele sale formate, ar trebui să existe un mecanism de acomodare a tuturor soiurilor și totuși să producă un rezultat eficient și consecvent.





Cea mai puternică componentă din cadrul Hadoop este MapReduce, care poate oferi controlul asupra datelor și structurii sale mai bine decât ceilalți omologi ai săi. Deși necesită o cheltuială a curbei de învățare și complexitatea programării, dacă puteți gestiona aceste complexități, puteți trata cu siguranță orice fel de date cu Hadoop.

Cadrul MapReduce își împarte toate sarcinile de procesare în două faze: Map și Reduce.



Pregătirea datelor brute pentru aceste faze necesită înțelegerea unor clase de bază și interfețe. Super clasa pentru aceste reprocesări este Formatul de intrare.

Formatul de intrare class este una dintre clasele de bază din API-ul Hadoop MapReduce. Această clasă este responsabilă pentru definirea a două lucruri principale:

  • Împărțirea datelor
  • Cititor de înregistrări

Împărțirea datelor este un concept fundamental în cadrul Hadoop MapReduce care definește atât dimensiunea sarcinilor de hartă individuale, cât și serverul său potențial de execuție. Cititor de înregistrări este responsabil pentru citirea efectivă a înregistrărilor din fișierul de intrare și trimiterea acestora (ca perechi cheie / valoare) către mapator.



c ++ accesați

Numărul de cartografi se decide pe baza numărului de împărțiri. Este sarcina InputFormat să creeze divizările. De cele mai multe ori dimensiunea divizării este echivalentă cu dimensiunea blocului, dar nu întotdeauna se vor crea divizări pe baza dimensiunii blocului HDFS. Depinde în totalitate de modul în care metoda getSplits () a InputFormat a fost anulată.

Există o diferență fundamentală între divizarea MR și blocul HDFS. Un bloc este o bucată fizică de date, în timp ce o împărțire este doar o bucată logică pe care o citește un cartograf. O divizare nu conține datele de intrare, deține doar o referință sau o adresă a datelor. O divizare are în principiu două lucruri: o lungime în octeți și un set de locații de stocare, care sunt doar șiruri.

Pentru a înțelege mai bine acest lucru, să luăm un exemplu: Prelucrarea datelor stocate în MySQL folosind MR. Deoarece nu există un concept de blocuri în acest caz, teoria: „divizările sunt întotdeauna create pe baza blocului HDFS”,eșuează. O posibilitate este să creați împărțiri pe baza intervalelor de rânduri din tabelul MySQL (și asta face DBInputFormat, un format de intrare pentru citirea datelor dintr-o bază de date relațională). S-ar putea să avem k număr de divizări constând din n rânduri.

Numai pentru InputFormats bazate pe FileInputFormat (un InputFormat pentru gestionarea datelor stocate în fișiere) se creează divizările pe baza dimensiunii totale, în octeți, a fișierelor de intrare. Cu toate acestea, dimensiunea blocului FileSystem al fișierelor de intrare este tratată ca o limită superioară pentru divizările de intrare. Dacă aveți un fișier mai mic decât dimensiunea blocului HDFS, veți obține doar 1 cartograf pentru fișierul respectiv. Dacă doriți să aveți un comportament diferit, puteți utiliza mapred.min.split.size. Dar depinde din nou numai de getSplits () al InputFormat.

Avem atâtea formate de intrare preexistente disponibile sub pachetul org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Valoarea implicită este TextInputFormat.

În mod similar, avem atât de multe formate de ieșire care citesc datele de la reductoare și le stochează în HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Implicit fiind TextOutputFormat.

Până când terminați de citit acest blog, ați fi învățat:

  • Cum se scrie un program de reducere a hărții
  • Despre diferite tipuri de InputFormats disponibile în Mapreduce
  • Care este nevoia InputFormats
  • Cum se scrie InputFormats personalizate
  • Cum se transferă date din bazele de date SQL în HDFS
  • Cum se transferă date din baze de date SQL (aici MySQL) în baze de date NoSQL (aici Hbase)
  • Cum să transferați date dintr-o bază de date SQL în alt tabel din bazele de date SQL (Poate că acest lucru nu poate fi atât de important dacă facem acest lucru în aceeași bază de date SQL. Cu toate acestea, nu este nimic în neregulă dacă aveți cunoștințe despre aceleași. Nu știți niciodată cum poate intra în uz)

Condiție preliminară:

  • Hadoop preinstalat
  • SQL preinstalat
  • Hbase preinstalat
  • Înțelegere de bază Java
  • MapReduce cunoștințele
  • Cunoștințe de bază cadru Hadoop

Să înțelegem afirmația problemei pe care o vom rezolva aici:

Avem un tabel de angajați în MySQL DB în baza noastră de date relațională Edureka. Acum, conform cerințelor de afaceri, trebuie să mutăm toate datele disponibile în DB relațional către sistemul de fișiere Hadoop, adică HDFS, DB NoSQL cunoscut sub numele de Hbase.

Avem multe opțiuni pentru a realiza această sarcină:

  • Sqoop
  • Canal
  • MapReduce

Acum, nu doriți să instalați și să configurați orice alt instrument pentru această operațiune. Rămâneți cu o singură opțiune, care este cadrul de procesare MapReduce de la Hadoop. Cadrul MapReduce vă va oferi control deplin asupra datelor în timpul transferului. Puteți manipula coloanele și pune direct în oricare dintre cele două locații țintă.

Notă:

  • Trebuie să descărcăm și să punem conectorul MySQL în clasa Hadoop pentru a prelua tabele din tabelul MySQL. Pentru a face acest lucru, descărcați conectorul com.mysql.jdbc_5.1.5.jar și păstrați-l în directorul Hadoop_home / share / Hadoop / MaPreduce / lib.
Descărcări cp / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • De asemenea, puneți toate borcanele Hbase sub clasa Hadoop pentru a vă permite accesul programului MR la Hbase. Pentru a face acest lucru, executați următoarea comandă :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Versiunile software pe care le-am folosit în executarea acestei sarcini sunt:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Eclipse Moon

Pentru a evita programul în orice problemă de compatibilitate, îmi prescriu cititorilor să ruleze comanda cu un mediu similar.

DBInputWritable personalizat:

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implements Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) aruncă SQLException // Obiectul Resultset reprezintă datele returnate dintr-o instrucțiune SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) aruncă IOException { } public void write (PreparedStatement ps) aruncă SQLException {ps.setInt (1, id) ps.setString (2, nume) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Personalizat DBOutputWritable:

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implements Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = nume this.id = id this.dept = dept} public void readFields (DataInput in) aruncă IOException {} public void readFields (ResultSet rs) aruncă SQLException {} public void write (DataOutput out) aruncă IOException {} public void write (PreparedStatement ps) aruncă SQLException {ps.setString (1, nume) ps.setInt (2, id) ps.setString (3, dept)}}

Tabel de intrare:

creați baza de date edureka
creați tabelul emp (empid int nu nul, nume varchar (30), dept varchar (20), cheie primară (empid))
introduceți în valorile emp (1, „abhay”, „dezvoltare”), (2, „brundesh”, „test”)
selectați * din emp

Cazul 1: Transfer de la MySQL la HDFS

pachet com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // clasa driverului' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // nume utilizator' root ') // parola Job job = job Job nou (conf) .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) FileOut new Path (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nume tabel de intrare nul, nul, șir nou [] {'empid', 'nume', 'dept'} / / coloane tabel) Calea p = calea nouă (args [0]) FileSystem fs = FileSystem.get (URI nou (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (adevărat)? 0: 1)}}

Această bucată de cod ne permite să pregătim sau să configurăm formatul de intrare pentru a accesa baza noastră de date SQL sursă. Parametrul include clasa driverului, adresa URL are adresa bazei de date SQL, numele de utilizator și parola.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // clasa driverului 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // numele utilizatorului 'root') //parola

Această bucată de cod ne permite să trecem detaliile tabelelor din baza de date și să le setăm în obiectul lucrării. Parametrii includ, desigur, instanța de lucru, clasa personalizabilă de scriere care trebuie să implementeze interfața DBWritable, numele tabelului sursă, condiția dacă este altceva nul, orice parametri de sortare altfel nul, respectiv lista coloanelor tabelului.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nume tabel de intrare nul, nul, șir nou [] {'empid', 'nume', 'dept'} // coloane de tabel)

Mapper

pachet com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map extinde Mapper {
hartă nulă protejată (cheie LongWritable, valoare DBInputWritable, context ctx) {încercați {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (text nou (nume + '' + id + '' + dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reductor: Reductor de identitate folosit

Comandă pentru a rula:

jar hadoop dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Ieșire: tabelul MySQL transferat la HDFS

hadoop dfs -ls / dbtohdfs / *

Cazul 2: Transfer de la un tabel în MySQL la altul în MySQL

crearea tabelului de ieșire în MySQL

creați tabelul angajat1 (nume varchar (20), id int, dept varchar (20))

pachet com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) aruncă Excepție {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // clasa driverului 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // nume utilizator' root ') // parolă Job job = job nou (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nume tabel de intrare nul, nul, nou String [] {'emp ',' nume ',' dept '} // coloane tabel) DBOutputFormat.setOutput (job,' angajat1 ', // ieșire nume tabel Șir [] {' nume ',' id ',' dept '} // tabel coloane) System.exit (job.waitForCompletion (adevărat)? 0: 1)}}

Această bucată de cod ne permite să configurăm numele tabelului de ieșire în SQL DB. Parametrii sunt instanța jobului, numele tabelului de ieșire și respectiv numele coloanelor de ieșire.

DBOutputFormat.setOutput (job, 'angajat1', // ieșire nume tabel șir [] {'nume', 'id', 'dept'} // coloane tabel)

Mapper: La fel ca în cazul 1

Reductor:

pachet com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {protected void reduce (Key text, Iterable values, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (line [0] .toString (), Integer.parseInt (line [1] .toString ()), line [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Comandă pentru a rula:

jar hadoop dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Ieșire: date transferate din tabelul EMP din MySQL către alt tabel Employee1 din MySQL

Cazul 3: Transfer din tabel în MySQL în tabelul NoSQL (Hbase)

Crearea tabelului Hbase pentru a găzdui ieșirea din tabelul SQL:

creați „angajat”, „informații_oficial”

Clasa șoferului:

pachet Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throws Exception {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // clasa driverului 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // numele utilizatorului 'root') // parola Job job = job nou (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('angajat', Reduce.class, job) Job.setInputFormat. class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nume tabel de intrare nul, nul, șir nou [] {'empid', 'nume', 'dept'} // coloane de tabel) System.exit (job.waitForCompletion (adevărat)? 0: 1)}}

Această bucată de cod vă permite să configurați clasa cheii de ieșire care, în cazul hbase, este ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Aici trecem numele tabelului hbase și reductorul pentru a acționa pe masă.

TableMapReduceUtil.initTableReducerJob („angajat”, Reduce.class, job)

Cartograf:

pachet Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map extinde Mapper {private IntWritable one = new IntWritable (1) hartă nulă protejată (LongWritable id, DBInputWritable value, context context) {încercați {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), Text nou (line + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

În această bucată de cod, luăm valori de la getters din clasa DBinputwritable și apoi le transmitem
ImmutableBytesWritable astfel încât să ajungă la reductor sub formă de bytewriatble pe care Hbase îl înțelege.

String line = value.getName () String cd = value.getId () + 'String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), Text nou (line +' '+ dept )))

Reductor:

pachet Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce extinde TableReducer {public void reduce (cheie ImmutableBytesWritable, valori Iterable, context context) aruncă IOException, InterruptedException {String [] cause = null // Loop values pentru (Text val: valori) {cauza = val.toString (). split ('')} // Puneți în HBase Puneți pune = nou Puneți (key.get ()) put.add (Bytes.toBytes ('official_info' ), Bytes.toBytes ('nume'), Bytes.toBytes (cauza [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('departament'), Bytes.toBytes (cauza [1] ])) context.write (key, put)}}

Această bucată de cod ne permite să decidem rândul exact și coloana în care ar fi stocate valorile din reductor. Aici stocăm fiecare empid în rând separat, pe măsură ce am făcut empid ca cheie de rând, care ar fi unică. În fiecare rând stocăm informațiile oficiale ale angajaților în familia de coloane „official_info” sub coloanele „nume” și respectiv „departament”.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('nume'), Bytes.toBytes (cauza [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('departamentul'), Bytes.toBytes (cauza [1])) context.write (cheie, pus)

Date transferate în Hbase:

angajat scanare

După cum vedem, am reușit să finalizăm cu succes sarcina de a migra datele noastre de afaceri dintr-un DB SQL relațional într-un DB NoSQL.

În următorul blog vom afla cum să scriem și să executăm coduri pentru alte formate de intrare și ieșire.

Continuați să publicați comentariile, întrebările sau orice feedback. Mi-ar plăcea să aud de la tine.

Ai o întrebare pentru noi? Vă rugăm să o menționați în secțiunea de comentarii și vă vom răspunde.

Postări asemănatoare: