I de två föregående delarna har vi presenterat livedatabasmodellen för en prenumerationsbaserad verksamhet och ett datalager (DWH) som vi kan använda för rapportering. Även om det är uppenbart att de borde fungera tillsammans, fanns det ingen koppling mellan dessa två modeller. Idag tar vi nästa steg och skriver koden för att överföra data från livedatabasen till vår DWH.
Datamodellerna
Innan vi dyker in i koden, låt oss påminna oss själva om de två modellerna vi kommer att arbeta med. Först är transaktionsdatamodellen vi kommer att använda för att lagra vår realtidsdata. Med tanke på att vi driver en prenumerationsbaserad verksamhet måste vi lagra kund- och prenumerationsinformation, kunders beställningar och orderstatus.
Det finns verkligen mycket vi skulle kunna lägga till i den här modellen, som att spåra betalningar och lagra historisk data (särskilt förändringar i kund- och prenumerationsdata). För att betona ETL-processen (extrahera, transformera och ladda) vill jag dock hålla den här modellen så enkel som möjligt.
Att använda en transaktionsdatamodell som en rapporteringsdatabas kan fungera i vissa fall, men det fungerar inte i alla fall. Vi har redan nämnt det, men det är värt att upprepa det. Om vi vill separera våra rapporteringsuppgifter från våra realtidsprocesser bör vi skapa någon form av rapporteringsdatabas. Ett datalager är en lösning.
Vår DWH är centrerad kring fyra faktatabeller. De två första spårar antalet kunder och prenumerationer på daglig nivå. De återstående två spårar antalet leveranser och de produkter som ingår i dessa leveranser.
Mitt antagande är att vi kommer att köra vår ETL-process en gång om dagen. Först kommer vi att fylla i dimensionstabeller med nya värden (där det behövs). Efter det kommer vi att fylla i faktatabeller.
För att undvika onödiga upprepningar kommer jag bara att visa koden som kommer att fylla de två första dimensionstabellerna och de två första faktatabellerna. De återstående tabellerna kan fyllas i med mycket liknande kod. Jag uppmuntrar dig att skriva ner koden själv. Det finns inget bättre sätt att lära sig något nytt än att prova det.
Idén:Måtttabeller
Den allmänna idén är att skapa lagrade procedurer som vi regelbundet skulle kunna använda för att fylla i DWH -- dimensionstabeller såväl som faktatabeller. Dessa procedurer kommer att överföra data mellan två databaser på samma server. Detta innebär att vissa frågor i dessa procedurer kommer att använda tabeller från båda databaserna. Detta förväntas; vi måste jämföra tillståndet för DWH med live-DB och göra ändringar i DWH enligt vad som händer i live-DB.
Vi har fyra dimensionstabeller i vår DWH:dim_time
, dim_city
, dim_product
och dim_delivery_status
.
Tidsdimensionen fylls i genom att föregående datum läggs till. Huvudantagandet är att vi kommer att köra denna procedur dagligen, efter avslutad verksamhet.
Stads- och produktdimensionerna kommer att bero på de aktuella värdena som är lagrade i city
och product
ordböcker i livedatabasen. Om vi lägger till något i dessa ordböcker kommer nya värden att läggas till i dimensionstabellerna vid nästa DWH-uppdatering.
Den sista dimensionstabellen är dim_delivery_status
tabell. Det kommer inte att uppdateras eftersom det bara innehåller tre standardvärden. En leverans är antingen på väg, avbruten eller levererad.
Idén:faktatabeller
Att fylla i faktatabeller är faktiskt det riktiga jobbet. Även om ordböckerna i livedatabasen inte innehåller ett tidsstämpelattribut, gör tabeller med data infogade som ett resultat av våra operationer det. Du kommer att märka två tidsstämpelattribut, time_inserted
och time_updated
, i datamodellen.
Återigen, jag antar att vi framgångsrikt kommer att köra DWH-importen en gång om dagen. Detta gör det möjligt för oss att aggregera data på en daglig nivå. Vi räknar antalet aktiva och avbrutna kunder och prenumerationer, såväl som leveranser och levererade produkter för det datumet.
Vår livemodell fungerar bra om vi kör en insättningsprocedur efter COB (stängning). Ändå, om vi vill ha mer flexibilitet bör vi göra några ändringar i modellen. En sådan förändring kan vara att ha en separat historiktabell för att spåra det exakta ögonblicket när data relaterade till kunder eller prenumerationer ändrades. Med vår nuvarande organisation vet vi att ändringen har skett, men vi vet inte om det har skett några ändringar före denna (t.ex. en kund som avbröt igår, återaktiverade sitt konto efter midnatt och avslutade sedan igen idag) .
Följa dimensionstabeller
Som nämnts tidigare, kommer jag att gå med antagandet att vi kommer att köra DWH-importen exakt en gång om dagen. Om så inte är fallet skulle vi behöva ytterligare kod för att radera nyinlagda data från dimensions- och faktatabellerna. För dimensionstabellerna skulle detta vara begränsat till att ta bort det givna datumet.
Först kontrollerar vi om det givna datumet finns i dim_time
tabell. Om inte, lägger vi till en ny rad i tabellen; om det gör det behöver vi inte göra någonting. I de flesta fall infogas alla datum under den första produktionsinstallationen. Men jag ska gå med det här exemplet i utbildningssyfte.
För dim_city
och dim_product
dimensioner, lägger jag bara till alla nya värden jag upptäcker i city
och product
tabeller. Jag kommer inte att göra några raderingar eftersom alla tidigare infogade värden kan refereras till i någon faktatabell. Vi skulle kunna gå med en mjuk radering, t.ex. ha en "aktiv" flagga som vi kan slå på och av.
För den sista tabellen, dim_delivery_status
, jag kommer inte att göra någonting eftersom det alltid kommer att innehålla samma tre värden.
Koden nedan skapar en procedur som fyller i dimensionstabellerna dim_time
och dim_city
.
För tidsdimensionen lägger jag till gårdagens datum. Jag utgår från antagandet att ETL-processen startar strax efter midnatt. Jag ska kontrollera om den dimensionen redan finns och om inte, lägger jag till det nya datumet i tabellen.
För stadsdimensionen använder jag en LEFT JOIN för att sammanfoga data från livedatabasen och DWH-databasen för att avgöra vilka rader som saknas. Sedan lägger jag bara till saknad data i dimensionstabellen. Det är värt att nämna att det finns några sätt att kontrollera om data har ändrats. Denna process kallas förändringsdatainsamling, eller CDC. En vanlig metod är att söka efter uppdaterade tidsstämplar eller versioner. Det finns några ytterligare sätt, men de ligger utanför den här artikeln.
Låt oss ta en titt på koden nu, som är skriven med MySQL-syntax .
DROP PROCEDURE IF EXISTS p_update_dimensions// CREATE PROCEDURE p_update_dimensions () BEGIN SET @time_exists = 0; SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates dimension tables with new values -- dim_time SET @time_exists = (SELECT COUNT(*) FROM subscription_dwh.dim_time dim_time WHERE dim_time.time_date = @time_date); IF (@time_exists = 0) THEN INSERT INTO subscription_dwh.`dim_time`(`time_date`, `time_year`, `time_month`, `time_week`, `time_weekday`, `ts`) SELECT @time_date AS time_date, YEAR(@time_date) AS time_year, MONTH(@time_date) AS time_month, WEEK(@time_date) AS time_week, WEEKDAY(@time_date) AS time_weekday, NOW() AS ts; END IF; -- dim_city INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) SELECT city_live.city_name, city_live.postal_code, country_live.country_name, Now() FROM subscription_live.city city_live INNER JOIN subscription_live.country country_live ON city_live.country_id = country_live.id LEFT JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name WHERE city_dwh.id IS NULL; END// -- CALL p_update_dimensions ()
Kör den här proceduren -- vilket vi gör med hjälp av den kommenterade proceduren CALL -- infogar ett nytt datum och alla saknade städer i dimensionstabellerna. Försök att lägga till din egen kod för att fylla i de återstående två dimensionstabellerna med nya värden.
ETL-processen i ett datalager
Huvudtanken bakom data warehousing är att innehålla aggregerad data i önskat format. Självklart bör vi känna till det formatet innan vi ens börjar bygga lagret. Om vi har gjort allt som planerat kan vi få alla fördelar som en DWH erbjuder oss. Den största fördelen är förbättrad prestanda när du kör frågor. Våra frågor fungerar med färre poster (eftersom de är aggregerade) och körs på rapportdatabasen (istället för den levande).
Men innan vi kan fråga måste vi lagra fakta i vår databas. Hur vi gör det beror på vad vi behöver göra med vår data senare. Om vi inte har en bra helhetsbild innan vi börjar bygga vår DWH kan vi snart hamna i problem! snart.
Namnet på denna process är ETL:E =Extrahera, T =Transform, L =Last. Den tar tag i data, omvandlar den för att passa DWH-strukturen och laddar den i DWH. För att vara exakt, den faktiska processen vi kommer att använda är ELT:Extrahera, ladda, transformera. Eftersom vi använder lagrade procedurer kommer vi att extrahera data, ladda den och sedan omvandla den för att möta våra behov. Det är bra att veta att även om ETL och ELT är lite olika, används termerna ibland omväxlande.
Fylla faktatabellerna
Att fylla faktatabeller är därför vi verkligen är här. Idag kommer jag att fylla i två faktatabeller, fact_customer_subscribed
tabellen och fact_subscription_status
tabell. De återstående två faktatabellerna är dina att prova som läxa.
Innan vi går vidare till att fylla i faktatabell måste vi anta att dimensionstabellerna är fyllda med nya värden. Att fylla faktatabellerna följer samma mönster. Eftersom de har samma struktur kommer jag att förklara dem båda tillsammans.
Vi grupperar data efter två dimensioner:tid och stad. Tidsdimensionen ställs in på igår, och vi hittar ID:t för den relaterade posten i dim_time
tabell genom att jämföra datum (den sista INNER JOIN i båda frågorna).
ID:t för dim_city
extraheras genom att sammanfoga alla attribut som bildar en UNIK kombination i dimensionstabellen (stadsnamn, postnummer och landsnamn).
I den här frågan testar vi värden med CASE och summerar dem sedan. För aktiva och inaktiva kunder har jag inte testat datumet. Jag har dock valt befintliga värden för dessa fält. För nya och avslutade konton har jag testat den uppdaterade tiden.
DROP PROCEDURE IF EXISTS p_update_facts// CREATE PROCEDURE p_update_facts () BEGIN SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates fact tables with new values -- fact_customer_subscribed INSERT INTO `fact_customer_subscribed`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN customer_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN customer_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN customer_live.active = 1 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN customer_live.active = 0 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; -- fact_subscription_status INSERT INTO `fact_subscription_status`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN subscription_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN subscription_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN subscription_live.active = 1 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN subscription_live.active = 0 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`subscription` subscription_live ON subscription_live.customer_id = customer_live.id INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; END// -- CALL p_update_facts ()
Än en gång har jag kommenterat den sista raden. Ta bort kommentaren och du kan använda den här raden för att anropa proceduren och infoga nya värden. Observera att jag inte har tagit bort några befintliga gamla värden, så den här proceduren fungerar inte om vi redan har värden för det datumet och orten. Detta kan lösas genom att utföra raderingar före infogning.
Kom ihåg att vi måste fylla i de återstående faktatabellerna i vår DWH. Jag uppmuntrar dig att prova det själv!
En annan sak jag definitivt skulle rekommendera är att placera hela processen i en transaktion. Det skulle säkerställa att antingen alla insättningar lyckas eller så görs inga. Detta är mycket viktigt när vi vill undvika att ha data delvis införda, t.ex. om vi har flera procedurer för att infoga dimensioner och fakta och några av dem gör sitt jobb medan andra misslyckas.
Vad tycker du?
Idag har vi sett hur vi kunde utföra ELT/ETL-processen och ladda data från en livedatabas till ett datalager. Även om processen vi visade är ganska förenklad, innehåller den alla element som behövs för att extrahera data, T(omforma) den till ett lämpligt format och slutligen L(oad) den till DWH. Vad tror du? Berätta för oss om dina erfarenheter i kommentarerna nedan.