Apache Spark / CSV soubor
Apache Spark, jakožto jeden z hlavních zástupců distribuovaných výpočetních systémů, podporuje hned několik formátů pro čtení a zápis. Tím pravděpodobně nejjednodušším je textový formát s oddělovači, ve kterém poskytuje svoje data i Český statistický úřad.
Pro účely ukázky, jak takový soubor načíst, aplikovat základní filtr a znovu uložit, jsem si vybral dataset s vývojem počtu hospodářských zvířat v České republice.
val df = sparkSession.read .option("header", true) .option("inferSchema", true) .csv("data/zvirata.csv")
Při zapnuté volbě automatického odvození datových typů sloupců je načtení CSV souboru otázka několika řádků kódu. Vzhledem k tomu, že Spark musí data projít dvakrát, je vhodné odvozovat typy jen u malých kolekcí.
df.printSchema()
root |-- idhod: integer (nullable = true) |-- hodnota: integer (nullable = true) |-- stapro_kod: integer (nullable = true) |-- DRUHZVIRE_cis: integer (nullable = true) |-- DRUHZVIRE_kod: integer (nullable = true) |-- refobdobi: integer (nullable = true) |-- rok: integer (nullable = true) |-- uzemi_cis: integer (nullable = true) |-- uzemi_kod: integer (nullable = true) |-- STAPRO_TXT: string (nullable = true) |-- uzemi_txt: string (nullable = true) |-- DRUHZVIRE_txt: string (nullable = true)
Obecně je dobrou praxí vždy zadefinovat schema manuálně, a to nejen kvůli rychlosti, ale i zamezení vzniku případných chyb.
val schema = StructType( Seq( StructField("idhod", StringType), StructField("hodnota", IntegerType), StructField("stapro_kod", StringType), StructField("DRUHZVIRE_cis", StringType), StructField("DRUHZVIRE_kod", StringType), StructField("refobdobi", DateType), StructField("rok", IntegerType), StructField("uzemi_cis", StringType), StructField("uzemi_kod", StringType), StructField("STAPRO_TXT", StringType), StructField("uzemi_txt", StringType), StructField("DRUHZVIRE_txt", StringType) ) ) val dfWithSchema = sparkSession.read .schema(schema) .option("header", true) .option("dateFormat", "yyyyMMdd") .csv("data/zvirata.csv")
Jelikož dataset obsahuje datum v nestandardním formátu, je potřeba Sparku také podsunout masku parametrem dateFormat
.
dfWithSchema.printSchema()
root |-- idhod: string (nullable = true) |-- hodnota: integer (nullable = true) |-- stapro_kod: string (nullable = true) |-- DRUHZVIRE_cis: string (nullable = true) |-- DRUHZVIRE_kod: string (nullable = true) |-- refobdobi: date (nullable = true) |-- rok: integer (nullable = true) |-- uzemi_cis: string (nullable = true) |-- uzemi_kod: string (nullable = true) |-- STAPRO_TXT: string (nullable = true) |-- uzemi_txt: string (nullable = true) |-- DRUHZVIRE_txt: string (nullable = true)
Aplikace jednoduchého filtru a následné uložení do souboru je opět otázka několika řádků. Volbou coalesce
lze docílit snížení počtu výsledných souborů, v tomto případě na jeden.
dfWithSchema .filter(col("rok") === lit("2018")) .coalesce(1) .write .option("header", true) .mode("overwrite") .csv("data/zvirata_2018")
Celý projekt s příklady je k dispozici na githubu.