Below is the answer:
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.Schema
import org.apache.mahout.sparkbindings
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.RDD
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat
object SparkExample extends App {
  // Mahout-aware Spark context; "local" master is for running the example locally
  implicit val mc = sparkbindings.mahoutSparkContext(masterUrl = "local", appName = "RowSimilarity")

  // Point mongo-hadoop at the source database and collection
  val mongoConfig = new Configuration()
  mongoConfig.set("mongo.input.uri", "mongodb://hostname:27017/db.collection")

  // Read every document in the collection as a (key, BSONObject) pair
  val documents: RDD[(Object, BSONObject)] = mc.newAPIHadoopRDD(
    mongoConfig,
    classOf[MongoInputFormat],
    classOf[Object],
    classOf[BSONObject]
  )

  // Pull out the product id and parse the stringified attribute array into
  // normalized tokens (lower-cased, inner spaces replaced with hyphens)
  val documents_Array: RDD[(String, Array[String])] = documents.map(
    doc1 => (
      doc1._2.get("product_id").toString,
      doc1._2.get("product_attribute_value").toString
        .replace("[ \"", "")
        .replace("\"]", "")
        .split("\" , \"")
        .map(value => value.toLowerCase.replace(" ", "-"))
    )
  )

  // Flatten the attribute arrays: one (product_id, attribute) pair per element
  val new_doc: RDD[(String, String)] = documents_Array.flatMapValues(x => x)

  // Build the IndexedDataset that rowSimilarityIDS expects
  val myIDs = IndexedDatasetSpark(new_doc)(mc)

  // Output layout: rowID<tab>colID1:score1<space>colID2:score2 ...
  val readWriteSchema = new Schema(
    "rowKeyDelim" -> "\t",
    "columnIdStrengthDelim" -> ":",
    "omitScore" -> false,
    "elementDelim" -> " "
  )

  // Compute row similarities and write the result to HDFS
  SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://hadoop:9000/mongo-hadoop-rowsimilarity", readWriteSchema)(mc)
}
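For reference, the chained replace/split in documents_Array assumes product_attribute_value comes back as a stringified array. A minimal standalone sketch of that parsing, with a made-up sample value:

// Hypothetical sample of the raw field contents (illustration only)
val raw = "[ \"Solid State Drive\" , \"500 GB\"]"
val tokens: Array[String] = raw
  .replace("[ \"", "")                  // strip the leading bracket and quote
  .replace("\"]", "")                   // strip the trailing quote and bracket
  .split("\" , \"")                     // split on the quote-comma-quote separator
  .map(_.toLowerCase.replace(" ", "-")) // "500 GB" -> "500-gb"
// tokens == Array("solid-state-drive", "500-gb")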
build.sbt:
name := "scala-mongo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"
// hadoop-client drags in old servlet/jmx/logging artifacts that clash with
// Spark's classpath, hence the excludes (a common workaround, not specific to this job)
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude("com.sun.jmx", "jmxri") exclude("com.sun.jdmk", "jmxtools") exclude("javax.jms", "jms") exclude("org.slf4j", "slf4j-log4j12") exclude("hsqldb", "hsqldb"),
  "org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)
libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"
resolvers += "typesafe repo" at " http://repo.typesafe.com/typesafe/releases/"
resolvers += Resolver.mavenLocal
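With this build file in place (and MongoDB and HDFS reachable at the URIs used above), the job can be started with sbt run.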
I used mongo-hadoop to fetch the data from Mongo and work with it. Since my data contained an array, I had to use flatMapValues to flatten it and then pass it to IDS to get correct output; see the sketch below.
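A minimal sketch of that flattening with toy data, reusing the mc context from above (the ids and attributes are made up):

val pairs = mc.parallelize(Seq(
  ("p1", Array("solid-state-drive", "500-gb")),
  ("p2", Array("hdd"))
))
// One (product_id, attribute) pair per array element
val flat: RDD[(String, String)] = pairs.flatMapValues(x => x)
// flat.collect(): Array((p1,solid-state-drive), (p1,500-gb), (p2,hdd))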
PS: I posted the answer here and not on the linked question because this Q&A covers the full scope of fetching the data and processing it.