Introduktion
Python används flitigt bland dataingenjörer och dataforskare för att lösa alla möjliga problem från ETL/ELT-pipelines till att bygga modeller för maskininlärning. Apache HBase är ett effektivt datalagringssystem för många arbetsflöden men att få åtkomst till denna data specifikt via Python kan vara en kamp. För dataproffs som vill använda data som lagrats i HBase kan det senaste uppströmsprojektet "hbase-connectors" användas med PySpark för grundläggande operationer.
I den här bloggserien kommer vi att förklara hur man konfigurerar PySpark och HBase tillsammans för grundläggande Spark-användning såväl som för jobb som underhålls i CDSW. För de som inte är bekanta med CDSW är det en säker företagsdatavetenskapsplattform med självbetjäning för datavetare att hantera sina egna analyspipelines och på så sätt påskynda maskininlärningsprojekt från utforskning till produktion. För mer information om CDSW besök Cloudera Data Science Workbench produktsida.
I det här inlägget kommer flera operationer att förklaras och demonstreras tillsammans med exempelutdata. För sammanhanget körs alla exempeloperationer i det här specifika blogginlägget med en CDSW-distribution.
Förutsättningar:
- Ha ett CDP-kluster med HBase och Spark
- Om du ska följa exempel via CDSW behöver du den installerad – Installera Cloudera Data Science Workbench
- Python 3 installeras på varje nod på samma väg
Konfiguration:
Först måste HBase och Spark konfigureras tillsammans för att Spark SQL-frågor ska fungera korrekt. För att göra det finns det två delar:först, konfigurera HBase Region-servrarna genom Cloudera Manager; och för det andra, se till att Spark-körtiden har HBase-bindningar. En anmärkning att komma ihåg är dock att Cloudera Manager redan ställer in vissa konfigurations- och miljövariabler för att automatiskt peka Spark mot HBase åt dig. Ändå är det första steget med att konfigurera Spark SQL-frågor vanligt genom alla typer av distribution på CDP-kluster, men det andra är något annorlunda beroende på typ av distribution.
Konfigurera HBase Region-servrar
- Gå till Cloudera Manager och välj HBase-tjänsten.
- Sök efter "regionserver environment"
- Lägg till en ny miljövariabel med hjälp av RegionServer Environment Advanced Configuration Snippet (säkerhetsventil):
- Nyckel:HBASE_CLASSPATH
- Värde:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
Se till att du använder rätt versionsnummer.
- Starta om regionservrar.
När du har följt stegen ovan följer du stegen nedan beroende på om du vill ha en CDSW- eller icke-CDSW-distribution.
Lägga till HBase-bindningar till Spark Runtime i icke-CDSW-distributioner
För att distribuera skalet eller använda spark-submit korrekt, använd följande kommandon för att säkerställa att sparken har rätt HBase-bindningar.
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. burk
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/clooudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- shaded.jar
Lägga till HBase-bindningar till Spark Runtime i CDSW-distributioner
För att konfigurera CDSW med HBase och PySpark finns det några steg du måste ta.
1) Se till att Python 3 är installerat på varje klusternod och notera sökvägen till den
2) Gör ett nytt projekt i CDSW och använd en PySpark-mall
3) Öppna projektet, gå till Inställningar -> Motor -> Miljövariabler.
4) Ställ in PYSPARK3_DRIVER_PYTHON och PYSPARK3_PYTHON till sökvägen där Python är installerad på dina klusternoder (Sökväg noteras i steg 1).
Nedan är ett exempel på hur det ska se ut.
5) I ditt projekt, gå till Files -> spark-defaults.conf och öppna det i Workbench
6) Kopiera och klistra in raden nedan i filen och se till att den har sparats innan du startar en ny session.
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
Vid det här laget är CDSW nu konfigurerat för att köra PySpark-jobb på HBase! Resten av det här blogginlägget hänvisar till några exempel på operationer på en CDSW-distribution.
Exempel på operationer
Put Operations
Det finns två sätt att infoga och uppdatera rader i HBase. Den första och mest rekommenderade metoden är att bygga en katalog, vilket är ett schema som mappar kolumnerna i en HBase-tabell till en PySpark-dataram samtidigt som tabellnamnet och namnutrymmet specificeras. Att bygga detta användardefinierade JSON-format är den mest föredragna metoden eftersom det också kan användas med andra operationer. För mer information om kataloger, se den här dokumentationen http://hbase.apache.org/book.html#_define_catalog. Den andra metoden använder en specifik mappningsparameter som kallas "hbase.columns.mapping" som bara tar en sträng nyckel-värdepar.
- Använda kataloger
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() tableCatalog = ''.join("""{ "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"int"}, "empId":{"cf":"personal","col":"empId","type":"string"}, "empName":{"cf":"personal", "col":"empName", "type":"string"}, "empState":{"cf":"personal", "col":"empWeight", "type":"string"} } }""".split()) employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3])) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .options(catalog=tableCatalog, newTable=5) \ .option("hbase.spark.use.hbasecontext", False) \ .save() # newTable refers to the NumberOfRegions which has to be > 3
Verifiera att en ny tabell som heter "tblEmployee" skapas i HBase genom att helt enkelt öppna HBase-skalet och köra följande kommando:
skanna 'tblEmployee', {'LIMIT' => 2}
Att använda kataloger kan också göra det möjligt för dig att ladda HBase-tabeller enkelt. Detta kommer att diskuteras i en framtida del.
- Använda hbase.columns.mapping
När du skriver PySpark Dataframe kan ett alternativ som heter "hbase.columns.mapping" läggas till för att inkludera en sträng som mappar kolumnerna korrekt. Det här alternativet låter dig bara infoga rader i befintliga tabeller.
I HBase-skalet, låt oss först skapa en tabell skapa 'tblEmployee2', 'personal'
Låt oss nu infoga 2 rader i PySpark med "hbase.columns.mapping"
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3]))) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \ .option("hbase.table", "tblEmployee2") \ .option("hbase.spark.use.hbasecontext", False) \ .save()
Återigen, verifiera bara att en ny tabell som heter "tblAnställd2" har dessa nya rader.
skanna 'tblEmployee2', {'LIMIT' => 2}
Det kompletterar våra exempel på hur man infogar rader genom PySpark i HBase-tabeller. I nästa avsnitt kommer jag att diskutera Get and Scan Operations, PySpark SQL och lite felsökning. Tills dess bör du skaffa ett CDP-kluster och arbeta dig igenom dessa exempel.