I Spark, funktionerna på RDD
s (som map
här) serialiseras och skickas till exekutörerna för bearbetning. Detta innebär att alla element som ingår i dessa operationer bör kunna serialiseras.
Redis-anslutningen här är inte serialiserbar eftersom den öppnar TCP-anslutningar till mål-DB som är bundna till maskinen där den skapades.
Lösningen är att skapa dessa kopplingar på exekutörerna, i det lokala exekveringssammanhanget. Det finns få sätt att göra det på. Två som du tänker på är:
rdd.mapPartitions
:låter dig bearbeta en hel partition på en gång och därför amortera kostnaden för att skapa anslutningar)- Singleton anslutningshanterare:Skapa anslutningen en gång per utförare
mapPartitions
är enklare eftersom det enda som krävs är en liten ändring av programstrukturen:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
En singleton-anslutningshanterare kan modelleras med ett objekt som har en lat referens till en anslutning (notera:en föränderlig ref kommer också att fungera).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Det här objektet kan sedan användas för att instansiera 1 anslutning per arbetar-JVM och används som en Serializable
föremål i en operationsstängning.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
Fördelen med att använda singleton-objektet är mindre overhead eftersom anslutningar bara skapas en gång av JVM (i motsats till 1 per RDD-partition)
Det finns också några nackdelar:
- rengöring av anslutningar är knepigt (avstängningskrok/timer)
- man måste säkerställa trådsäkerhet för delade resurser
(*) koden tillhandahålls i illustrationssyfte. Inte kompilerad eller testad.