Apache Spark

Apache Spark / CSV file

Apache Spark, one of the best-known distributed computing systems, supports several formats for reading and writing data. Probably the simplest of these is delimited text (CSV), which is also the format in which the Czech Statistical Office publishes its data.

To demonstrate how to load such a file, apply a basic filter, and save the result, I chose a dataset that tracks the number of farm animals in the Czech Republic over time.

val df = sparkSession.read
  .option("header", true)
  .option("inferSchema", true)
  .csv("data/zvirata.csv")

With automatic inference of column data types enabled, loading a CSV file takes only a few lines of code. Because Spark then has to read the data twice (once to infer the types and once to load the rows), it is advisable to infer types only for small datasets.

df.printSchema()
root
 |-- idhod: integer (nullable = true)
 |-- hodnota: integer (nullable = true)
 |-- stapro_kod: integer (nullable = true)
 |-- DRUHZVIRE_cis: integer (nullable = true)
 |-- DRUHZVIRE_kod: integer (nullable = true)
 |-- refobdobi: integer (nullable = true)
 |-- rok: integer (nullable = true)
 |-- uzemi_cis: integer (nullable = true)
 |-- uzemi_kod: integer (nullable = true)
 |-- STAPRO_TXT: string (nullable = true)
 |-- uzemi_txt: string (nullable = true)
 |-- DRUHZVIRE_txt: string (nullable = true)

In general, it is good practice to always define the schema explicitly, not only for performance reasons but also to prevent potential errors.

import org.apache.spark.sql.types._

val schema = StructType(
  Seq(
    StructField("idhod", StringType),
    StructField("hodnota", IntegerType),
    StructField("stapro_kod", StringType),
    StructField("DRUHZVIRE_cis", StringType),
    StructField("DRUHZVIRE_kod", StringType),
    StructField("refobdobi", DateType),
    StructField("rok", IntegerType),
    StructField("uzemi_cis", StringType),
    StructField("uzemi_kod", StringType),
    StructField("STAPRO_TXT", StringType),
    StructField("uzemi_txt", StringType),
    StructField("DRUHZVIRE_txt", StringType)
  )
)
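The same schema can also be written more compactly as a DDL string. The following is a minimal sketch rather than part of the original project; the names ddl and ddlSchema are introduced here for illustration, and it assumes a Spark version that provides StructType.fromDDL.

```scala
import org.apache.spark.sql.types.StructType

// Column names and types mirror the StructType defined above.
// `ddl` and `ddlSchema` are illustrative names, not part of the original project.
val ddl = Seq(
  "idhod STRING",
  "hodnota INT",
  "stapro_kod STRING",
  "DRUHZVIRE_cis STRING",
  "DRUHZVIRE_kod STRING",
  "refobdobi DATE",
  "rok INT",
  "uzemi_cis STRING",
  "uzemi_kod STRING",
  "STAPRO_TXT STRING",
  "uzemi_txt STRING",
  "DRUHZVIRE_txt STRING"
).mkString(", ")

// Parse the DDL string into a StructType; no SparkSession is needed for this step.
val ddlSchema: StructType = StructType.fromDDL(ddl)

ddlSchema.printTreeString()
```

Which form to prefer is largely a matter of taste: the DDL string is shorter, while the explicit StructType is checked by the compiler.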

val dfWithSchema = sparkSession.read
  .schema(schema)
  .option("header", true)
  .option("dateFormat", "yyyyMMdd")
  .csv("data/zvirata.csv")

Because the dataset stores dates in a non-default format, Spark also needs to be given a pattern through the dateFormat option.
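To see what the yyyyMMdd pattern means, it can be tried outside Spark with java.time, whose pattern letters Spark's date patterns largely follow. This is only a sketch: the value 20181231 is a hypothetical example and is not taken from the dataset.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.Try

// Hypothetical raw value in the compact form used by the refobdobi column.
val raw = "20181231"

// The default ISO form (yyyy-MM-dd) does not match a value without dashes.
val isoAttempt = Try(LocalDate.parse(raw))

// With an explicit pattern the same text parses cleanly.
val compact = DateTimeFormatter.ofPattern("yyyyMMdd")
val parsed = LocalDate.parse(raw, compact)

println(isoAttempt.isFailure) // true
println(parsed)               // 2018-12-31
```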

dfWithSchema.printSchema()
root
 |-- idhod: string (nullable = true)
 |-- hodnota: integer (nullable = true)
 |-- stapro_kod: string (nullable = true)
 |-- DRUHZVIRE_cis: string (nullable = true)
 |-- DRUHZVIRE_kod: string (nullable = true)
 |-- refobdobi: date (nullable = true)
 |-- rok: integer (nullable = true)
 |-- uzemi_cis: string (nullable = true)
 |-- uzemi_kod: string (nullable = true)
 |-- STAPRO_TXT: string (nullable = true)
 |-- uzemi_txt: string (nullable = true)
 |-- DRUHZVIRE_txt: string (nullable = true)
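Declaring identifier-like columns such as uzemi_kod as StringType keeps their values exactly as they appear in the file. The short sketch below uses a hypothetical code value (not taken from the dataset) to show what integer typing would do to a code with leading zeros.

```scala
// Hypothetical code value with leading zeros; not taken from the dataset.
val rawCode = "0019"

val asInt = rawCode.toInt   // what an integer-typed column would hold
val asString = rawCode      // what a string-typed column keeps

println(asInt)    // 19
println(asString) // 0019
```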

Applying a simple filter and then saving the result to a file again takes only a few lines. Calling coalesce reduces the number of output files, in this case to one.

import org.apache.spark.sql.functions.col

dfWithSchema
  .filter(col("rok") === 2018) // rok is an IntegerType column, so compare it with an Int
  .coalesce(1)
  .write
  .option("header", true)
  .mode("overwrite")
  .csv("data/zvirata_2018")
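Note that the path passed to csv becomes a directory: Spark writes the data into a part-* file inside it, next to marker files such as _SUCCESS. A minimal sketch for locating the data file follows; partFiles is a hypothetical helper introduced here and is not part of the original project.

```scala
import java.io.File

// Hypothetical helper: lists the CSV part files inside a Spark output directory.
// Returns an empty sequence when the path does not exist or is not a directory.
def partFiles(outputDir: String): Seq[File] =
  Option(new File(outputDir).listFiles()) // listFiles() returns null for a non-directory
    .map(_.toSeq)
    .getOrElse(Seq.empty)
    .filter(f => f.getName.startsWith("part-") && f.getName.endsWith(".csv"))

// After coalesce(1) a single part file is expected here.
partFiles("data/zvirata_2018").foreach(f => println(f.getPath))
```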

The entire project with examples is available on GitHub.
