sql >> Databasteknik >  >> NoSQL >> HBase

Så här gör du:Skanna Salted Apache HBase-tabeller med regionspecifika nyckelområden i MapReduce

Tack till Pengyu Wang, mjukvaruutvecklare på FINRA, för tillstånd att publicera det här inlägget igen.

Salted Apache HBase-tabeller med pre-split är en beprövad effektiv HBase-lösning för att ge enhetlig arbetsbelastningsfördelning över RegionServers och förhindra hot spots under bulkskrivningar. I denna design görs en radnyckel med en logisk nyckel plus salt i början. Ett sätt att generera salt är genom att beräkna n (antal regioner) modulo på hashkoden för den logiska radnyckeln (datum, etc).

Saltningsradsnycklar

Till exempel kan en tabell som accepterar dataladdning dagligen använda logiska radnycklar som börjar med ett datum, och vi vill dela upp tabellen i förväg i 1 000 regioner. I det här fallet räknar vi med att generera 1 000 olika salter. Saltet kan genereras till exempel som:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey =2015-04-26|abcrowKey =893|2015-04-26|abc

Utdata från hashCode() med modulo ger slumpmässighet för saltvärde från "000" till "999". Med denna nyckeltransform är tabellen fördelad på saltgränserna när den skapas. Detta kommer att göra radvolymerna jämnt fördelade när hFiles laddas med MapReduce bulkload. Det garanterar att radnycklar med samma salt hamnar i samma region.

I många användningsfall, som dataarkivering, måste du skanna eller kopiera data över ett visst logiskt nyckelintervall (datumintervall) med hjälp av MapReduce-jobbet. Standardtabell MapReduce-jobb ställs in genom att tillhandahålla Scan instans med nyckelintervallsattribut.

Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);scan.setStartRow(Bytes.toBytes("2015- 04-26"));scan.setStopRow(Bytes.toBytes("2015-04-27"));/* Ställ in tabellmappningsjobbet */TableMapReduceUtil.initTableMapperJob(tabellnamn,scan,DataScanMapper.class,ImmutableBytesWritable.class, KeyValue.class,job, true, TableInputFormat.class);...

Men installationen av ett sådant jobb blir utmanande för saltade fördelade bord. Start- och stoppradnycklar kommer att vara olika för varje region eftersom var och en har ett unikt salt. Och vi kan inte ange flera intervall för en Scan instans.

För att lösa detta problem måste vi undersöka hur tabellen MapReduce fungerar. Generellt skapar MapReduce-ramverket en kartuppgift för att läsa och bearbeta varje indatadelning. Varje uppdelning genereras i InputFormat klassbas, med metoden getSplits() .

I HBase-tabellen MapReduce-jobb, TableInputFormat används som InputFormat . Inuti implementeringen, getSplits() metoden åsidosätts för att hämta start- och stoppradnycklarna från Scan exempel. Eftersom start- och stoppradsnycklarna sträcker sig över flera regioner delas intervallet med regiongränser och returnerar listan med TableSplit objekt som täcker skanningsnyckelområdet. Istället för att vara baserad på HDFS-block, TableSplit s är baserade på region. Genom att skriva över getSplits() metoden kan vi styra TableSplit .

Bygga anpassat TableInputFormat

För att ändra beteendet för getSplits() metod, en anpassad klass som utökar TableInputFormat krävs. Syftet med getSplits() här är att täcka det logiska nyckelområdet i varje region, konstruera deras radnyckelintervall med deras unika salt. Klassen HTable tillhandahåller metoden getStartEndKeys() som returnerar start- och slutradnycklar för varje region. Från varje startknapp, analysera motsvarande salt för regionen.

Par nycklar =table.getStartEndKeys();for (int i =0; i  

Jobbkonfigurationen klarar logiskt nyckelområde

TableInputFormat hämtar start- och stoppnyckeln från Scan exempel. Eftersom vi inte kan använda Scan i vårt MapReduce-jobb skulle vi kunna använda Configuration istället för att skicka dessa två variabler och endast logisk start- och stoppnyckel är tillräckligt bra (en variabel kan vara ett datum eller annan affärsinformation). getSplits() metoden har JobContext argument, Konfigurationsinstansen kan läsas som context.getConfiguration() .

I MapReduce-drivrutinen:

Configuration conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop ", "2015-04-27");

I Custom TableInputFormat :

@Override public List getSplits(JobContext context) kastar IOException {conf =context.getConfiguration();String scanStart =conf.get("logical.scan.start");String scanStop =conf.get("logical.scan .stop");...}

Rekonstruera Salted Key Range efter region

Nu när vi har salt- och logisk start/stopp-nyckel för varje region kan vi bygga om det faktiska radnyckelintervallet.

byte[] startRowKey =Bytes.toBytes(regionSalt + "|" + scanStart);byte[] endRowKey =Bytes.toBytes(regionSalt + "|" + scanStop);

Skapa en tabelldelning för varje region

Med radnyckelintervall kan vi nu initiera TableSplit instans för regionen.

List splits =new ArrayList(keys.getFirst().length);for (int i =0; i  

En sak till att titta på är datalokalitet. Ramverket använder platsinformation i varje ingångsdelning för att tilldela en kartuppgift i sin lokala värd. För vårt TableInputFormat , använder vi metoden getTableRegionLocation() för att hämta regionsplatsen som betjänar radnyckeln.

Denna plats skickas sedan till TableSplit konstruktör. Detta säkerställer att mapparen som bearbetar tabelldelningen är på samma regionserver. En metod, kallad DNS.reverseDns() , kräver adressen för HBase-namnservern. Detta attribut lagras i konfigurationen "hbase.nameserver.address ".

this.nameServer =context.getConfiguration().get("hbase.nameserver.address", null);...public String getTableRegionLocation(HTable-tabell, byte[] rowKey) kastar IOException {HServerAddress regionServerAddress =table.getRegionLocation(rowKey ).getServerAddress();InetAddress regionAddress =regionServerAddress.getInetSocketAddress().getAddress();String regionLocation;försök {regionLocation =omvändDNS(regionAddress);} catch (NamingException e) {regionLocation =regionServerAddress.getHostname();}retur regionLocation; }skyddad String reverseDNS(InetAddress ipAddress) kastar NamingException {String hostName =this.reverseDNSCacheMap.get(ipAddress);if (hostName ==null) {hostName =Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); .reverseDNSCacheMap.put(ipAddress, hostName);}retur hostName;}

En komplett kod för getSplits kommer att se ut så här:

@Override public List getSplits(JobContext context) kastar IOException {conf =context.getConfiguration();table =getHTable(conf);if (table ==null) {throw new IOException("Ingen tabell tillhandahölls.");}// Hämta namnserveradressen och standardvärdet är null.this.nameServer =conf.get("hbase.nameserver.address", null);String scanStart =conf.get("region.scan.start");String scanStop =conf.get("region.scan.stop");Pair nycklar =table.getStartEndKeys();if (nycklar ==null || keys.getFirst() ==null || keys.getFirst(). length ==0) {throw new RuntimeException("Minst en region förväntas");}List splits =new ArrayList(keys.getFirst().length);for (int i =0; i  

Använd Custom TableInoutFormat i MapReduce-drivrutinen

Nu måste vi ersätta TableInputFormat klass med den anpassade build vi använde för tabell MapReduce-jobbinställning.

Configuration conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);HTableInterface status_table =new HTable(conf, status_tablename);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop", "2015-04-27");Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);/* Ställ in tabellmappningsjobbet */TableMapReduceUtil.initTableMapperJob(tabellnamn,scan,DataScanMapper.class,ImmutableBytesWritable.class,KeyValue.class,job, true, MultiRangeTable); 

Tillvägagångssättet för anpassat TableInputFormat ger en effektiv och skalbar skanningskapacitet för HBase-tabeller som är designade för att använda salt för en balanserad databelastning. Eftersom skanningen kan kringgå alla orelaterade radnycklar, oavsett hur stor tabellen är, begränsas skanningens komplexitet endast till storleken på måldata. I de flesta användningsfall kan detta garantera relativt konsekvent handläggningstid när tabellen växer.


  1. Google Cloud Platform – Kan inte ansluta till mongodb

  2. Yeoman, Mongoose och MongoDB Ställningar

  3. Go JSON-avkodning är mycket långsam. Vad skulle vara ett bättre sätt att göra det?

  4. MongooseError [MongooseServerSelectionError]:anslutning <monitor> till 52.6.250.237:27017 stängd