Map-reduce är kanske den mest mångsidiga av de aggregeringsoperationer som MongoDB stöder.
Map-Reduce är en populär programmeringsmodell som har sitt ursprung hos Google för att bearbeta och aggregera stora datamängder parallellt. En detaljerad diskussion om Map-Reduce faller utanför omfattningen av denna artikel, men i huvudsak är det en aggregeringsprocess i flera steg. De två viktigaste stegen är kartskedet (bearbeta varje dokument och skicka ut resultat) och reduceringssteget (sammanställer resultat som sänds ut under kartskedet).
MongoDB stöder tre typer av aggregeringsoperationer:Map-Reduce, aggregeringspipeline och aggregeringskommandon för enstaka ändamål. Du kan använda detta MongoDB-jämförelsedokument för att se vilket som passar dina behov.https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/
I mitt förra inlägg såg vi, med exempel, hur man kör Aggregation pipelines på sekundärer. I det här inlägget kommer vi att gå igenom att köra Map-Reduce-jobb på MongoDB sekundära repliker.
MongoDB Map-Reduce
MongoDB stöder körning av Map-Reduce-jobb på databasservrarna. Detta ger flexibiliteten att skriva komplexa aggregeringsuppgifter som inte är lika lätta att göra via aggregeringspipelines. MongoDB låter dig skriva anpassad karta och reducera funktioner i Javascript som kan skickas till databasen via Mongo-skal eller någon annan klient. På stora och ständigt växande datamängder kan man till och med överväga att köra inkrementella Map-Reduce-jobb för att undvika att bearbeta äldre data varje gång.
Historiskt har kartan och reduceringsmetoderna använts för att exekveras i en entrådig kontext. Den begränsningen togs dock bort i version 2.4.
Varför köra Map-Reduce-jobb på sekundären?
Liksom andra aggregeringsjobb är Map-Reduce också ett resurskrävande "batch"-jobb så det passar bra för att köra på skrivskyddade repliker. Förbehåll för att göra det är:
1) Det borde vara ok att använda lite inaktuella data. Eller så kan du justera skrivproblemet för att säkerställa att replikerna alltid är synkroniserade med den primära. Det här andra alternativet förutsätter att det är acceptabelt att ta en träff på skrivprestandan.
2) Utdata från Map-Reduce-jobbet ska inte skrivas till en annan samling i databasen utan snarare returneras till applikationen (dvs. inga skrivningar till databasen).
Låt oss titta på hur man gör detta via exempel, både från mongo-skalet och Java-drivrutinen.
Map-Reduce on Replica Sets
Datauppsättning
Som illustration kommer vi att använda en ganska enkel datamängd:En daglig transaktionspostdump från en återförsäljare. En exempelpost ser ut så här:
RS-replica-0:PRIMARY> use test switched to db test RS-replica-0:PRIMARY> show tables txns RS-replica-0:PRIMARY> db.txns.findOne() { "_id" : ObjectId("584a3b71cdc1cb061957289b"), "custid" : "cust_66", "txnval" : 100, "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...], ... }
I våra exempel kommer vi att beräkna den totala utgiften för en given kund den dagen. Sålunda, givet vårt schema, kommer kart- och reduceringsmetoderna att se ut så här:
var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid
Med vårt schema etablerat, låt oss titta på Map-Reduce i aktion.
MongoDB Shell
För att säkerställa att ett Map-Reduce-jobb exekveras på det sekundära, bör läspreferensen sättas till sekundär . Som vi sa ovan, för att en Map-Reduce ska kunna köras på en sekundär måste resultatet av resultatet vara inline (Faktum är att det är det enda utvärde som tillåts på sekundärer). Låt oss se hur det fungerar.
$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 MongoDB shell version: 3.2.10 connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test 2016-12-09T08:15:19.347+0000 I NETWORK [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017 2016-12-09T08:15:19.349+0000 I NETWORK [ReplicaSetMonitorWatcher] starting RS-replica-0:PRIMARY> db.setSlaveOk() RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary') RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); } RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); } RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }}) { "results" : [ { "_id" : "cust_0", "value" : 72734 }, { "_id" : "cust_1", "value" : 67737 }, ... ] "timeMillis" : 215, "counts" : { "input" : 10000, "emit" : 10000, "reduce" : 909, "output" : 101 }, "ok" : 1 }
En titt på loggarna på sekundären bekräftar att jobbet verkligen kördes på sekundären.
... 2016-12-09T08:17:24.842+0000 D COMMAND [conn344] mr ns: test.txns 2016-12-09T08:17:24.843+0000 I COMMAND [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:17:24.865+0000 I COMMAND [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:17:25.063+0000 I COMMAND [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms ...
Java
Låt oss nu försöka köra ett Map-Reduce-jobb på läsreplikerna från en Java-applikation. På MongoDB Java-drivrutinen gör det susen att ställa in läspreferensen. Utdata är inline som standard så inga ytterligare parametrar behöver skickas. Här är ett exempel med drivrutinsversion 3.2.2:
public class MapReduceExample { private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0"; private static final String COL_NAME = "txns"; private static final String DEF_DB = "test"; public MapReduceExample() { } public static void main(String[] args) { MapReduceExample writer = new MapReduceExample(); writer.mapReduce(); } public static final String mapfunction = "function() { emit(this.custid, this.txnval); }"; public static final String reducefunction = "function(key, values) { return Array.sum(values); }"; private void mapReduce() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); MongoDatabase database = client.getDatabase(DEF_DB); MongoCollection collection = database.getCollection(COL_NAME); MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default MongoCursor cursor = iterable.iterator(); while (cursor.hasNext()) { Document result = cursor.next(); printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value")); } printer("Done..."); } ... }
Som framgår av loggarna kördes jobbet på den sekundära:
... 2016-12-09T08:32:31.419+0000 D COMMAND [conn371] mr ns: test.txns 2016-12-09T08:32:31.420+0000 I COMMAND [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:32:31.444+0000 I COMMAND [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:32:31.890+0000 I COMMAND [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms ...
MongoDB Map-Reduce på delade kluster
MongoDB stöder Map-Reduce på sharded kluster, både när en sharded samling är indata och när det är output från ett Map-Reduce-jobb. Men MongoDB stöder för närvarande inte körning av kartreducerande jobb på sekundärer av ett fragmenterat kluster. Så även om ut-alternativet är inställd på inline , Map-Reduce-jobb kommer alltid att köras på grundvalen av ett fragmenterat kluster. Det här problemet spåras genom detta JIRA-fel.
Syntaxen för att köra ett Map-Reduce-jobb på ett fragmenterat kluster är samma som för en replikuppsättning. Så exemplen som ges i avsnittet ovan håller. Om ovanstående Java-exempel körs på ett delat kluster, visas loggmeddelanden på primärerna som indikerar att kommandot kördes där.
... 2016-11-24T08:46:30.828+0000 I COMMAND [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing: { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu ireCount: { r: 1 } } } protocol:op_command 115ms 2016-11-24T08:46:30.830+0000 I COMMAND [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0 ...
Besök vår MongoDB-produktsida för att ta reda på om vår omfattande lista med funktioner.