Apache Spark / Procedural SQL

Apache Spark 4.0 introduces a significant enhancement to its SQL capabilities: experimental support for procedural SQL. This lets developers write more complex, stateful logic, including variables, loops, and dynamic statement execution, directly within a single SQL block. It moves Spark SQL closer to the functionality of traditional database stored procedures and can greatly simplify common ETL patterns.
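As a minimal sketch of what such a block looks like (illustrative only; it assumes an active SparkSession named spark with SQL scripting enabled), a script is simply a BEGIN ... END block passed to spark.sql() like any other statement:

# Minimal sketch (illustrative only): a variable, a loop, and a final SELECT,
# submitted as one script; the final SELECT's result is what comes back here.
script = """
BEGIN
  DECLARE counter INT DEFAULT 0;
  WHILE counter < 3 DO
    SET counter = counter + 1;
  END WHILE;
  SELECT counter AS final_value;
END
"""
spark.sql(script).show()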

Let’s explore this with a practical example of a metadata-driven incremental data loading process.

Imagine a common ETL scenario: we have a source table with raw events and a target table for processed events. To avoid reprocessing the entire dataset each time, we use a metadata table to keep track of the last successfully processed record’s ID.

The entire setup can be done in a single procedural block:

BEGIN
  CREATE TABLE raw_events (
      id INT
    , event_data STRING
    , event_timestamp TIMESTAMP
  ) USING delta;

  CREATE TABLE processed_events AS SELECT * FROM raw_events;

  INSERT INTO raw_events VALUES
      (1, '{"user": "a", "action": "login"}', to_timestamp'2025-06-22 19:00:00'))
    , (2, '{"user": "b", "action": "click"}', to_timestamp'2025-06-22 19:01:00'))
    , (3, '{"user": "a", "action": "logout"}', to_timestamp'2025-06-22 19:02:00'))
  ;

  CREATE TABLE etl_metadata (
      source_name STRING
    , target_name STRING
    , last_processed_id INT
  ) USING delta;

  INSERT INTO etl_metadata (source_name, target_name, last_processed_id)
    VALUES ('raw_events', 'processed_events', 0)
  ;

  SELECT * FROM etl_metadata;
END

Now, for the core of our ETL logic. We need to read the etl_metadata table, insert new records from raw_events into processed_events, and finally, update the last_processed_id in our metadata table with the latest ID we’ve processed.

Before Spark 4.0, this would require multiple separate steps: reading metadata into a DataFrame, constructing and running the insert query, running another query to get the new maximum ID, and a final query to update the metadata.
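Roughly, that pre-4.0 flow looks like the following PySpark sketch (illustrative only; it assumes an active SparkSession named spark and the tables created above):

# Sketch of the pre-4.0 approach: each step is a separate query driven
# from Python, with intermediate values pulled back to the driver.
meta = spark.sql(
    "SELECT source_name, target_name, last_processed_id FROM etl_metadata"
).first()

# Step 1: insert the new records.
spark.sql(
    f"INSERT INTO {meta.target_name} "
    f"SELECT * FROM {meta.source_name} WHERE id > {meta.last_processed_id}"
)

# Step 2: find the new high-water mark.
new_max_id = spark.sql(f"SELECT MAX(id) FROM {meta.source_name}").first()[0]

# Step 3: persist it back to the metadata table.
if new_max_id is not None:
    spark.sql(
        f"UPDATE etl_metadata SET last_processed_id = {new_max_id} "
        f"WHERE source_name = '{meta.source_name}' "
        f"AND target_name = '{meta.target_name}'"
    )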

With procedural SQL, we can encapsulate this entire workflow in a single, self-contained block.

BEGIN
  DECLARE new_max_id INT;
  DECLARE get_new_max_id_stmt STRING;
  DECLARE insert_stmt STRING;

  FOR metadata AS
    (SELECT source_name, target_name, last_processed_id FROM etl_metadata)
  DO
    -- simplified logic: build and run the incremental insert dynamically
    SET insert_stmt = 'INSERT INTO ' || metadata.target_name ||
                      '  SELECT * FROM ' || metadata.source_name ||
                      '  WHERE id > ?';
    EXECUTE IMMEDIATE insert_stmt USING metadata.last_processed_id;

    SET get_new_max_id_stmt = 'SELECT MAX(id) FROM ' || metadata.source_name;
    EXECUTE IMMEDIATE get_new_max_id_stmt INTO new_max_id;

    IF new_max_id IS NOT NULL THEN
      UPDATE etl_metadata
        SET last_processed_id = new_max_id
        WHERE source_name = metadata.source_name
          AND target_name = metadata.target_name
      ;
    END IF;
  END FOR;

  SELECT * FROM etl_metadata;
END

This single spark.sql() call performs the entire multi-step process. The FOR ... DO loop iterates through our metadata rows, and EXECUTE IMMEDIATE lets us run dynamically constructed SQL statements. The logic is clear, self-contained, and executed as a single unit.
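As a quick sanity check (a sketch; it assumes the BEGIN ... END block above is stored in a Python string named etl_script and that spark is an active SparkSession), adding one more raw event and re-running the same block advances the watermark:

# Sketch of the incremental behaviour on a second run. `etl_script` is
# assumed to hold the procedural block above as a single string.
spark.sql("""
  INSERT INTO raw_events VALUES
    (4, '{"user": "c", "action": "login"}', to_timestamp('2025-06-22 19:03:00'))
""")

spark.sql(etl_script).show()
# Only the new row (id = 4) is copied into processed_events, and
# last_processed_id moves from 3 to 4.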

The introduction of procedural SQL in Spark 4.0 is a powerful new tool. It can help streamline complex data manipulation tasks and may offer new ways to approach building data pipelines in Spark.
