Apache Spark / Procedural SQL
Apache Spark 4.0 introduces a significant enhancement to its SQL capabilities: experimental support for procedural SQL (SQL scripting). This allows developers to write more complex, stateful logic, including variables, loops, and dynamic statement execution, directly within a single SQL block. It moves Spark SQL closer to the functionality of traditional database stored procedures and can greatly simplify common ETL patterns.
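To get a feel for the syntax before the full example, here is a minimal sketch, assuming a SparkSession named spark running Spark 4.0: a single BEGIN…END block that declares variables, loops, and returns the result of its final statement.
# Minimal sketch (assumes a SparkSession named `spark` on Spark 4.0):
# a couple of variables, a loop, and a final SELECT whose result the block returns.
result = spark.sql("""
BEGIN
  DECLARE i INT DEFAULT 0;
  DECLARE total INT DEFAULT 0;
  WHILE i < 5 DO
    SET i = i + 1;
    SET total = total + i;
  END WHILE;
  SELECT total AS sum_1_to_5;
END
""")
result.show()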
Let’s explore this with a practical example of a metadata-driven incremental data loading process.
Imagine a common ETL scenario: we have a source table with raw events and a target table for processed events. To avoid reprocessing the entire dataset each time, we use a metadata table to keep track of the last successfully processed record’s ID.
The entire setup can be done in a single procedural block:
BEGIN
CREATE TABLE raw_events (
id INT
, event_data STRING
, event_timestamp TIMESTAMP
) USING delta;
CREATE TABLE processed_events AS SELECT * FROM raw_events;
INSERT INTO raw_events VALUES
(1, '{"user": "a", "action": "login"}', to_timestamp'2025-06-22 19:00:00'))
, (2, '{"user": "b", "action": "click"}', to_timestamp'2025-06-22 19:01:00'))
, (3, '{"user": "a", "action": "logout"}', to_timestamp'2025-06-22 19:02:00'))
;
CREATE TABLE etl_metadata (
source_name STRING
, target_name STRING
, last_processed_id INT
) USING delta;
INSERT INTO etl_metadata (source_name, target_name, last_processed_id)
VALUES ('raw_events', 'processed_events', 0)
;
SELECT * FROM etl_metadata;
END
Now, for the core of our ETL logic. We need to read the etl_metadata table, insert new records from raw_events into processed_events, and finally, update the last_processed_id in our metadata table with the latest ID we’ve processed.
Before Spark 4.0, this would require multiple separate steps: reading metadata into a DataFrame, constructing and running the insert query, running another query to get the new maximum ID, and a final query to update the metadata.
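A rough sketch of that pre-4.0 flow in PySpark might look like the following. The table and column names come from the example above; the glue code itself is illustrative and assumes a SparkSession named spark.
# Illustrative pre-4.0 flow: each step is a separate call, and the state
# (the watermark and the new max id) has to travel through the Python driver.
for row in spark.sql(
    "SELECT source_name, target_name, last_processed_id FROM etl_metadata"
).collect():
    # Step 1: copy the not-yet-processed rows into the target table.
    spark.sql(f"""
        INSERT INTO {row.target_name}
        SELECT * FROM {row.source_name}
        WHERE id > {row.last_processed_id}
    """)

    # Step 2: read back the highest id now present in the source table.
    new_max_id = spark.sql(f"SELECT MAX(id) FROM {row.source_name}").first()[0]

    # Step 3: persist the new watermark in the metadata table.
    if new_max_id is not None:
        spark.sql(f"""
            UPDATE etl_metadata
            SET last_processed_id = {new_max_id}
            WHERE source_name = '{row.source_name}'
              AND target_name = '{row.target_name}'
        """)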
With procedural SQL, we can encapsulate this entire workflow into a single block.
BEGIN
DECLARE new_max_id INT;
DECLARE get_new_max_id_stmt STRING;
DECLARE insert_stmt STRING;
DECLARE update_stmt STRING;
FOR metadata AS
(SELECT source_name, target_name, last_processed_id FROM etl_metadata)
DO
-- simplified logic
SET insert_stmt = 'INSERT INTO ' || metadata.target_name ||
' SELECT * FROM ' || metadata.source_name ||
' WHERE id > ?';
EXECUTE IMMEDIATE insert_stmt USING metadata.last_processed_id;
SET get_new_max_id_stmt = 'SELECT MAX(id) FROM ' || metadata.source_name;
EXECUTE IMMEDIATE get_new_max_id_stmt INTO new_max_id;
IF new_max_id IS NOT NULL THEN
UPDATE etl_metadata
SET last_processed_id = new_max_id
WHERE source_name = metadata.source_name
AND target_name = metadata.target_name
;
END IF;
END FOR;
SELECT * FROM etl_metadata;
END
This single spark.sql() call performs the entire multi-step process. The FOR…DO loop iterates through our metadata rows, and EXECUTE IMMEDIATE allows us to run dynamically constructed SQL statements. The logic is clear, self-contained, and executed as a single unit.
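For reference, assuming the block above is stored in a Python string named etl_script (a name chosen here just for illustration), running it really is a single call:
result = spark.sql(etl_script)  # the whole workflow executes inside this one call
result.show()                   # displays the final SELECT * FROM etl_metadata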
The introduction of procedural SQL in Spark 4.0 is a powerful new tool. It can help streamline complex data manipulation tasks and may offer new ways to approach building data pipelines in Spark.