Nyligen släppte MongoDB en ny funktion från och med version 3.6, Change Streams. Detta ger dig omedelbar tillgång till dina data som hjälper dig att hålla dig uppdaterad med dina dataändringar. I dagens värld vill alla ha omedelbara aviseringar snarare än att få det efter några timmar eller minuter. För vissa applikationer är det viktigt att skicka realtidsmeddelanden till alla prenumererade användare för varje uppdatering. MongoDB gjorde denna process väldigt enkel genom att introducera den här funktionen. I den här artikeln kommer vi att lära oss om MongoDB change stream och dess applikationer med några exempel.
Definiera ändringsströmmar
Ändringsströmmar är inget annat än realtidsströmmen av alla ändringar som sker i databasen eller samlingen eller till och med i distributioner. Till exempel, när en uppdatering (Infoga, Uppdatera eller Ta bort) sker i en specifik samling, utlöser MongoDB en förändringshändelse med all data som har ändrats.
Du kan definiera ändringsströmmar på vilken samling som helst precis som alla andra normala aggregeringsoperatorer med $changeStream-operatorn och watch()-metoden. Du kan också definiera ändringsström med metoden MongoCollection.watch().
Exempel
db.myCollection.watch()
Ändra strömningsfunktioner
-
Filtrera ändringar
Du kan filtrera ändringarna för att få händelseaviseringar endast för vissa riktade data.
Exempel:
pipeline = [ { $match: { name: "Bob" } } ]; changeStream = collection.watch(pipeline);
Den här koden ser till att du bara får uppdateringar för poster som har ett namn som är lika med Bob. På så sätt kan du skriva vilka pipelines som helst för att filtrera förändringsströmmarna.
-
Återuppta ändringsströmmar
Denna funktion säkerställer att det inte sker någon dataförlust vid eventuella fel. Varje svar i strömmen innehåller återuppta token som kan användas för att starta om strömmen från en specifik punkt. För vissa frekventa nätverksfel kommer mongodb-drivrutinen att försöka återupprätta anslutningen med abonnenterna med den senaste återuppta-tokenen. Även om, i händelse av fullständigt programfel, bör återuppta token underhållas av klienterna för att återuppta strömmen.
-
Beställda ändringsströmmar
MongoDB använder en global logisk klocka för att ordna alla förändringsströmhändelser över alla repliker och skärvor av alla kluster, så mottagaren kommer alltid att ta emot aviseringarna i samma ordning som kommandona tillämpades på databasen.
-
Händelser med fullständiga dokument
MongoDB returnerar delen av matchande dokument som standard. Men du kan ändra strömkonfigurationen för att få ett fullständigt dokument. För att göra det skickar du { fullDocument:“updateLookup”} till övervakningsmetoden.
Exempel:collection = db.collection("myColl") changeStream = collection.watch({ fullDocument: “updateLookup”})
-
Hållbarhet
Ändringsströmmar kommer endast att meddela för de data som är anslutna till majoriteten av replikerna. Detta kommer att se till att händelser genereras av majoritetsbeständighetsdata, vilket säkerställer meddelandets hållbarhet.
-
Säkerhet/åtkomstkontroll
Ändringsströmmar är mycket säkra. Användare kan skapa ändringsströmmar endast på de samlingar som de har läsbehörighet för. Du kan skapa ändringsströmmar baserat på användarroller.
Exempel på ändringsströmmar
I det här exemplet kommer vi att skapa förändringsströmmar i aktiesamlingen för att få ett meddelande när en aktiekurs överstiger en tröskel.
-
Konfigurera klustret
För att använda ändringsströmmar måste vi först skapa replikuppsättningar. Kör följande kommando för att skapa en replikuppsättning för en nod.
mongod --dbpath ./data --replSet “rs”
-
Infoga några poster i Aktiesamlingen
var docs = [ { ticker: "AAPL", price: 210 }, { ticker: "AAPL", price: 260 }, { ticker: "AAPL", price: 245 }, { ticker: "AAPL", price: 255 }, { ticker: "AAPL", price: 270 } ]; db.Stocks.insert(docs)
-
Ställ in nodmiljö och installationsberoenden
mkdir mongo-proj && cd mongo-proj npm init -y npm install mongodb --save
-
Prenumerera på ändringarna
Skapa en index.js-fil och lägg in följande kod i den.
const mongo = require("mongodb").MongoClient; mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => { console.log("Connected to MongoDB server"); // Select DB and Collection const db = client.db("mydb"); const collection = db.collection("Stocks"); pipeline = [ { $match: { "fullDocument.price": { $gte: 250 } } } ]; // Define change stream const changeStream = collection.watch(pipeline); // start listen to changes changeStream.on("change", function(event) { console.log(JSON.stringify(event)); }); });
Kör nu den här filen:
node index.js
-
Infoga en ny post i db för att få en uppdatering
db.Stocks.insert({ ticker: “AAPL”, price: 280 })
Kontrollera nu din konsol, du kommer att få en uppdatering från MongoDB.
Exempelsvar:{ "_id":{ "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"}, "operationType":"insert", "clusterTime":"6655565945622233089", "fullDocument":{ "_id":"5c5d51f73aca83479b48de6e", "ticker":"AAPL", "Price":300 }, "ns":{"db":"mydb","coll":"Stocks"}, "documentKey":{"_id":"5c5d51f73aca83479b48de6e"} }
Här kan du ändra värdet på parameter operationType med följande operationer för att lyssna efter olika typer av ändringar i en samling:
- Infoga
- Ersätt (förutom unikt ID)
- Uppdatera
- Ta bort
- Ogiltigförklara (när Mongo returnerar ogiltig markör)
Andra lägen för ändringsströmmar
Du kan börja ändra strömmar mot en databas och driftsättning på samma sätt som mot insamling. Den här funktionen har släppts från MongoDB version 4.0. Här är kommandona för att öppna en ändringsström mot databas och distributioner.
Against DB: db.watch()
Against deployment: Mongo.watch()
Slutsats
MongoDB Change Streams förenklar integrationen mellan frontend och backend på ett sömlöst sätt i realtid. Den här funktionen kan hjälpa dig att använda MongoDB för pubsub-modellen så att du inte behöver hantera Kafka- eller RabbitMQ-distributioner längre. Om din applikation kräver information i realtid måste du kolla in den här funktionen i MongoDB. Jag hoppas att det här inlägget kommer att få dig igång med MongoDB change streams.