Enligt felet har du redan en sträng, (du gjorde redan df.selectExpr("CAST(value AS STRING)")
), så du bör försöka hämta Row-händelsen som en String
, och inte en Array[Byte]
Börja med att ändra
val valueStr = new String(record.getAs[Array[Byte]]("value"))
till
val valueStr = record.getAs[String]("value")
Jag förstår att du kanske redan har ett kluster för att köra Spark-kod, men jag skulle föreslå att du fortfarande tittar på Kafka Connect Mongo Sink Connector så att du inte behöver skriva och underhålla din egen Mongo-skrivare i Spark-kod.
Eller så kan du skriva Spark-datauppsättningar direkt till mongo också