Jak przepakowywać pola złożone w (py)sparku?

Andrzej Galiński · 2022-01-07

Cel: pokazać proste sposoby przetwarzania tablic, struktur i słowników.

Motywacja

W Sparku oprócz typów prostych występują także typy złożone: ArrayType (tablice), MapType (słowniki) i StructType (struktury). Silnik SQLowy posiada funkcje do wykonywania typowych operacji na elementach tablic i słowników. Jeśli potrzebujemy zrobić coś niestandardowego, możemy napisać dedykowaną funkcję i zarejestrować ją jako UDF.

Jest jeszcze jedna opcja: czasem najpraktyczniej jest spłaszczyć strukturę danych, tj. wypakować wartości zagnieżdżone w takim polu do osobnych wierszy lub kolummn. Spłaszczone dane i przetwarzający je kod bywają łatwiejsze do zrozumienia i zdebugowania, a czasami są wręcz koniecznością - np. gdy musimy agregować kolumnę typu tablicowego funkcjami okienkowymi.

Lokalna sesja Sparka

Dla wygody używam Jupytera (jupyter-notebook), ale sam pyspark oczywiście też wystarczy.

python3 -m venv venv
source venv/bin/activate
pip instal pyspark jupyter-notebook

Ustawiam lokalną sesję:

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

Przetworzenie danych wewnątrz tablicy

Funkcja explode zmienia 1 rekord z tablicą w wiele rekordów z pojedynczymi wartościami; groupby razem z collect_list pozwalają wykonać odwrotną operację:

df1 = spark.createDataFrame([
    Row(name='a', numbers=[1, 2, 3]),
    Row(name='b', numbers=[4, 5, 6])
])
df1.toPandas()

#   name    numbers
# 0    a  [1, 2, 3]
# 1    b  [4, 5, 6]

df11=(df1
  .select('name', f.explode('numbers').alias('n'))
  .withColumn('n', f.expr('n + 1'))  # przekształcenie
  .groupby('name').agg(f.collect_list('n').alias('numbers'))
)
df11.toPandas()

#   name    numbers
# 0    b  [5, 6, 7]
# 1    a  [2, 3, 4]

Przetworzenie danych wewnątrz struktury

Do tworzenia struktur z osobnych kolumn służy funkcja struct(). Operator . pozwala odwołać się do pól wewnątrz struktury - pojedynczo lub równocześnie do wszystkich (.*):

# ----------- przygotowanie -----------

df2 = spark.createDataFrame([
    Row(name='słoik', price=7.0, qty=11),
    Row(name='makaron śrubki', price=4.40, qty=4)
])
df21 = df2.select('name', f.struct('price', 'qty').alias('product_info'))
df21.toPandas()

#              name product_info
# 0           słoik    (7.0, 11)
# 1  makaron śrubki     (4.4, 4)

# ------------ wypakowanie ------------

df22 = df21.select('name', 'product_info.*')
df22.toPandas()

#              name  price  qty
# 0           słoik    7.0   11
# 1  makaron śrubki    4.4    4

# --------- zmiana/spakowanie ---------

df23 = (df22
  .withColumn('total', f.expr('price*qty')) 
  .select('name', f.struct('price', 'qty', 'total').alias("info"))
)
df23.toPandas()

#              name             info
# 0           słoik  (7.0, 11, 77.0)
# 1  makaron śrubki   (4.4, 4, 17.6)

Przetworzenie danych wewnątrz słownika

Kilka razy zdarzyło mi się trafić na kolumny słownikowe, które powstały w wyniku odgadywania schematu przez Sparka. Takim danym można zdefiniować schemat lub pracować z mapą jak ze strukturą, byle ostrożnie - przy luźniejszych typach łatwiej o głupie błędy.

Jeśli interesują nas pary klucz-wartość, możemy zmienić mapę w tablicę struktur (key, value) funkcją map_entries i wypakować z niej osobne wiersze dla każdego wpisu:

df3=spark.createDataFrame([
    Row(lang='en', translations={"lis pospolity": "red fox"}),
    Row(lang='la', translations={"lis pospolity": "Vulpes vulpes", 
                                 "kruk zwyczajny": "Corvus corax"})
])
df3.toPandas()

#   lang                                       translations
# 0   en                       {'lis pospolity': 'red fox'}
# 1   la  {'lis pospolity': 'Vulpes vulpes', 'kruk zwycz...

df31=(df3
  .withColumn('exploded', f.explode(f.map_entries('translations')))
  .select(
     'lang',
     'exploded.key',
     'exploded.value',
  )
)

print(df31.toPandas())

#   lang             key          value
# 0   en   lis pospolity        red fox
# 1   la   lis pospolity  Vulpes vulpes
# 2   la  kruk zwyczajny   Corvus corax

Nie zdarzyło mi się wypakowywać do osobnych kolumn elementów prawdziwego słownika, takiego z różnymi kluczami w różnych wierszach. Pewnie w jakichś okolicznościach to może być przydatne, ale ja ich nie znam :)

Rozmaitości

  • Uwaga na nazwy wypakowanych pól! Po wypakowaniu pola ze struktury w schemacie pojawia się tylko jego ostatni człon, np.: df.select("user.address.street").columns --> [street]

Linki