Cel: pokazać proste sposoby przetwarzania tablic, struktur i słowników.
Motywacja
W Sparku oprócz typów prostych występują także typy złożone: ArrayType
(tablice), MapType
(słowniki) i StructType
(struktury). Silnik SQLowy
posiada funkcje
do wykonywania typowych operacji na elementach tablic i słowników. Jeśli
potrzebujemy zrobić coś niestandardowego, możemy napisać dedykowaną funkcję i
zarejestrować ją jako UDF.
Jest jeszcze jedna opcja: czasem najpraktyczniej jest spłaszczyć strukturę danych, tj. wypakować wartości zagnieżdżone w takim polu do osobnych wierszy lub kolummn. Spłaszczone dane i przetwarzający je kod bywają łatwiejsze do zrozumienia i zdebugowania, a czasami są wręcz koniecznością - np. gdy musimy agregować kolumnę typu tablicowego funkcjami okienkowymi.
Lokalna sesja Sparka
Dla wygody używam Jupytera (jupyter-notebook
), ale sam pyspark
oczywiście też wystarczy.
python3 -m venv venv
source venv/bin/activate
pip instal pyspark jupyter-notebook
Ustawiam lokalną sesję:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as f
spark = SparkSession.builder.getOrCreate()
Przetworzenie danych wewnątrz tablicy
Funkcja explode
zmienia 1 rekord z tablicą w wiele rekordów z pojedynczymi
wartościami; groupby
razem z collect_list
pozwalają wykonać odwrotną
operację:
df1 = spark.createDataFrame([
Row(name='a', numbers=[1, 2, 3]),
Row(name='b', numbers=[4, 5, 6])
])
df1.toPandas()
# name numbers
# 0 a [1, 2, 3]
# 1 b [4, 5, 6]
df11=(df1
.select('name', f.explode('numbers').alias('n'))
.withColumn('n', f.expr('n + 1')) # przekształcenie
.groupby('name').agg(f.collect_list('n').alias('numbers'))
)
df11.toPandas()
# name numbers
# 0 b [5, 6, 7]
# 1 a [2, 3, 4]
Przetworzenie danych wewnątrz struktury
Do tworzenia struktur z osobnych kolumn służy funkcja struct()
. Operator .
pozwala odwołać się do pól wewnątrz struktury - pojedynczo lub równocześnie do
wszystkich (.*
):
# ----------- przygotowanie -----------
df2 = spark.createDataFrame([
Row(name='słoik', price=7.0, qty=11),
Row(name='makaron śrubki', price=4.40, qty=4)
])
df21 = df2.select('name', f.struct('price', 'qty').alias('product_info'))
df21.toPandas()
# name product_info
# 0 słoik (7.0, 11)
# 1 makaron śrubki (4.4, 4)
# ------------ wypakowanie ------------
df22 = df21.select('name', 'product_info.*')
df22.toPandas()
# name price qty
# 0 słoik 7.0 11
# 1 makaron śrubki 4.4 4
# --------- zmiana/spakowanie ---------
df23 = (df22
.withColumn('total', f.expr('price*qty'))
.select('name', f.struct('price', 'qty', 'total').alias("info"))
)
df23.toPandas()
# name info
# 0 słoik (7.0, 11, 77.0)
# 1 makaron śrubki (4.4, 4, 17.6)
Przetworzenie danych wewnątrz słownika
Kilka razy zdarzyło mi się trafić na kolumny słownikowe, które powstały w wyniku odgadywania schematu przez Sparka. Takim danym można zdefiniować schemat lub pracować z mapą jak ze strukturą, byle ostrożnie - przy luźniejszych typach łatwiej o głupie błędy.
Jeśli interesują nas pary klucz-wartość, możemy zmienić mapę w tablicę struktur
(key, value)
funkcją map_entries
i wypakować z niej osobne wiersze dla
każdego wpisu:
df3=spark.createDataFrame([
Row(lang='en', translations={"lis pospolity": "red fox"}),
Row(lang='la', translations={"lis pospolity": "Vulpes vulpes",
"kruk zwyczajny": "Corvus corax"})
])
df3.toPandas()
# lang translations
# 0 en {'lis pospolity': 'red fox'}
# 1 la {'lis pospolity': 'Vulpes vulpes', 'kruk zwycz...
df31=(df3
.withColumn('exploded', f.explode(f.map_entries('translations')))
.select(
'lang',
'exploded.key',
'exploded.value',
)
)
print(df31.toPandas())
# lang key value
# 0 en lis pospolity red fox
# 1 la lis pospolity Vulpes vulpes
# 2 la kruk zwyczajny Corvus corax
Nie zdarzyło mi się wypakowywać do osobnych kolumn elementów prawdziwego słownika, takiego z różnymi kluczami w różnych wierszach. Pewnie w jakichś okolicznościach to może być przydatne, ale ja ich nie znam :)
Rozmaitości
- Uwaga na nazwy wypakowanych pól! Po wypakowaniu pola ze struktury w schemacie
pojawia się tylko jego ostatni człon, np.:
df.select("user.address.street").columns --> [street]
Linki
- Dokumentacja funkcji SparkSQL, w tym tych do przetwarzania typów złożonych: https://spark.apache.org/docs/3.2.0/sql-ref-functions-builtin.html