Scala :
Om allt du behöver är unika nummer kan du använda zipWithUniqueId
och återskapa DataFrame. Först några importer och dummydata:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Extrahera schema för vidare användning:
val schema = df.schema
Lägg till id-fält:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Skapa DataFrame:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
Samma sak i Python :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Om du föredrar ett löpande nummer kan du ersätta zipWithUniqueId
med zipWithIndex
men det är lite dyrare.
Direkt med DataFrame
API :
(universell Scala, Python, Java, R med i stort sett samma syntax)
Tidigare har jag missat monotonicallyIncreasingId
funktion som borde fungera bra så länge du inte kräver på varandra följande nummer:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
Även om det är användbart monotonicallyIncreasingId
är icke-deterministiskt. Det är inte bara id som kan skilja sig från exekvering till exekvering, men utan ytterligare trick kan inte användas för att identifiera rader när efterföljande operationer innehåller filter.
Obs :
Det är också möjligt att använda rowNumber
fönsterfunktion:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Tyvärr:
VARNA Fönster:Ingen partition definierad för fönsterdrift! Om all data flyttas till en enda partition kan detta orsaka allvarlig prestandaförsämring.
Så om du inte har ett naturligt sätt att partitionera dina data och säkerställa att unikhet inte är särskilt användbar just nu.