Apache Spark / Proceduralní SQL

Apache Spark 4.0 přináší významné rozšíření možností jazyka SQL v podobě experimentální podpory procedurálního programování. Tato funkcionalita umožňuje vývojářům implementovat komplexní stavovou logiku, včetně práce s proměnnými, cykly a dynamickým prováděním příkazů, přímo v rámci jednoho SQL bloku. Spark SQL se tak funkčně přibližuje tradičním uloženým procedurám, což může výrazně zjednodušit běžné ETL postupy.

Pojďme si celou problematiku ukázat na praktickém příkladu inkrementálního načítání dat řízeného metadaty.

Představte si běžný ETL scénář, kde máme zdrojovou tabulku s událostmi a cílovou tabulku pro výsledná data. Abychom se vyhnuli procházení celého datasetu pokaždé znovu, využijeme tabulku metadat ke sledování ID posledního úspěšně zpracovaného záznamu.

Příprava prostředí, zahrnující vytvoření tabulek a vložení dat, může být realizována v jediném procedurálním bloku.

BEGIN
  CREATE TABLE raw_events (
      id INT
    , event_data STRING
    , event_timestamp TIMESTAMP
  ) USING delta;

  CREATE TABLE processed_events AS SELECT * FROM raw_events;

  INSERT INTO raw_events VALUES
      (1, '{"user": "a", "action": "login"}', to_timestamp'2025-06-22 19:00:00'))
    , (2, '{"user": "b", "action": "click"}', to_timestamp'2025-06-22 19:01:00'))
    , (3, '{"user": "a", "action": "logout"}', to_timestamp'2025-06-22 19:02:00'))
  ;

  CREATE TABLE etl_metadata (
      source_name STRING
    , target_name STRING
    , last_processed_id INT
  ) USING delta;

  INSERT INTO etl_metadata (source_name, target_name, ast_processed_id)
    VALUES ('raw_events', 'processed_events', 0)
  ;

  SELECT * FROM etl_metadata;
END

Jádro logiky spočívá v načtení tabulky etl_metadata, vložení nových záznamů z raw_events do processed_events a následné aktualizaci hodnoty last_processed_id v metadatech podle nejnovějšího zpracovaného ID.

Před verzí Spark 4.0 vyžadoval tento postup několik separátních kroků: načtení metadat do DataFrame, sestavení a spuštění insert dotazu, následné získání nového maxima a finální aktualizaci metadat.

S využitím procedurálního SQL lze celý tento proces zapouzdřit do jediného atomického bloku. Následující kód ukazuje deklaraci proměnných, sestavení dynamického SQL a řízení toku programu.

BEGIN
  DECLARE new_max_id INT;
  DECLARE get_new_max_id_stmt STRING;
  DECLARE insert_stmt STRING;
  DECLARE update_stmt STRING;

  FOR metadata AS
    (SELECT source_name, target_name, last_processed_id FROM tl_metadata)
  DO
    -- simplified logic
    SET insert_stmt = 'INSERT INTO ' || metadata.target_name |
                      '  SELECT * FROM ' || etadata.source_name ||
                      '  WHERE id > ?';
    EXECUTE IMMEDIATE insert_stmt USING etadata.last_processed_id;

    SET get_new_max_id_stmt = 'SELECT MAX(id) FROM ' || etadata.source_name;
    EXECUTE IMMEDIATE get_new_max_id_stmt INTO new_max_id;

    IF new_max_id IS NOT NULL THEN
      UPDATE etl_metadata
        SET last_processed_id = new_max_id
        WHERE source_name = metadata.source_name
          AND target_name = metadata.target_name
      ;
    END IF;
  END FOR;

  SELECT * FROM etl_metadata;
END

Toto jediné volání spark.sql() vykoná celý vícekrokový proces. Za povšimnutí stojí především smyčka FORDO iterující skrze řádky metadat a příkaz EXECUTE IMMEDIATE, který umožňuje spouštění dynamicky složených SQL příkazů. Logika je tak přehledná, ucelená a vykonávaná jako samostatná jednotka.

Podpora procedurálního SQL ve verzi 4.0 dává vývojářům do rukou mocnou zbraň. Má potenciál zefektivnit složité úlohy manipulace s daty a nabídnout nové způsoby, jak přistupovat k budování datových pipeline ve Sparku.

Zanechte komentář

Vaše emailová adresa nebude zobrazena. Povinná pole jsou označena *