I de två föregående artiklarna i den här serien diskuterade vi hur man använder Python och SQLAlchemy för att utföra ETL-processen. Idag kommer vi att göra detsamma, men den här gången använder vi Python och SQL Alchemy utan SQL-kommandon i textformat. Detta gör det möjligt för oss att använda SQLAlchemy oavsett vilken databasmotor vi är anslutna till. Så låt oss börja.
Idag kommer vi att diskutera hur man utför ETL-processen med Python och SQLAlchemy. Vi skapar ett skript för att extrahera daglig data från vår operativa databas, omvandla den och sedan ladda in i vårt datalager.
Detta är den tredje artikeln i serien. Om du inte har läst de två första artiklarna (Använder Python och MySQL i ETL-processen och SQLAlchemy), rekommenderar jag starkt att du gör det innan du fortsätter.
Hela denna serie är en fortsättning på vår datalagerserie:
- Skapa en DWH, del ett:en affärsdatamodell för prenumeration
- Skapa en DWH, del två:En affärsdatamodell för prenumeration
- Skapa ett datalager, del 3:en affärsdatamodell för prenumeration
Okej, nu börjar vi med dagens ämne. Låt oss först titta på datamodellerna.
Datamodellerna
Operativ (live) databasdatamodell
DWH-datamodell
Det här är de två datamodellerna vi kommer att använda. För mer information om datalager (DWH), kolla in dessa artiklar:
- Stjärnschemat
- Snöflingaschemat
- Stjärnschema vs. Snowflake Schema
Varför SQLAlchemy?
Hela idén bakom SQLAlchemy är att efter att vi har importerat databaser behöver vi inte SQL-kod som är specifik för den relaterade databasmotorn. Istället kan vi importera objekt till SQLAlchemy och använda SQLAlchemy-syntaxen för satser. Det gör att vi kan använda samma språk oavsett vilken databasmotor vi är anslutna till. Den största fördelen här är att en utvecklare inte behöver ta hand om skillnaderna mellan olika databasmotorer. Ditt SQLAlchemy-program kommer att fungera exakt likadant (med mindre ändringar) om du migrerar till en annan databasmotor.
Jag har bestämt mig för att bara använda SQLAlchemy-kommandon och Python-listor för att kommunicera till temporär lagring och mellan olika databaser. De viktigaste skälen bakom detta beslut är att 1) Python-listor är välkända och 2) koden skulle vara läsbar för dem utan Python-kunskaper.
Detta är inte att säga att SQLAlchemy är perfekt. Det har vissa begränsningar, som vi kommer att diskutera senare. För nu, låt oss bara ta en titt på koden nedan:
Köra skriptet och resultatet
Detta är kommandot Python som används för att anropa vårt skript. Skriptet kontrollerar data i operationsdatabasen, jämför värdena med DWH och importerar de nya värdena. I det här exemplet uppdaterar vi värden i två dimensionstabeller och en faktatabell; skriptet returnerar lämplig utdata. Hela skriptet är skrivet så att du kan köra det flera gånger om dagen. Den kommer att radera "gammal" data för den dagen och ersätta den med ny.
Låt oss analysera hela manuset, med början från toppen.
Importerar SQLAlchemy
Det första vi behöver göra är att importera modulerna vi kommer att använda i skriptet. Vanligtvis importerar du dina moduler när du skriver skriptet. I de flesta fall vet du inte exakt vilka moduler du behöver från början.
from datetime import date # import SQLAlchemy from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case
Vi har importerat Pythons datetime
modul, som förser oss med klasser som arbetar med datum.
Därefter har vi sqlalchemy
modul. Vi kommer inte att importera hela modulen, bara de saker vi behöver – några specifika för SQLAlchemy (create_engine
, MetaData
, Table
), vissa delar av SQL-satsen (select
, and_
, case
), och func
, vilket gör att vi kan använda funktioner som count() och sum() .
Ansluter till databaserna
Vi måste ansluta till två databaser på vår server. Vi kan ansluta till fler databaser (MySQL, SQL Server eller någon annan) från olika servrar om det behövs. I det här fallet är båda databaserna MySQL-databaser och lagras på min lokala dator.
# connect to databases engine_live = sqlalchemy.create_engine('mysql+pymysql://: @localhost:3306/subscription_live') connection_live = engine_live.connect() engine_dwh = sqlalchemy.create_engine('mysql+pymysql:// : @localhost:3306/subscription_dwh') connection_dwh = engine_dwh.connect() metadata = MetaData(bind=None)
Vi har skapat två motorer och två anslutningar. Jag kommer inte att gå in på detaljer här eftersom vi redan har förklarat det här i föregående artikel.
Uppdaterar dim_time Dimension
Mål:Infoga gårdagens datum om det inte redan är infogat i tabellen.
I vårt skript kommer vi att uppdatera tvådimensionella tabeller med nya värden. Resten av dem följer samma mönster, så vi går bara över detta en gång; vi behöver inte skriva ner nästan identisk kod några gånger till.
Tanken är väldigt enkel. Vi kör alltid skriptet för att infoga ny data för gårdagen. Därför måste vi kontrollera om det datumet har infogats i dimensionstabellen. Om den redan finns där kommer vi inte att göra någonting; om det inte är det lägger vi till det. Låt oss ta en titt på koden för att uppdatera dim_time
bord.
Först kontrollerar vi om datumet finns. Om det inte finns lägger vi till det. Vi börjar med att lagra gårdagens datum i en variabel. I Python gör du så här:
yesterday = date.fromordinal(date.today().toordinal()-1) yesterday_str = str(yesterday)
Den första raden tar ett aktuellt datum, konverterar det till ett numeriskt värde, subtraherar 1 från det värdet och konverterar det numeriska värdet tillbaka till ett datum (igår =idag – 1 ). Den andra raden lagrar datumet i ett textformat.
Därefter testar vi om datumet redan finns i databasen:
table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh) stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str) result = connection_dwh.execute(stmt).fetchall() date_exists = len(result)
När tabellen har lästs in kör vi en fråga som ska returnera alla rader från dimensionstabellen där värdet för tid/datum är lika med gårdagen. Resultatet kan ha 0 (inget sådant datum i tabellen) eller 1 rad (datumet finns redan i tabellen).
Om datumet inte redan finns i tabellen använder vi kommandot insert() för att lägga till det:
if date_exists == 0: print("New value added.") stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday()) connection_dwh.execute(stmt) else: print("No new values.")
En ny sak här som jag skulle vilja peka på är användningen av. .year
, .month
, .isocalendar()[1]
och .weekday
för att få dateparts.
Uppdaterar dim_city Dimension
Mål:Infoga nya städer om det finns några (dvs. jämför listan över städer i livedatabasen med listan över städer i DWH och lägg till saknade).
Uppdaterar dim_time
dimensionen var ganska enkel. Vi testade helt enkelt om ett datum fanns i tabellen och infogade det om det inte redan fanns där. För att testa ett värde i DWH-databasen använde vi en Python-variabel (igår ). Vi kommer att använda den processen igen, men den här gången med listor.
Eftersom det inte finns ett enkelt sätt att kombinera tabeller från olika databaser i en enda SQLAlchemy-fråga, kan vi inte använda metoden som beskrivs i del 1 av denna serie. Därför behöver vi ett objekt för att lagra de värden som behövs för att kommunicera mellan dessa två databaser. Jag har bestämt mig för att använda listor, eftersom de är vanliga och de gör jobbet.
Först läser vi in country
och city
tabeller från en livedatabas till de relevanta objekten.
# dim_city print("\nUpdating... dim_city") table_city = Table('city', metadata, autoload = True, autoload_with = engine_live) table_country = Table('country', metadata, autoload = True, autoload_with = engine_live) table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)
Närnäst laddar vi dim_city
tabell från DWH till en lista:
# load whole dwh table in the list stmt = select([table_dim_city]); table_dim_city_list = connection_dwh.execute(stmt).fetchall()
Då gör vi samma sak för värdena från livedatabasen. Vi går med i tabellerna country
och city
så vi har all information som behövs i den här listan:
# load all live values in the list stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\ .select_from(table_city\ .join(table_country)) table_city_list = connection_live.execute(stmt).fetchall()
Nu går vi igenom listan som innehåller data från livedatabasen. För varje post jämför vi värden (city_name
, postal_code
och country_name
). Om vi inte hittar sådana värden lägger vi till en ny post i dim_city
bord.
# loop through live_db table # for each record test if it is missing in the dwh table new_values_added = 0 for city in table_city_list: id = -1; for dim_city in table_dim_city_list: if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]: id = dim_city[0] if id == -1: stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2]) connection_dwh.execute(stmt) new_values_added = 1 if new_values_added == 0: print("No new values.") else: print("New value(s) added.")
För att avgöra om värdet redan finns i DWH testade vi en kombination av attribut som borde vara unika. (Den primära nyckeln från livedatabasen hjälper oss inte mycket här.) Vi kan använda liknande kod för att uppdatera andra ordböcker. Det är inte den snyggaste lösningen, men det är ändå ganska elegant. Och det kommer att göra precis vad vi behöver.
Uppdatering av fact_customer_subscribed Tabell
Mål:Om vi har gamla data för gårdagens datum, radera dem först. Lägg till gårdagens data i DWH – oavsett om vi har tagit bort något i föregående steg eller inte.
Efter att ha uppdaterat alla dimensionstabeller bör vi uppdatera faktatabellerna. I vårt manus uppdaterar vi endast en faktatabell. Resonemanget är detsamma som i föregående avsnitt:att uppdatera andra tabeller skulle följa samma mönster, så vi skulle oftast upprepa koden.
Innan vi infogar värden i faktatabellen måste vi känna till värdena för de relaterade nycklarna från dimensionstabellerna. För att göra det läser vi in dimensioner i listor igen och jämför dem med värden från livedatabasen.
Det första vi ska göra är att ladda kunden och fact_customer_subscribed
tabeller till objekt:
# fact_customer_subscribed print("\nUpdating... fact_customer_subscribed") table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live) table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)
Nu måste vi hitta nycklar för den relaterade tidsdimensionen. Eftersom vi alltid infogar data för gårdagen söker vi efter det datumet i dim_time
tabell och använd dess ID. Frågan returnerar 1 rad och ID är i första positionen (indexet börjar från 0, så det är result[0][0]
):
# find key for the dim_time dimension stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday) result = connection_dwh.execute(stmt).fetchall() dim_time_id = result[0][0]
För den tiden tar vi bort alla associerade poster från faktatabellen:
# delete any existing data in the fact table for that time dimension value stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id) connection_dwh.execute(stmt)
Okej, nu har vi ID för tidsdimensionen lagrat i dim_time_id
variabel. Detta var lätt eftersom vi bara kan ha ett tidsdimensionsvärde. Historien kommer att vara annorlunda för stadsdimensionen. Först laddar vi alla de värden vi behöver – värden som unikt beskriver staden (inte ID) och aggregerade värden:
# prepare data for insert stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\ .select_from(table_customer\ .join(table_city)\ .join(table_country))\ .group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)
Det finns några saker jag skulle vilja betona om frågan ovan:
func.sum(...)
är SUM(...) från "standard SQL".- Fallet
case(...)
syntax använderand_
före villkor, inte mellan dem. .label(...)
fungerar som ett SQL AS-alias.- Vi använder
\
för att gå till nästa rad och öka läsbarheten för frågan. (Tro mig, det är ganska mycket oläsligt utan snedstreck – jag har provat det :) ) .group_by(...)
spelar rollen som SQL:s GROUP BY.
Därefter går vi igenom varje post som returneras med den föregående frågan. För varje post jämför vi värden som unikt definierar en stad (city_name
, postal_code
, country_name
) med värdena lagrade i listan skapad av DWH dim_city
tabell. Om alla tre värdena matchar, lagrar vi ID:t från listan och använder det när vi infogar ny data. På så sätt kommer vi att ha ID:n för båda dimensionerna för varje post:
# loop through all new records # use time dimension # for each record find key for city dimension # insert row new_values = connection_live.execute(stmt).fetchall() for new_value in new_values: dim_city_id = -1; for dim_city in table_dim_city_list: if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]: dim_city_id = dim_city[0] if dim_city_id > 0: stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6]) connection_dwh.execute(stmt_insert) dim_city_id = -1 print("Completed.")
Och det är allt. Vi har uppdaterat vår DWH. Skriptet skulle bli mycket längre om vi uppdaterade alla dimensions- och faktatabeller. Komplexiteten skulle också bli större när en faktatabell är relaterad till fler dimensionstabeller. I så fall skulle vi behöva ett för slinga för varje dimensionstabell.
Det här fungerar inte!
Jag blev mycket besviken när jag skrev det här manuset och sedan fick reda på att något sådant här inte kommer att fungera:
stmt = select([table_city.columns.city_name])\ .select_from(table_city\ .outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\ .where(table_dim_city.columns.id.is_(None))
I det här exemplet försöker jag använda tabeller från två olika databaser. Om vi upprättar två separata anslutningar kommer den första anslutningen inte att "se" tabeller från en annan anslutning. Om vi ansluter direkt till servern och inte till en databas kommer vi inte att kunna ladda tabeller.
Tills detta ändras (förhoppningsvis snart), måste du använda någon form av struktur (t.ex. vad vi gjorde idag) för att kommunicera mellan de två databaserna. Detta komplicerar koden eftersom du måste ersätta en enda fråga med två listor och kapslade för slingor.
Dela dina tankar om SQLAlchemy och Python
Detta var den sista artikeln i den här serien. Men vem vet? Kanske kommer vi att prova ett annat tillvägagångssätt i kommande artiklar, så håll utkik. Under tiden, vänligen dela dina tankar om SQLAlchemy och Python i kombination med databaser. Vad tycker du att vi saknar i den här artikeln? Vad skulle du lägga till? Berätta för oss i kommentarerna nedan.
Du kan ladda ner hela skriptet som vi använde i den här artikeln här.
Och särskilt tack går till Dirk J Bosman (@dirkjobosman), som rekommenderade denna artikelserie.