Jag är ingen expert på mongodb, men utifrån de exempel jag har sett är det här ett mönster jag skulle prova.
Jag har utelämnat andra händelser än data, eftersom strypningen av den ena verkar vara huvudproblemet.
var cursor = db.collection('mycollection').find({});
const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait
const nextBatch = () => {
if(cursor.hasNext()) {
cursorNext.next('next');
}
});
cursorNext
.switchMap(() => // wait for cursorNext to signal
Rx.Observable.fromPromise(cursor.next()) // get a single doc
.repeat() // get another
.takeWhile(() => cursor.hasNext() ) // stop taking if out of data
.take(batchSize) // until full batch
.toArray() // combine into a single emit
)
.map(docsBatch => {
// do something with the batch
// return docsBatch or modified doscBatch
})
... // other operators?
.subscribe(x => {
...
nextBatch();
});
Jag försöker sätta ihop ett test av detta Rx-flöde utan mongodb, under tiden kan det här ge dig några idéer.