Apache Spark / Procedural SQL
Apache Spark 4.0 adds experimental support for procedural SQL (SQL scripting). Developers can now write stateful logic with variables, loops, conditionals, and dynamic statement execution directly within a single SQL block. This brings Spark SQL closer to traditional database stored procedures and can simplify common ETL patterns.
Let’s explore this with a practical example of a metadata-driven incremental data loading process.
Imagine a common ETL scenario: we have a source table with raw events and a target table for processed events. To avoid reprocessing the entire dataset each time, we use a metadata table to keep track of the last successfully processed record’s ID.
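To make the watermark idea concrete before touching Spark, here is a minimal in-memory sketch in plain Python. The function name `load_increment` and the list-of-dicts "tables" are illustrative assumptions, not part of the Spark example.

```python
def load_increment(raw_events, processed_events, last_processed_id):
    """Copy only rows newer than the watermark and return the new watermark.

    Illustrative stand-in for the real tables: plain lists of dicts.
    """
    new_rows = [row for row in raw_events if row["id"] > last_processed_id]
    processed_events.extend(new_rows)
    # Keep the old watermark when nothing new arrived.
    return max((row["id"] for row in new_rows), default=last_processed_id)


raw = [{"id": 1, "action": "login"}, {"id": 2, "action": "click"}]
processed = []

# First run: everything above the initial watermark (0) is copied.
watermark = load_increment(raw, processed, last_processed_id=0)

# A new event arrives; the second run copies only that event.
raw.append({"id": 3, "action": "logout"})
watermark = load_increment(raw, processed, last_processed_id=watermark)
```

Running the function again with no new events copies nothing and leaves the watermark unchanged, which is the property the metadata table provides in the Spark version.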
The entire setup can be done in a single procedural block:
BEGIN
    -- Source table with raw events
    CREATE TABLE raw_events (
        id INT
        , event_data STRING
        , event_timestamp TIMESTAMP
    ) USING delta;
    -- Target table: same schema as the source, empty at this point
    CREATE TABLE processed_events AS SELECT * FROM raw_events;
    -- Sample events to process
    INSERT INTO raw_events VALUES
        (1, '{"user": "a", "action": "login"}', to_timestamp('2025-06-22 19:00:00'))
        , (2, '{"user": "b", "action": "click"}', to_timestamp('2025-06-22 19:01:00'))
        , (3, '{"user": "a", "action": "logout"}', to_timestamp('2025-06-22 19:02:00'))
    ;
    -- Metadata table: one row per source/target pair with its watermark
    CREATE TABLE etl_metadata (
        source_name STRING
        , target_name STRING
        , last_processed_id INT
    ) USING delta;
    -- Start from 0 so the first run picks up every event
    INSERT INTO etl_metadata (source_name, target_name, last_processed_id)
    VALUES ('raw_events', 'processed_events', 0)
    ;
    SELECT * FROM etl_metadata;
END
Now, for the core of our ETL logic. We need to read the etl_metadata table, insert new records from raw_events into processed_events, and finally, update the last_processed_id in our metadata table with the latest ID we’ve processed.
Before Spark 4.0, this would require multiple separate steps: reading metadata into a DataFrame, constructing and running the insert query, running another query to get the new maximum ID, and a final query to update the metadata.
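For comparison, the multi-step approach might look roughly like the PySpark sketch below. It is an assumption-laden illustration rather than a reference implementation: the function name `run_incremental_load` is made up here, `spark` is assumed to be an existing SparkSession, and table names are interpolated into the SQL strings without validation.

```python
def run_incremental_load(spark):
    """Driver-side version of the workflow: one spark.sql() call per step.

    `spark` is assumed to be an existing SparkSession; the table names
    match the setup block above.
    """
    metadata_rows = spark.sql(
        "SELECT source_name, target_name, last_processed_id FROM etl_metadata"
    ).collect()

    for row in metadata_rows:
        source, target = row["source_name"], row["target_name"]
        last_id = int(row["last_processed_id"])

        # Step 1: copy rows newer than the stored watermark.
        spark.sql(f"INSERT INTO {target} SELECT * FROM {source} WHERE id > {last_id}")

        # Step 2: look up the new watermark.
        result = spark.sql(f"SELECT MAX(id) AS max_id FROM {source}").collect()
        new_max_id = result[0]["max_id"]

        # Step 3: persist it, unless the source table is empty.
        if new_max_id is not None:
            spark.sql(
                f"UPDATE etl_metadata SET last_processed_id = {int(new_max_id)} "
                f"WHERE source_name = '{source}' AND target_name = '{target}'"
            )
```

Each step is a separate round trip between the driver program and Spark SQL, and the control flow (the loop and the NULL check) lives in Python rather than in SQL.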
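With procedural SQL, we can encapsulate this entire workflow in a single block. Note that the block is not a transaction: its statements run one after another, so a failure partway through leaves the earlier writes in place.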
BEGIN
    -- Variables must be declared at the top of the block
    DECLARE new_max_id INT;
    DECLARE get_new_max_id_stmt STRING;
    DECLARE insert_stmt STRING;
    DECLARE update_stmt STRING;  -- declared but not used in this simplified version
    -- One iteration per source/target pair in the metadata table
    FOR metadata AS
        (SELECT source_name, target_name, last_processed_id FROM etl_metadata)
    DO
        -- simplified logic
        -- Copy only the rows above the stored watermark
        SET insert_stmt = 'INSERT INTO ' || metadata.target_name ||
            ' SELECT * FROM ' || metadata.source_name ||
            ' WHERE id > ' || metadata.last_processed_id;
        EXECUTE IMMEDIATE insert_stmt;
        -- Read the new watermark into a variable
        SET get_new_max_id_stmt = 'SELECT MAX(id) FROM ' || metadata.source_name;
        EXECUTE IMMEDIATE get_new_max_id_stmt INTO new_max_id;
        -- MAX(id) is NULL for an empty source table; keep the old watermark then
        IF new_max_id IS NOT NULL THEN
            UPDATE etl_metadata
            SET last_processed_id = new_max_id
            WHERE source_name = metadata.source_name
                AND target_name = metadata.target_name
            ;
        END IF;
    END FOR;
    SELECT * FROM etl_metadata;
END
This single spark.sql() call performs the entire multi-step process. The FOR…DO loop iterates over the metadata rows, and EXECUTE IMMEDIATE runs the dynamically constructed SQL statements, with INTO capturing the result of the MAX(id) query in a variable. The logic is self-contained and submitted to Spark as one script.
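Submitting such a script from PySpark could look like the sketch below. The helper name `run_sql_script` is hypothetical, and the configuration key `spark.sql.scripting.enabled` is an assumption about how the experimental feature is gated in your Spark 4.0 build; check the documentation for your version before relying on it.

```python
def run_sql_script(spark, script):
    """Submit a BEGIN ... END script as one statement and return the result.

    Assumption: SQL scripting is gated by `spark.sql.scripting.enabled`;
    remove the conf.set call if the feature is already on in your build.
    `spark` is assumed to be an existing SparkSession.
    """
    spark.conf.set("spark.sql.scripting.enabled", "true")
    # Strip surrounding whitespace so the statement starts with BEGIN.
    return spark.sql(script.strip())
```

With the ETL block stored in a string, say `etl_script`, the call would be `run_sql_script(spark, etl_script).show()`, which should display the output of the script's final SELECT.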
Procedural SQL in Spark 4.0 lets variables, loops, and dynamic statements live in the same SQL script as the queries they drive. For metadata-driven pipelines like this one, it can replace driver-side orchestration code and may offer new ways to approach building data pipelines in Spark.
The entire project with examples is available on GitHub.