sql >> Databasteknik >  >> NoSQL >> MongoDB

Importera CSV med Mongoose Schema

Du kan göra det med fast-csv genom att hämta headers från schemadefinitionen som kommer att returnera de analyserade raderna som "objekt". Du har faktiskt några felmatchningar, så jag har markerat dem med korrigeringar:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Så länge schemat faktiskt stämmer överens med den angivna CSV-filen är det okej. Det här är korrigeringarna som jag kan se, men om du behöver justera de faktiska fältnamnen annorlunda måste du justera. Men det fanns i princip ett Number i den position där det finns en String och i huvudsak ett extra fält, som jag antar är det tomma i CSV-filen.

De allmänna sakerna är att hämta uppsättningen av fältnamn från schemat och överföra det till alternativen när man gör csv-parser-instansen:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

När du verkligen gör det får du tillbaka ett "Objekt" istället för en array:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Oroa dig inte för "typerna" eftersom Mongoose kommer att kasta värdena enligt schema.

Resten sker inom hanteraren för data händelse. För maximal effektivitet använder vi insertMany() att bara skriva till databasen en gång var 10 000:e rad. Hur det faktiskt går till servern och processerna beror på MongoDB-versionen, men 10 000 borde vara ganska rimligt baserat på det genomsnittliga antalet fält du skulle importera för en enskild samling i termer av "avvägningen" för minnesanvändning och att skriva en rimlig nätverksbegäran. Gör siffran mindre om det behövs.

De viktiga delarna är att markera dessa samtal som async funktioner och await resultatet av insertMany() innan du fortsätter. Vi måste också pause() strömmen och resume() på varje objekt annars riskerar vi att skriva över buffer av dokument som ska infogas innan de faktiskt skickas. pause() och resume() är nödvändiga för att sätta "mottryck" på röret, annars fortsätter föremål bara att "komma ut" och avfyra data händelse.

Naturligtvis kräver kontrollen för de 10 000 posterna att vi kontrollerar det både vid varje iteration och vid strömavslut för att tömma bufferten och skicka eventuella återstående dokument till servern.

Det är verkligen vad du vill göra, eftersom du absolut inte vill avfyra en asynkbegäran till servern både vid "varje" iteration genom data händelse eller i princip utan att vänta på att varje begäran ska slutföras. Du kommer undan med att inte kontrollera det för "mycket små filer", men för alla verkliga belastningar är du säker på att överskrida samtalsstacken på grund av "in flight" asynkrona samtal som ännu inte har slutförts.

FYI - en package.json Begagnade. mz är valfritt eftersom det bara är ett moderniserat Promise aktiverat bibliotek av standardnod "inbyggda" bibliotek som jag helt enkelt är van vid att använda. Koden är naturligtvis helt utbytbar med fs modul.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Med Node v8.9.x och senare kan vi till och med göra detta mycket enklare med en implementering av AsyncIterator genom stream-to-iterator modul. Det finns fortfarande i Iterator<Promise<T>> läge, men det bör göra det tills Node v10.x blir stabil LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

I grund och botten ersätts all hantering av "händelse" för strömmen, paus och återupptagande av en enkel for loop:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Lätt! Detta rensas upp i senare nodimplementering med for..await..of när det blir mer stabilt. Men ovanstående fungerar bra på från den angivna versionen och uppåt.



  1. Vad är en bra strategi för att gruppera liknande ord?

  2. Hur fungerar sortering med ett index i MongoDB?

  3. Vad är det korrekta sättet att indexera i MongoDB när stora kombinationer av fält finns

  4. Det går inte att låsa ett mongodb-dokument. Vad händer om jag behöver?