sql >> Databasteknik >  >> RDS >> PostgreSQL

Primära nycklar med Apache Spark

Scala :

Om allt du behöver är unika nummer kan du använda zipWithUniqueId och återskapa DataFrame. Först några importer och dummydata:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Extrahera schema för vidare användning:

val schema = df.schema

Lägg till id-fält:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Skapa DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

Samma sak i Python :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Om du föredrar ett löpande nummer kan du ersätta zipWithUniqueId med zipWithIndex men det är lite dyrare.

Direkt med DataFrame API :

(universell Scala, Python, Java, R med i stort sett samma syntax)

Tidigare har jag missat monotonicallyIncreasingId funktion som borde fungera bra så länge du inte kräver på varandra följande nummer:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

Även om det är användbart monotonicallyIncreasingId är icke-deterministiskt. Det är inte bara id som kan skilja sig från exekvering till exekvering, men utan ytterligare trick kan inte användas för att identifiera rader när efterföljande operationer innehåller filter.

Obs :

Det är också möjligt att använda rowNumber fönsterfunktion:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Tyvärr:

VARNA Fönster:Ingen partition definierad för fönsterdrift! Om all data flyttas till en enda partition kan detta orsaka allvarlig prestandaförsämring.

Så om du inte har ett naturligt sätt att partitionera dina data och säkerställa att unikhet inte är särskilt användbar just nu.



  1. Hur LEFT() fungerar i MariaDB

  2. Få första veckodagen i SQL Server

  3. PostgreSQL användarbehörigheter

  4. Få storleken på en databas i MariaDB