Du kan göra det med fast-csv genom att hämta headers
från schemadefinitionen som kommer att returnera de analyserade raderna som "objekt". Du har faktiskt några felmatchningar, så jag har markerat dem med korrigeringar:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Så länge schemat faktiskt stämmer överens med den angivna CSV-filen är det okej. Det här är korrigeringarna som jag kan se, men om du behöver justera de faktiska fältnamnen annorlunda måste du justera. Men det fanns i princip ett Number
i den position där det finns en String
och i huvudsak ett extra fält, som jag antar är det tomma i CSV-filen.
De allmänna sakerna är att hämta uppsättningen av fältnamn från schemat och överföra det till alternativen när man gör csv-parser-instansen:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
När du verkligen gör det får du tillbaka ett "Objekt" istället för en array:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Oroa dig inte för "typerna" eftersom Mongoose kommer att kasta värdena enligt schema.
Resten sker inom hanteraren för data
händelse. För maximal effektivitet använder vi insertMany()
att bara skriva till databasen en gång var 10 000:e rad. Hur det faktiskt går till servern och processerna beror på MongoDB-versionen, men 10 000 borde vara ganska rimligt baserat på det genomsnittliga antalet fält du skulle importera för en enskild samling i termer av "avvägningen" för minnesanvändning och att skriva en rimlig nätverksbegäran. Gör siffran mindre om det behövs.
De viktiga delarna är att markera dessa samtal som async
funktioner och await
resultatet av insertMany()
innan du fortsätter. Vi måste också pause()
strömmen och resume()
på varje objekt annars riskerar vi att skriva över buffer
av dokument som ska infogas innan de faktiskt skickas. pause()
och resume()
är nödvändiga för att sätta "mottryck" på röret, annars fortsätter föremål bara att "komma ut" och avfyra data
händelse.
Naturligtvis kräver kontrollen för de 10 000 posterna att vi kontrollerar det både vid varje iteration och vid strömavslut för att tömma bufferten och skicka eventuella återstående dokument till servern.
Det är verkligen vad du vill göra, eftersom du absolut inte vill avfyra en asynkbegäran till servern både vid "varje" iteration genom data
händelse eller i princip utan att vänta på att varje begäran ska slutföras. Du kommer undan med att inte kontrollera det för "mycket små filer", men för alla verkliga belastningar är du säker på att överskrida samtalsstacken på grund av "in flight" asynkrona samtal som ännu inte har slutförts.
FYI - en package.json
Begagnade. mz
är valfritt eftersom det bara är ett moderniserat Promise
aktiverat bibliotek av standardnod "inbyggda" bibliotek som jag helt enkelt är van vid att använda. Koden är naturligtvis helt utbytbar med fs
modul.
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
Med Node v8.9.x och senare kan vi till och med göra detta mycket enklare med en implementering av AsyncIterator
genom stream-to-iterator
modul. Det finns fortfarande i Iterator<Promise<T>>
läge, men det bör göra det tills Node v10.x blir stabil LTS:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
I grund och botten ersätts all hantering av "händelse" för strömmen, paus och återupptagande av en enkel for
loop:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
Lätt! Detta rensas upp i senare nodimplementering med for..await..of
när det blir mer stabilt. Men ovanstående fungerar bra på från den angivna versionen och uppåt.