sql >> Databasteknik >  >> NoSQL >> MongoDB

Realtidsdataströmning med MongoDB Change Streams

Nyligen släppte MongoDB en ny funktion från och med version 3.6, Change Streams. Detta ger dig omedelbar tillgång till dina data som hjälper dig att hålla dig uppdaterad med dina dataändringar. I dagens värld vill alla ha omedelbara aviseringar snarare än att få det efter några timmar eller minuter. För vissa applikationer är det viktigt att skicka realtidsmeddelanden till alla prenumererade användare för varje uppdatering. MongoDB gjorde denna process väldigt enkel genom att introducera den här funktionen. I den här artikeln kommer vi att lära oss om MongoDB change stream och dess applikationer med några exempel.

Definiera ändringsströmmar

Ändringsströmmar är inget annat än realtidsströmmen av alla ändringar som sker i databasen eller samlingen eller till och med i distributioner. Till exempel, när en uppdatering (Infoga, Uppdatera eller Ta bort) sker i en specifik samling, utlöser MongoDB en förändringshändelse med all data som har ändrats.

Du kan definiera ändringsströmmar på vilken samling som helst precis som alla andra normala aggregeringsoperatorer med $changeStream-operatorn och watch()-metoden. Du kan också definiera ändringsström med metoden MongoCollection.watch().

Exempel

db.myCollection.watch()

Ändra strömningsfunktioner

  • Filtrera ändringar

    Du kan filtrera ändringarna för att få händelseaviseringar endast för vissa riktade data.

    Exempel:

    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Den här koden ser till att du bara får uppdateringar för poster som har ett namn som är lika med Bob. På så sätt kan du skriva vilka pipelines som helst för att filtrera förändringsströmmarna.

  • Återuppta ändringsströmmar

    Denna funktion säkerställer att det inte sker någon dataförlust vid eventuella fel. Varje svar i strömmen innehåller återuppta token som kan användas för att starta om strömmen från en specifik punkt. För vissa frekventa nätverksfel kommer mongodb-drivrutinen att försöka återupprätta anslutningen med abonnenterna med den senaste återuppta-tokenen. Även om, i händelse av fullständigt programfel, bör återuppta token underhållas av klienterna för att återuppta strömmen.

  • Beställda ändringsströmmar

    MongoDB använder en global logisk klocka för att ordna alla förändringsströmhändelser över alla repliker och skärvor av alla kluster, så mottagaren kommer alltid att ta emot aviseringarna i samma ordning som kommandona tillämpades på databasen.

  • Händelser med fullständiga dokument

    MongoDB returnerar delen av matchande dokument som standard. Men du kan ändra strömkonfigurationen för att få ett fullständigt dokument. För att göra det skickar du { fullDocument:“updateLookup”} till övervakningsmetoden.
    Exempel:

    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})
  • Hållbarhet

    Ändringsströmmar kommer endast att meddela för de data som är anslutna till majoriteten av replikerna. Detta kommer att se till att händelser genereras av majoritetsbeständighetsdata, vilket säkerställer meddelandets hållbarhet.

  • Säkerhet/åtkomstkontroll

    Ändringsströmmar är mycket säkra. Användare kan skapa ändringsströmmar endast på de samlingar som de har läsbehörighet för. Du kan skapa ändringsströmmar baserat på användarroller.

Severalnines Become a MongoDB DBA - Bringing MongoDB to ProductionLäs om vad du behöver veta för att distribuera, övervaka, hantera och skala MongoDBDownload gratis

Exempel på ändringsströmmar

I det här exemplet kommer vi att skapa förändringsströmmar i aktiesamlingen för att få ett meddelande när en aktiekurs överstiger en tröskel.

  • Konfigurera klustret

    För att använda ändringsströmmar måste vi först skapa replikuppsättningar. Kör följande kommando för att skapa en replikuppsättning för en nod.

    mongod --dbpath ./data --replSet “rs”
  • Infoga några poster i Aktiesamlingen

    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)
  • Ställ in nodmiljö och installationsberoenden

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Prenumerera på ändringarna

    Skapa en index.js-fil och lägg in följande kod i den.

    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Kör nu den här filen:

    node index.js
  • Infoga en ny post i db för att få en uppdatering

    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Kontrollera nu din konsol, du kommer att få en uppdatering från MongoDB.
    Exempelsvar:

    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Här kan du ändra värdet på parameter operationType med följande operationer för att lyssna efter olika typer av ändringar i en samling:

  • Infoga
  • Ersätt (förutom unikt ID)
  • Uppdatera
  • Ta bort
  • Ogiltigförklara (när Mongo returnerar ogiltig markör)

Andra lägen för ändringsströmmar

Du kan börja ändra strömmar mot en databas och driftsättning på samma sätt som mot insamling. Den här funktionen har släppts från MongoDB version 4.0. Här är kommandona för att öppna en ändringsström mot databas och distributioner.

Against DB: db.watch()
Against deployment: Mongo.watch()

Slutsats

MongoDB Change Streams förenklar integrationen mellan frontend och backend på ett sömlöst sätt i realtid. Den här funktionen kan hjälpa dig att använda MongoDB för pubsub-modellen så att du inte behöver hantera Kafka- eller RabbitMQ-distributioner längre. Om din applikation kräver information i realtid måste du kolla in den här funktionen i MongoDB. Jag hoppas att det här inlägget kommer att få dig igång med MongoDB change streams.


  1. MongoDB Ersätt specifika matrisvärden

  2. Det gick inte att automatiskt konfigurera en datakälla:'spring.datasource.url' är inte specificerad

  3. Lagra Enums som strängar i MongoDB

  4. Mongo $i operatörsprestanda