Tack till Pengyu Wang, mjukvaruutvecklare på FINRA, för tillstånd att publicera det här inlägget igen.
Salted Apache HBase-tabeller med pre-split är en beprövad effektiv HBase-lösning för att ge enhetlig arbetsbelastningsfördelning över RegionServers och förhindra hot spots under bulkskrivningar. I denna design görs en radnyckel med en logisk nyckel plus salt i början. Ett sätt att generera salt är genom att beräkna n (antal regioner) modulo på hashkoden för den logiska radnyckeln (datum, etc).
Saltningsradsnycklar
Till exempel kan en tabell som accepterar dataladdning dagligen använda logiska radnycklar som börjar med ett datum, och vi vill dela upp tabellen i förväg i 1 000 regioner. I det här fallet räknar vi med att generera 1 000 olika salter. Saltet kan genereras till exempel som:
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey =2015-04-26|abcrowKey =893|2015-04-26|abc
Utdata från hashCode()
med modulo ger slumpmässighet för saltvärde från "000" till "999". Med denna nyckeltransform är tabellen fördelad på saltgränserna när den skapas. Detta kommer att göra radvolymerna jämnt fördelade när hFiles laddas med MapReduce bulkload. Det garanterar att radnycklar med samma salt hamnar i samma region.
I många användningsfall, som dataarkivering, måste du skanna eller kopiera data över ett visst logiskt nyckelintervall (datumintervall) med hjälp av MapReduce-jobbet. Standardtabell MapReduce-jobb ställs in genom att tillhandahålla Scan
instans med nyckelintervallsattribut.
Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);scan.setStartRow(Bytes.toBytes("2015- 04-26"));scan.setStopRow(Bytes.toBytes("2015-04-27"));/* Ställ in tabellmappningsjobbet */TableMapReduceUtil.initTableMapperJob(tabellnamn,scan,DataScanMapper.class,ImmutableBytesWritable.class, KeyValue.class,job, true, TableInputFormat.class);...
Men installationen av ett sådant jobb blir utmanande för saltade fördelade bord. Start- och stoppradnycklar kommer att vara olika för varje region eftersom var och en har ett unikt salt. Och vi kan inte ange flera intervall för en Scan
instans.
För att lösa detta problem måste vi undersöka hur tabellen MapReduce fungerar. Generellt skapar MapReduce-ramverket en kartuppgift för att läsa och bearbeta varje indatadelning. Varje uppdelning genereras i InputFormat
klassbas, med metoden getSplits()
.
I HBase-tabellen MapReduce-jobb, TableInputFormat
används som InputFormat
. Inuti implementeringen, getSplits()
metoden åsidosätts för att hämta start- och stoppradnycklarna från Scan
exempel. Eftersom start- och stoppradsnycklarna sträcker sig över flera regioner delas intervallet med regiongränser och returnerar listan med TableSplit
objekt som täcker skanningsnyckelområdet. Istället för att vara baserad på HDFS-block, TableSplit
s är baserade på region. Genom att skriva över getSplits()
metoden kan vi styra TableSplit
.
Bygga anpassat TableInputFormat
För att ändra beteendet för getSplits()
metod, en anpassad klass som utökar TableInputFormat
krävs. Syftet med getSplits()
här är att täcka det logiska nyckelområdet i varje region, konstruera deras radnyckelintervall med deras unika salt. Klassen HTable tillhandahåller metoden getStartEndKeys()
som returnerar start- och slutradnycklar för varje region. Från varje startknapp, analysera motsvarande salt för regionen.
Par nycklar =table.getStartEndKeys();for (int i =0; iJobbkonfigurationen klarar logiskt nyckelområde
TableInputFormat
hämtar start- och stoppnyckeln frånScan
exempel. Eftersom vi inte kan användaScan
i vårt MapReduce-jobb skulle vi kunna användaConfiguration
istället för att skicka dessa två variabler och endast logisk start- och stoppnyckel är tillräckligt bra (en variabel kan vara ett datum eller annan affärsinformation).getSplits()
metoden harJobContext
argument, Konfigurationsinstansen kan läsas somcontext.getConfiguration()
.I MapReduce-drivrutinen:
Configuration conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop ", "2015-04-27");I
Custom TableInputFormat
:@Override public List getSplits(JobContext context) kastar IOException {conf =context.getConfiguration();String scanStart =conf.get("logical.scan.start");String scanStop =conf.get("logical.scan .stop");...}Rekonstruera Salted Key Range efter region
Nu när vi har salt- och logisk start/stopp-nyckel för varje region kan vi bygga om det faktiska radnyckelintervallet.
byte[] startRowKey =Bytes.toBytes(regionSalt + "|" + scanStart);byte[] endRowKey =Bytes.toBytes(regionSalt + "|" + scanStop);Skapa en tabelldelning för varje region
Med radnyckelintervall kan vi nu initiera
TableSplit
instans för regionen.List splits =new ArrayList(keys.getFirst().length);for (int i =0; iEn sak till att titta på är datalokalitet. Ramverket använder platsinformation i varje ingångsdelning för att tilldela en kartuppgift i sin lokala värd. För vårt
TableInputFormat
, använder vi metodengetTableRegionLocation()
för att hämta regionsplatsen som betjänar radnyckeln.Denna plats skickas sedan till
TableSplit
konstruktör. Detta säkerställer att mapparen som bearbetar tabelldelningen är på samma regionserver. En metod, kalladDNS.reverseDns()
, kräver adressen för HBase-namnservern. Detta attribut lagras i konfigurationen "hbase.nameserver.address
".this.nameServer =context.getConfiguration().get("hbase.nameserver.address", null);...public String getTableRegionLocation(HTable-tabell, byte[] rowKey) kastar IOException {HServerAddress regionServerAddress =table.getRegionLocation(rowKey ).getServerAddress();InetAddress regionAddress =regionServerAddress.getInetSocketAddress().getAddress();String regionLocation;försök {regionLocation =omvändDNS(regionAddress);} catch (NamingException e) {regionLocation =regionServerAddress.getHostname();}retur regionLocation; }skyddad String reverseDNS(InetAddress ipAddress) kastar NamingException {String hostName =this.reverseDNSCacheMap.get(ipAddress);if (hostName ==null) {hostName =Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); .reverseDNSCacheMap.put(ipAddress, hostName);}retur hostName;}En komplett kod för
getSplits
kommer att se ut så här:@Override public List getSplits(JobContext context) kastar IOException {conf =context.getConfiguration();table =getHTable(conf);if (table ==null) {throw new IOException("Ingen tabell tillhandahölls.");}// Hämta namnserveradressen och standardvärdet är null.this.nameServer =conf.get("hbase.nameserver.address", null);String scanStart =conf.get("region.scan.start");String scanStop =conf.get("region.scan.stop");Pair nycklar =table.getStartEndKeys();if (nycklar ==null || keys.getFirst() ==null || keys.getFirst(). length ==0) {throw new RuntimeException("Minst en region förväntas");}List splits =new ArrayList(keys.getFirst().length);for (int i =0; iAnvänd Custom TableInoutFormat i MapReduce-drivrutinen
Nu måste vi ersätta
TableInputFormat
klass med den anpassade build vi använde för tabell MapReduce-jobbinställning.Configuration conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);HTableInterface status_table =new HTable(conf, status_tablename);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop", "2015-04-27");Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);/* Ställ in tabellmappningsjobbet */TableMapReduceUtil.initTableMapperJob(tabellnamn,scan,DataScanMapper.class,ImmutableBytesWritable.class,KeyValue.class,job, true, MultiRangeTable);Tillvägagångssättet för anpassat
TableInputFormat
ger en effektiv och skalbar skanningskapacitet för HBase-tabeller som är designade för att använda salt för en balanserad databelastning. Eftersom skanningen kan kringgå alla orelaterade radnycklar, oavsett hur stor tabellen är, begränsas skanningens komplexitet endast till storleken på måldata. I de flesta användningsfall kan detta garantera relativt konsekvent handläggningstid när tabellen växer.