Apache Spark / CSV soubor
Apache Spark, jakožto jeden z hlavních zástupců distribuovaných výpočetních systémů, podporuje hned několik formátů pro čtení a zápis. Tím pravděpodobně nejjednodušším je textový formát s oddělovači, ve kterém poskytuje svoje data i Český statistický úřad.
Pro účely ukázky, jak takový soubor načíst, aplikovat základní filtr a znovu uložit, jsem si vybral dataset s vývojem počtu hospodářských zvířat v České republice.
val df = sparkSession.read
.option("header", true)
.option("inferSchema", true)
.csv("data/zvirata.csv")
Při zapnuté volbě automatického odvození datových typů sloupců je načtení CSV souboru otázka několika řádků kódu. Vzhledem k tomu, že Spark musí data projít dvakrát, je vhodné odvozovat typy jen u malých kolekcí.
df.printSchema()
root |-- idhod: integer (nullable = true) |-- hodnota: integer (nullable = true) |-- stapro_kod: integer (nullable = true) |-- DRUHZVIRE_cis: integer (nullable = true) |-- DRUHZVIRE_kod: integer (nullable = true) |-- refobdobi: integer (nullable = true) |-- rok: integer (nullable = true) |-- uzemi_cis: integer (nullable = true) |-- uzemi_kod: integer (nullable = true) |-- STAPRO_TXT: string (nullable = true) |-- uzemi_txt: string (nullable = true) |-- DRUHZVIRE_txt: string (nullable = true)
Obecně je dobrou praxí vždy zadefinovat schema manuálně, a to nejen kvůli rychlosti, ale i zamezení vzniku případných chyb.
val schema = StructType(
Seq(
StructField("idhod", StringType),
StructField("hodnota", IntegerType),
StructField("stapro_kod", StringType),
StructField("DRUHZVIRE_cis", StringType),
StructField("DRUHZVIRE_kod", StringType),
StructField("refobdobi", DateType),
StructField("rok", IntegerType),
StructField("uzemi_cis", StringType),
StructField("uzemi_kod", StringType),
StructField("STAPRO_TXT", StringType),
StructField("uzemi_txt", StringType),
StructField("DRUHZVIRE_txt", StringType)
)
)
val dfWithSchema = sparkSession.read
.schema(schema)
.option("header", true)
.option("dateFormat", "yyyyMMdd")
.csv("data/zvirata.csv")
Jelikož dataset obsahuje datum v nestandardním formátu, je potřeba Sparku také podsunout masku parametrem dateFormat.
dfWithSchema.printSchema()
root |-- idhod: string (nullable = true) |-- hodnota: integer (nullable = true) |-- stapro_kod: string (nullable = true) |-- DRUHZVIRE_cis: string (nullable = true) |-- DRUHZVIRE_kod: string (nullable = true) |-- refobdobi: date (nullable = true) |-- rok: integer (nullable = true) |-- uzemi_cis: string (nullable = true) |-- uzemi_kod: string (nullable = true) |-- STAPRO_TXT: string (nullable = true) |-- uzemi_txt: string (nullable = true) |-- DRUHZVIRE_txt: string (nullable = true)
Aplikace jednoduchého filtru a následné uložení do souboru je opět otázka několika řádků. Volbou coalesce lze docílit snížení počtu výsledných souborů, v tomto případě na jeden.
dfWithSchema
.filter(col("rok") === lit("2018"))
.coalesce(1)
.write
.option("header", true)
.mode("overwrite")
.csv("data/zvirata_2018")
Celý projekt s příklady je k dispozici na githubu.