Apache Spark
Apache Spark

Apache Spark / CSV soubor

Apache Spark, jakožto jeden z hlavních zástupců distribuovaných výpočetních systémů, podporuje hned několik formátů pro čtení a zápis. Tím pravděpodobně nejjednodušším je textový formát s oddělovači, ve kterém poskytuje svoje data i Český statistický úřad.

Pro účely ukázky, jak takový soubor načíst, aplikovat základní filtr a znovu uložit, jsem si vybral dataset s vývojem počtu hospodářských zvířat v České republice.

val df = sparkSession.read
  .option("header", true)
  .option("inferSchema", true)
  .csv("data/zvirata.csv")

Při zapnuté volbě automatického odvození datových typů sloupců je načtení CSV souboru otázka několika řádků kódu. Vzhledem k tomu, že Spark musí data projít dvakrát, je vhodné odvozovat typy jen u malých kolekcí.

df.printSchema()
root
 |-- idhod: integer (nullable = true)
 |-- hodnota: integer (nullable = true)
 |-- stapro_kod: integer (nullable = true)
 |-- DRUHZVIRE_cis: integer (nullable = true)
 |-- DRUHZVIRE_kod: integer (nullable = true)
 |-- refobdobi: integer (nullable = true)
 |-- rok: integer (nullable = true)
 |-- uzemi_cis: integer (nullable = true)
 |-- uzemi_kod: integer (nullable = true)
 |-- STAPRO_TXT: string (nullable = true)
 |-- uzemi_txt: string (nullable = true)
 |-- DRUHZVIRE_txt: string (nullable = true)

Obecně je dobrou praxí vždy zadefinovat schema manuálně, a to nejen kvůli rychlosti, ale i zamezení vzniku případných chyb.

val schema = StructType(
  Seq(
    StructField("idhod", StringType),
    StructField("hodnota", IntegerType),
    StructField("stapro_kod", StringType),
    StructField("DRUHZVIRE_cis", StringType),
    StructField("DRUHZVIRE_kod", StringType),
    StructField("refobdobi", DateType),
    StructField("rok", IntegerType),
    StructField("uzemi_cis", StringType),
    StructField("uzemi_kod", StringType),
    StructField("STAPRO_TXT", StringType),
    StructField("uzemi_txt", StringType),
    StructField("DRUHZVIRE_txt", StringType)
  )
)

val dfWithSchema = sparkSession.read
  .schema(schema)
  .option("header", true)
  .option("dateFormat", "yyyyMMdd")
  .csv("data/zvirata.csv")

Jelikož dataset obsahuje datum v nestandardním formátu, je potřeba Sparku také podsunout masku parametrem dateFormat.

dfWithSchema.printSchema()
root
 |-- idhod: string (nullable = true)
 |-- hodnota: integer (nullable = true)
 |-- stapro_kod: string (nullable = true)
 |-- DRUHZVIRE_cis: string (nullable = true)
 |-- DRUHZVIRE_kod: string (nullable = true)
 |-- refobdobi: date (nullable = true)
 |-- rok: integer (nullable = true)
 |-- uzemi_cis: string (nullable = true)
 |-- uzemi_kod: string (nullable = true)
 |-- STAPRO_TXT: string (nullable = true)
 |-- uzemi_txt: string (nullable = true)
 |-- DRUHZVIRE_txt: string (nullable = true)

Aplikace jednoduchého filtru a následné uložení do souboru je opět otázka několika řádků. Volbou coalesce lze docílit snížení počtu výsledných souborů, v tomto případě na jeden.

dfWithSchema
  .filter(col("rok") === lit("2018"))
  .coalesce(1)
  .write
  .option("header", true)
  .mode("overwrite")
  .csv("data/zvirata_2018")

Celý projekt s příklady je k dispozici na githubu.

Zanechte komentář

Vaše emailová adresa nebude zobrazena. Povinná pole jsou označena *