sql >> Databasteknik >  >> NoSQL >> HBase

Spark-on-HBase:DataFrame-baserad HBase-kontakt

Det här blogginlägget publicerades på Hortonworks.com före sammanslagningen med Cloudera. Vissa länkar, resurser eller referenser kanske inte längre är korrekta.

Vi är stolta över att kunna presentera den tekniska förhandsvisningen av Spark-HBase Connector, utvecklad av Hortonworks i samarbete med Bloomberg.

Spark-HBase-anslutningen utnyttjar Data Source API (SPARK-3247) som introducerades i Spark-1.2.0. Den överbryggar klyftan mellan det enkla HBase Key Value-lagret och komplexa relationsbaserade SQL-frågor och gör det möjligt för användare att utföra komplexa dataanalyser ovanpå HBase med hjälp av Spark. En HBase DataFrame är en standard Spark DataFrame och kan interagera med andra datakällor som Hive, ORC, Parquet, JSON, etc.

Bakgrund

Det finns flera Spark HBase-kontakter med öppen källkod tillgängliga antingen som Spark-paket, som oberoende projekt eller i HBase-trunk.

Spark har flyttat till Dataset/DataFrame API, som ger inbyggd frågeplansoptimering. Nu föredrar slutanvändare att använda DataFrames/Datasets baserat gränssnitt.

HBase-kontakten i HBase-stammen har ett rikt stöd på RDD-nivå, t.ex. BulkPut, etc, men dess DataFrame-stöd är inte lika rikt. HBase trunkkontakt förlitar sig på standard HadoopRDD med HBase inbyggt TableInputFormat har vissa prestandabegränsningar. Dessutom kan BulkGet som utförs i drivrutinen vara en enda felpunkt.

Det finns några andra alternativa implementeringar. Ta Spark-SQL-on-HBase som ett exempel. Den tillämpar mycket avancerade anpassade optimeringstekniker genom att bädda in sin egen frågeoptimeringsplan i standard Spark Catalyst-motorn, skickar RDD till HBase och utför komplicerade uppgifter, såsom partiell aggregering, inuti HBase-samprocessorn. Detta tillvägagångssätt kan uppnå hög prestanda, men det är svårt att underhålla på grund av dess komplexitet och den snabba utvecklingen av Spark. Att även tillåta godtycklig kod att köras inuti en samprocessor kan innebära säkerhetsrisker.

Spark-on-HBase Connector (SHC) har utvecklats för att övervinna dessa potentiella flaskhalsar och svagheter. Den implementerar standard Spark Datasource API och utnyttjar Spark Catalyst-motorn för frågeoptimering. Parallellt konstrueras RDD:n från början istället för att använda TableInputFormat för att uppnå hög prestanda. Med denna skräddarsydda RDD kan alla kritiska tekniker tillämpas och implementeras fullt ut, såsom partitionsbeskärning, kolumnbeskärning, predikat pushdown och datalokalitet. Designen gör underhållet mycket enkelt, samtidigt som en bra avvägning mellan prestanda och enkelhet uppnås.

Arkitektur

Vi antar att Spark och HBase är utplacerade i samma kluster och Spark-exekutorer är samlokaliserade med regionservrar, som illustreras i figuren nedan.

Figur 1. Spark-on-HBase Connector Architecture

På en hög nivå behandlar kontakten både Scan och Get på ett liknande sätt, och båda åtgärderna utförs i exekutörerna. Drivrutinen bearbetar frågan, sammanställer skanningar/hämtningar baserat på regionens metadata och genererar uppgifter per region. Uppgifterna skickas till de föredragna exekutorerna som är samlokaliserade med regionservern och utförs parallellt i executorerna för att uppnå bättre datalokalitet och samtidighet. Om en region inte har de data som krävs tilldelas den regionservern ingen uppgift. En uppgift kan bestå av flera skanningar och bulkGets, och dataförfrågningarna från en uppgift hämtas från endast en regionserver, och denna regionserver kommer också att vara lokalinställningen för uppgiften. Observera att föraren inte är inblandad i det verkliga jobbet utom för schemaläggningsuppgifter. Detta undviker att föraren blir flaskhalsen.

Tabellkatalog

För att få in HBase-tabellen som en relationstabell i Spark, definierar vi en mappning mellan HBase- och Spark-tabeller, kallad Table Catalog. Det finns två viktiga delar av denna katalog. Den ena är radnyckeldefinitionen och den andra är mappningen mellan tabellkolumnen i Spark och kolumnfamiljen och kolumnkvalificeraren i HBase. Se avsnittet Användning för detaljer.

Native Avro-stöd

Anslutningen stöder Avro-formatet inbyggt, eftersom det är en mycket vanlig praxis att bevara strukturerad data i HBase som en byte-array. Användaren kan bevara Avro-posten i HBase direkt. Internt konverteras Avro-schemat automatiskt till en inbyggd Spark Catalyst-datatyp. Observera att båda nyckel-värde-delarna i en HBase-tabell kan definieras i Avro-format. Se exemplen/testfallen i repan för exakt användning.

Predikat Pushdown

Anslutningen hämtar endast nödvändiga kolumner från regionservern för att minska nätverkskostnader och undvika redundant bearbetning i Spark Catalyst-motorn. Befintliga standard HBase-filter används för att utföra predikat-push-down utan att utnyttja samprocessorkapaciteten. Eftersom HBase inte är medveten om datatypen förutom byte-arrayen och ordningsinkonsekvensen mellan Java primitiva typer och byte-array, måste vi förbehandla filtervillkoret innan vi ställer in filtret i Scan-operationen för att undvika dataförlust. Inuti regionservern filtreras poster som inte matchar frågevillkoret bort.

Partitionsbeskärning

Genom att extrahera radnyckeln från predikaten delar vi Scan/BulkGet i flera icke-överlappande intervall, endast regionservrarna som har den begärda informationen kommer att utföra Scan/BulkGet. För närvarande utförs partitionsbeskärningen på den första dimensionen av radnycklarna. Till exempel, om en radnyckel är "nyckel1:nyckel2:nyckel3", kommer partitionsbeskärningen endast att baseras på "nyckel1". Observera att WHERE-villkoren måste definieras noggrant. Annars kan det hända att partitionsbeskärningen inte träder i kraft. Till exempel, WHERE rowkey1> "abc" ELLER kolumn ="xyz" (där rowkey1 är den första dimensionen av radnyckeln och kolumn är en vanlig hbase-kolumn) kommer att resultera i en fullständig genomsökning, eftersom vi måste täcka alla områden eftersom av ELLER logik.

Datalokalitet

När en Spark-exekutor är samlokaliserad med HBase-regionservrar, uppnås datalokalitet genom att identifiera regionserverns plats, och gör det bästa för att samlokalisera uppgiften med regionservern. Varje exekutor utför Scan/BulkGet på den del av data som är samlokaliserad på samma värd.

Skanna och BulkGet

Dessa två operatorer exponeras för användare genom att ange WHERE CLAUSE, t.ex. WHERE kolumn> x och kolumn för skanning och WHERE kolumn =x glömma. Operationerna utförs i exekutörerna, och föraren konstruerar endast dessa operationer. Internt konverteras de till scan och/eller get, och Iterator[Row] återförs till katalysatormotorn för bearbetning av det övre skiktet.

Användning

Följande illustrerar den grundläggande proceduren för hur man använder kontakten. För mer information och avancerade användningsfall, som Avro och stöd för sammansatta nyckel, se exemplen i arkivet.

1) Definiera katalogen för schemamappningen:

[code language="scala"]def catalog =s"""{        |"table":{"namespace":"default", "name":"table1"},        |"rowkey":"key" ,        |"columns":{          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},          |"col1":{"cf":"cf1 ", "col":"col1", "type":"boolean"},          |"col2":{"cf":"cf2", "col":"col2", "type":"dubbel"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"},          |"col4":{"cf":"cf4", "col":" col4", "type":"int"},          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},          |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"},          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}        |}      |}""".stripMargin[/code] 

2) Förbered data och fyll i HBase-tabellen:
fallklass HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

objekt HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}”””       HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s”String$i:$t”,      i.toByte) }}

val data =(0 till 255).map { i =>  HBaseRecord(i, “extra”)}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> “5”))
 .format(“org.apache.spark. sql.execution.datasources.hbase”)
 .save()
 
3) Ladda DataFrame:
def withCatalog(cat:String):DataFrame ={
 sqlContext
 .read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( “org.apache.spark.sql.execution.datasources.hbase”)
 .load()
}

val df =withCatalog(catalog)

4) Språkintegrerad fråga:
val s =df.filter((($”col0″ <=“row050″ &&$”col0”> “row040”) ||
 $”col0″ ===“row005” ||
 $”col0″ ===“row020” ||
 $”col0″ === “r20” ||
 $”col0″ <=“row005”) &&
 ($”col4″ ===1 ||
 $”col4″ ===42))
 .select(“col0”, “col1”, “col4”)
s .visa

5) SQL-fråga:
df.registerTempTable(“tabell”)
sqlContext.sql(“select count(col1) from table”).show

Konfigurera Spark-paketet

Användare kan använda Spark-on-HBase-kontakten som ett standardpaket för Spark. För att inkludera paketet i din Spark-applikation, använd:

spark-shell, pyspark eller spark-submit

> $SPARK_HOME/bin/spark-shell –paket zhzhan:shc:0.0.11-1.6.1-s_2.10

Användare kan också inkludera paketet som beroende i din SBT-fil. Formatet är spark-paketnamn:version

spDependencies +=“zhzhan/shc:0.0.11-1.6.1-s_2.10”

Körs i Secure Cluster

För att köra i ett Kerberos-aktiverat kluster måste användaren inkludera HBase-relaterade burkar i klassvägen eftersom hämtning och förnyelse av HBase-token görs av Spark och är oberoende av anslutningen. Användaren behöver med andra ord initiera miljön på normalt sätt, antingen genom kinit eller genom att tillhandahålla principal/keytab. Följande exempel visar hur man kör i ett säkert kluster med både garnklient- och garnklusterläge. Observera att SPARK_CLASSPATH måste ställas in för båda lägena, och exempelburken är bara en platshållare för Spark.

export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Anta att hrt_qa är ett huvudlöst konto, användaren kan använda följande kommando för kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –antal-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Lägg ihop allt

Vi har precis gett en snabb översikt över hur HBase stöder Spark på DataFrame-nivå. Med DataFrame API kan Spark-applikationer arbeta med data lagrade i HBase-tabellen lika enkelt som all data lagrad i andra datakällor. Med denna nya funktion kan data i HBase-tabeller enkelt konsumeras av Spark-applikationer och andra interaktiva verktyg, t.ex. användare kan köra en komplex SQL-fråga ovanpå en HBase-tabell inuti Spark, utföra en tabellkoppling mot Dataframe eller integrera med Spark Streaming för att implementera ett mer komplicerat system.

Vad är nästa steg?

För närvarande är kontakten värd i Hortonworks repo och publiceras som ett Spark-paket. Den håller på att migreras till Apache HBase trunk. Under migreringen identifierade vi några kritiska buggar i HBase-trunken, och de kommer att fixas tillsammans med sammanslagningen. Gemenskapsarbetet spåras av paraplyet HBase JIRA HBASE-14789, inklusive HBASE-14795 och HBASE-14796  för att optimera den underliggande datorarkitekturen för Scan och BulkGet, HBASE-14801 för att tillhandahålla JSON-användargränssnitt för enkel användning, HBASE-153BASE DataFrame-skrivvägen, HBASE-15334 för Avro-stöd, HBASE-15333  för att stödja primitiva Java-typer, såsom short, int, long, float och double, etc., HBASE-15335 för att stödja sammansatt radnyckel och HBASE-15572 för att lägga till valfri tidsstämpel semantik. Vi ser fram emot att ta fram en framtida version av kontakten som gör kontakten ännu lättare att arbeta med.

Bekräftelse

Vi vill tacka Hamel Kothari, Sudarshan Kadambi och Bloomberg-teamet för att vägleda oss i detta arbete och även hjälpa oss att validera detta arbete. Vi vill också tacka HBase-communityt för att de lämnat feedback och gjort detta bättre. Slutligen har detta arbete utnyttjat lärdomarna från tidigare Spark HBase-integrationer och vi vill tacka deras utvecklare för att de banade vägen.

Referens:

SHC:https://github.com/hortonworks/shc-release

Spark-package:http://spark-packages.org/package/zhzhan/shc

Apache HBase: https://hbase.apache.org/

Apache Spark:http://spark.apache.org/


  1. Hur man får återuppringning när nyckeln går ut i REDIS

  2. Letar efter en lösning mellan att ställa in många timers eller att använda en schemalagd uppgiftskö

  3. MongoDB findAndModify()

  4. Hur man kontrollerar att socket är levande (anslutet) i socket.io med flera noder och socket.io-redis