sql >> Databasteknik >  >> RDS >> Sqlserver

Implementering av inkrementell belastning med Change Data Capture i SQL Server

Den här artikeln kommer att vara intressant för dem som ofta har att göra med dataintegration.

Introduktion

Antag att det finns en databas där användare alltid ändrar data (uppdatera eller ta bort). Kanske används denna databas av ett stort program som inte tillåter modifiering av tabellstrukturen. Uppgiften är att då och då ladda data från denna databas till en annan databas på en annan server. Det enklaste sättet att lösa problemet är att ladda ny data från en källdatabas till en måldatabas med en preliminär rensning av måldatabasen. Du kan använda den här metoden så länge som tiden för att ladda data är acceptabel och inte överskrider förinställda deadlines. Vad händer om det tar flera dagar att ladda data? Dessutom leder instabila kommunikationskanaler till situationen när dataladdningen stoppar och startar om. Om du möter dessa hinder föreslår jag att du överväger en av algoritmerna för att ladda om data. Det betyder att endast dataändringar har skett sedan den senaste laddningen laddas.

CDC

I SQL Server 2008 introducerade Microsoft en dataspårningsmekanism som heter Change Data Capture (CDC). I stort sett är syftet med denna mekanism att aktivering av CDC för vilken databastabell som helst kommer att skapa en systemtabell i samma databas med ett liknande namn som den ursprungliga tabellen har (schemat kommer att vara som följer:'cdc' som ett prefix plus gammalt schemanamn plus "_" och slutet "_CT". Till exempel är den ursprungliga tabellen dbo.Exempel, då kommer systemtabellen att kallas cdc.dbo_Example_CT). Den kommer att lagra all data som har ändrats.

För att gräva djupare i CDC, överväg faktiskt exemplet. Men först, se till att SQL Agent som använder CDC fungerar på SQL Server-testinstansen.

Dessutom kommer vi att överväga ett skript som skapar en databas och testtabell, fyller denna tabell med data och aktiverar CDC för denna tabell.

För att förstå och förenkla uppgiften kommer vi att använda en SQL Server-instans utan att distribuera käll- och måldatabaserna till olika servrar.

använd mastergo-- skapa en källdatabas om den inte finns (välj * från sys.databases där namn ='db_src_cdc') skapa databasen db_src_cdcgouse db_src_cdcgo-- aktivera CDC om den är inaktiverad om den inte finns (välj * från sys.databases där namn =db_name() och is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- skapa en roll för tabeller med CDCif inte existerar(välj * från sys.sysusers där namn ='CDC_Reader' och issqlrole=1) skapa roll CDC_Readergo-- skapa en tableifergo-- skapa en tabell object_id('dbo.Example','U') är null skapa tabell dbo.Example ( ID int identitetsbegränsning PK_Example primärnyckel, Titel varchar(200) inte null )go-- fyll i tableinsert dbo.Example (Title) värden( 'One'),('Två'),('Three'),('Four'),('Five');go-- aktivera CDC för tabellen om den inte finns (välj * från sys.tables där is_tracked_by_cdc =1 och namn ='Exempel') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Exempel', @role_name ='CDC_Reader'go-- fyll i tabellen med lite data. Vi kommer att ändra eller ta bort något uppdatera dbo.Exampleset Title =reverse(Title)where ID in (2,3,4);delete from dbo.Examples where ID in (1,2);set identity_insert dbo.Example on;sert dbo. Exempel (ID, Titel) values(1,'One'),(6,'Six');set identity_insert dbo.Example off;go

Låt oss nu titta på vad vi har efter att ha kört det här skriptet i tabellerna dbo.Example och cdc.dbo_Example_CT (det bör noteras att CDC är asynkront. Data fylls i tabellerna där ändringsspårningen lagras efter en viss tidsperiod ).

välj * från dbo.Exempel;
ID Titel---- ---------------------------- 1 En 3 eerhT 4 ruoF 5 Fem 6 Sex
välj row_number() över (partition efter ID-ordning efter __$start_lsn desc, __$seqval desc ) som __$rn, *från cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Titel------ --------------------- ----------- ---------------------------- ------------ ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Fyra 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A00000580003A00000580003A00000580003A00000580003A0000058005 

Betrakta i detalj tabellstrukturen i vilken ändringsspårning lagras. Fälten __ $start_lsn och __ $seqval är LSN (loggsekvensnummer i databasen) respektive transaktionsnumret inom transaktionen. Det finns en viktig egenskap i dessa fält, nämligen att vi kan vara säkra på att posten med ett högre LSN kommer att utföras senare. På grund av den här egenskapen kan vi enkelt få det senaste tillståndet för varje post i frågan, genom att filtrera vårt urval efter villkoret – där __ $ rn =1.

Fältet __$operation innehåller transaktionskoden:

  • 1 – posten raderas
  • 2 – posten infogas
  • 3, 4 – posten uppdateras. Gamla data före uppdatering är 3, nya data är 4.

Förutom tjänstefält med prefixet «__$», är fälten i den ursprungliga tabellen helt duplicerade. Denna information räcker för att vi ska gå vidare till den inkrementella belastningen.

Ställa in en databas för att ladda data

Skapa en tabell i vår testmåldatabas, till vilken data ska laddas, samt en extra tabell för att lagra data om laddningsloggen.

använd mastergo-- skapa en måldatabas om den inte finns (välj * från sys.databases där namn ='db_dst_cdc') skapa databasen db_dst_cdcgouse db_dst_cdcgo-- skapa en tableif object_id('dbo.Example','U') är null skapa tabell dbo.Example ( ID int begränsning PK_Example primärnyckel, Titel varchar(200) inte null )go-- skapa en tabell för att lagra load logif object_id('dbo.log_cdc','U') är null skapa tabell dbo .log_cdc ( table_name nvarchar(512) not null, dt datetime not null default getdate(), lsn binary(10) not null default(0x0), constraint pk_log_cdc primärnyckel (tabellnamn,dt desc) )go

Jag skulle vilja uppmärksamma er på fälten i tabellen LOG_CDC:

  • TABLE_NAME lagrar information om vilken tabell som laddades (det är möjligt att ladda flera tabeller i framtiden, från olika databaser eller till och med från olika servrar; tabellformatet är 'SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME'
  • DT är ett fält för laddningsdatum och tid, vilket är valfritt för den inkrementella belastningen. Det kommer dock att vara användbart för granskning av laddning.
  • LSN – efter att en tabell har laddats måste vi lagra information om var vi ska starta nästa laddning, om det behövs. Följaktligen, efter varje laddning, lägger vi till den senaste (maximalt) __ $ start_lsn i den här kolumnen.

Algorithm för dataladdning

Som beskrivits ovan, med hjälp av frågan, kan vi få det senaste tillståndet för tabellen med hjälp av fönsterfunktioner. Om vi ​​känner till LSN för den senaste laddningen, nästa gång vi laddar kan vi filtrera från källan all data, vars ändringar är högre än det lagrade LSN, om det fanns minst en fullständig tidigare laddning:

med incr_Example as( välj row_number() över (partition efter ID-ordning efter __$start_lsn desc, __$seqval desc ) som __$rn, * från db_src_cdc.cdc.dbo_Example_CT där __$operation <> 3 och __$ start_lsn> @lsn) välj * från incr_Example

Sedan kan vi få alla poster för den fullständiga lasten, om lasten LSN inte lagras:

med incr_Example as( välj row_number() över (partition efter ID-ordning efter __$start_lsn desc, __$seqval desc ) som __$rn, * från db_src_cdc.cdc.dbo_Example_CT där __$operation <> 3 och __$ start_lsn> @lsn), full_Example as( välj * från db_src_cdc.dbo.Exempel där @lsn är null) välj ID, Titel, __$operationfrom incr_Examplewhere __$rn =1union välj ID, Titel, 2 som __$operationfrom full_Example före> 

Alltså, beroende på @LSN-värdet, kommer denna fråga att visa antingen alla de senaste ändringarna (som går förbi de interimistiska) med statusen Borttagen eller inte, eller all data från den ursprungliga tabellen, lägga till status 2 (ny post) – detta fält används endast för att förena två val. Med den här frågan kan vi enkelt implementera antingen full belastning eller ladda om med kommandot MERGE (som börjar med SQL 2008-versionen).

För att undvika flaskhalsar som kan skapa alternativa processer och för att ladda matchade data från olika tabeller (i framtiden kommer vi att ladda flera tabeller och eventuellt kan det finnas relationsrelationer mellan dem), föreslår jag att du använder en DB-ögonblicksbild på källdatabasen ( en annan SQL 2008-funktion).

Den fullständiga texten för inläsningen är som följer:

[expand title="Kod"]

/* Algoritm för dataladdning*/-- skapa en ögonblicksbild av databasen om det finns (välj * från sys.databases där namn ='db_src_cdc_ss' ) släpp databasen db_src_cdc_ss;deklarera @query nvarchar(max); välj @query =N' skapa databasen db_src_cdc_ss på ( namn =N'''+namn+ ''', filnamn =N'''+[filnamn]+'.ss'' ) som ögonblicksbild av db_src_cdc'från db_src_cdc.sys.sysfiles där groupid =1; exec ( @query );-- läs LSN från föregående loaddeclare @lsn binary(10) =(välj max(lsn) från db_dst_cdc.dbo.log_cdc där tabellnamn ='localhost.db_src_cdc.dbo.Example');-- rensa en tabell före den fullständiga loadif @lsn är null trunkera tabellen db_dst_cdc.dbo.Example;-- ladda processwith incr_Example as( select row_number() over (partition efter ID-ordning efter __$start_lsn desc, __$seqval desc ) som __$rn , * från db_src_cdc_ss.cdc.dbo_Example_CT där __$operation <> 3 och __$start_lsn> @lsn), full_Example as( välj * från db_src_cdc_ss.dbo.Exempel där @lsn är null), cte_Examp Titel, __$operation från incr_Example där __$rn =1 union välj alla ID, Titel, 2 som __$operation från full_Example)merge db_dst_cdc.dbo.Example as trg med cte_Example som src på trg.ID=src.IDnär matchad och __$operation =1 radera sedan när matchad och __$operation <> 1 uppdatera sedan set trg.Title =src.Title när den inte matchas av mål och __$operation <> 1 infoga sedan (ID, Titel) värden (src.ID, src .Title);-- markera slutet på laddningsprocessen och de senaste LSNinsert db_dst_cdc.dbo.log_cdc (tabellnamn, lsn)värden ('localhost.db_src_cdc.dbo.Example', isnull((välj max(__$start_lsn) från db_src_cdc_ss.cdc.dbo_Example_CT),0))-- ta bort databasens ögonblicksbild om det finns (välj * från sys.databases där namn ='db_src_cdc_ss' ) släpp databasen db_src_cdc_ss

[/expand]


  1. Oracle RAC och sekvenser

  2. Steg för steg uppgraderingsprocess till R12.2 Uppgraderingsdel -2 (Huvuduppgraderingsdrivrutin för R12.2.0)

  3. Hur listar du primärnyckeln för en SQL Server-tabell?

  4. Fix Msg 241 "Konvertering misslyckades vid konvertering av datum och/eller tid från teckensträng" i SQL Server