sql >> Databasteknik >  >> NoSQL >> Redis

Redis on Spark:Task kan inte serialiseras

I Spark, funktionerna på RDD s (som map här) serialiseras och skickas till exekutörerna för bearbetning. Detta innebär att alla element som ingår i dessa operationer bör kunna serialiseras.

Redis-anslutningen här är inte serialiserbar eftersom den öppnar TCP-anslutningar till mål-DB som är bundna till maskinen där den skapades.

Lösningen är att skapa dessa kopplingar på exekutörerna, i det lokala exekveringssammanhanget. Det finns få sätt att göra det på. Två som du tänker på är:

  • rdd.mapPartitions :låter dig bearbeta en hel partition på en gång och därför amortera kostnaden för att skapa anslutningar)
  • Singleton anslutningshanterare:Skapa anslutningen en gång per utförare

mapPartitions är enklare eftersom det enda som krävs är en liten ändring av programstrukturen:

val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

En singleton-anslutningshanterare kan modelleras med ett objekt som har en lat referens till en anslutning (notera:en föränderlig ref kommer också att fungera).

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

Det här objektet kan sedan användas för att instansiera 1 anslutning per arbetar-JVM och används som en Serializable föremål i en operationsstängning.

val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

Fördelen med att använda singleton-objektet är mindre overhead eftersom anslutningar bara skapas en gång av JVM (i motsats till 1 per RDD-partition)

Det finns också några nackdelar:

  • rengöring av anslutningar är knepigt (avstängningskrok/timer)
  • man måste säkerställa trådsäkerhet för delade resurser

(*) koden tillhandahålls i illustrationssyfte. Inte kompilerad eller testad.



  1. MongoDB-prestanda:Kör MongoDB-aggregeringar på sekundärer

  2. Redis-servern kan inte köra mer än 1024M maxheap

  3. MongoDB $summa och $avg av underdokument

  4. Redis Client List syfte och beskrivning