Adding Quality Checks to Data Pipelines in Snowflake
using Data Metric Functions
Data pipeline observability in Snowflake makes it easy to check health and performance metrics. Are all steps in my pipeline running? Are they running as fast as expected? What caused a failure or a delay?
However, an efficient and reliable pipeline is not worth much if the data itself is incomplete or inaccurate. Data quality is a third, equally important dimension that data engineers need to monitor and be alerted on at defined thresholds, just like failures and delays.
Snowflake’s new Data Metric Functions (DMFs) provide a native solution to run a wide range of quality checks on your data. You can use Snowflake’s growing library of system DMFs or create custom User Defined Metric Functions (UDMFs) with your own logic and thresholds.
For data engineers it is often essential to integrate these checks as “quality gates” within pipelines, rather than running them independently. This ensures that processing does not continue with bad data, which saves time and resources and protects downstream datasets.
Using Snowflake’s Tasks, a native orchestration capability, you can schedule, modularize, and orchestrate your processing steps by connecting multiple Tasks into a Task Graph (aka DAG). Each Task can run a variety of actions (Python, Java, SQL-scripts, functions, stored procedures, notebooks, etc.), optionally triggered by defined conditions. Since Tasks can also run Data Metric Functions, you can easily integrate data quality checks into new or existing ELT pipelines.
In the following six steps we will set up a simple ELT data pipeline with built-in data quality checks that you can easily apply to your existing or next Task pipeline.
Step 1: Setting up a demo data ingestion stream
For simplicity we will just use the ACCOUNTADMIN role for this demo setup. If you don’t have it or want to use a separate role for this demo, you can check the Appendix at the end to grant all required privileges.
use role ACCOUNTADMIN;
create warehouse if not exists DEX_WH
warehouse_size = XSMALL
auto_suspend = 2;

create database if not exists DEX_DB;
create schema if not exists DEX_DB.DEMO;
All of the following code runs in the context of this DEMO schema, so make sure you keep that context or substitute your own schema and warehouse.
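If you are working in a fresh worksheet, you can set that context explicitly:
use warehouse DEX_WH;
use schema DEX_DB.DEMO;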
To have live data for the demo, we first set up a Task that loads new rows into our source table to simulate continuous ingestion. In your case this could be input from a user interface, sensor data, or analytics arriving through a connector or from another database.
We will use some free weather data from the Snowflake Marketplace:
- Go to Snowflake Marketplace
- Get the free “Weather Source LLC: frostbyte” data share
(This data may be used in connection with the Snowflake Quickstart, but is provided solely by WeatherSource, and not by or on behalf of Snowflake.)
- Under “Options” rename the shared database to “DEMO_WEATHER_DATA” just to shorten it
Now we can run the script below to create a Task that continuously loads small batches of data into a source table, while intentionally adding some quality issues to it.
--- copy a sample of the data share into a new table
create or replace table ALL_WEATHER_DATA
as
select
ROW_NUMBER() over (order by DATE_VALID_STD desc, POSTAL_CODE) as ROW_ID,
DATE_VALID_STD as DS,
POSTAL_CODE as ZIPCODE,
MIN_TEMPERATURE_AIR_2M_F as MIN_TEMP_IN_F,
AVG_TEMPERATURE_AIR_2M_F as AVG_TEMP_IN_F,
MAX_TEMPERATURE_AIR_2M_F as MAX_TEMP_IN_F
from
DEMO_WEATHER_DATA.ONPOINT_ID.HISTORY_DAY
where
COUNTRY = 'US'
order by
DATE_VALID_STD desc,
POSTAL_CODE
limit
100000;
--- continuously growing table with weather data as "external data source"
create or replace table CONTINUOUS_WEATHER_DATA(
ROW_ID number,
INSERTED timestamp,
DS date,
ZIPCODE varchar,
MIN_TEMP_IN_F number,
AVG_TEMP_IN_F number,
MAX_TEMP_IN_F number
);
create or replace task ADD_WEATHER_DATA_TO_SOURCE
warehouse = 'DEX_WH'
schedule = '5 minutes'
comment = 'adding 10 rows of weather data every 5 minutes and adding occasional anomalies'
as
begin
if (
(select
count(*)
from
ALL_WEATHER_DATA A
left join
CONTINUOUS_WEATHER_DATA C
ON A.ROW_ID = C.ROW_ID
where
C.ROW_ID is NULL
) = 0 ) -- all sample rows have been copied: reset the demo source table so the feed keeps running
then
delete from CONTINUOUS_WEATHER_DATA;
end if;
insert into CONTINUOUS_WEATHER_DATA (
ROW_ID,
INSERTED,
DS,
ZIPCODE,
MIN_TEMP_IN_F,
AVG_TEMP_IN_F,
MAX_TEMP_IN_F
)
select
A.ROW_ID,
current_timestamp() as INSERTED,
A.DS,
A.ZIPCODE as ZIPCODE,
A.MIN_TEMP_IN_F,
A.AVG_TEMP_IN_F,
case when uniform(1, 100, random()) != 1 then A.MAX_TEMP_IN_F else A.MAX_TEMP_IN_F * 8 end as MAX_TEMP_IN_F
from
ALL_WEATHER_DATA A
left join
CONTINUOUS_WEATHER_DATA C
ON A.ROW_ID = C.ROW_ID
where
C.ROW_ID is NULL
limit
10;
end;

alter task ADD_WEATHER_DATA_TO_SOURCE resume;
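To confirm the ingestion task is actually running, one quick check is the TASK_HISTORY table function (the first entry may take until the first 5-minute schedule to show up):
select NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME
from table(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'ADD_WEATHER_DATA_TO_SOURCE'))
order by SCHEDULED_TIME desc
limit 10;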
Step 2: Setting up the demo transformation pipeline
For this demo setup we will use 4 tables:
- Source table — where new data comes in
- Landing table — where we load the new batch and run the quality checks on it
- Target table — for all “clean” data that meets expectations
- Quarantine table — for all “bad” data that failed expectations
The source table was already created in Step 1. So we now create the other three:
create or replace table RAW_WEATHER_DATA (
ROW_ID number,
INSERTED timestamp,
DS date,
ZIPCODE varchar,
MIN_TEMP_IN_F number,
AVG_TEMP_IN_F number,
MAX_TEMP_IN_F number
)
comment = 'Demo Landing table'
;
create or replace table CLEAN_WEATHER_DATA (
INSERTED timestamp,
DS date,
ZIPCODE varchar,
MIN_TEMP_IN_F number,
AVG_TEMP_IN_F number,
MAX_TEMP_IN_F number
)
comment = 'Demo Target table'
;

create or replace table QUARANTINED_WEATHER_DATA (
INSERTED timestamp,
DS date,
ZIPCODE varchar,
MIN_TEMP_IN_F number,
AVG_TEMP_IN_F number,
MAX_TEMP_IN_F number
)
comment = 'Demo Quarantine table'
;
Now we can build a Task Graph that automatically runs whenever new data is added to the source table. First we set up a Stream on the source table CONTINUOUS_WEATHER_DATA:
create or replace stream NEW_WEATHER_DATA
on table CONTINUOUS_WEATHER_DATA
append_only = TRUE
comment = 'checking for new weather data coming in'
;
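Before wiring the Stream into a Task, you can verify that it is capturing changes (the first check may return FALSE until the ingestion task has run again after the Stream was created):
-- TRUE once new rows have arrived since the stream offset
select SYSTEM$STREAM_HAS_DATA('NEW_WEATHER_DATA');

-- peek at the pending change records (reading does not advance the stream offset)
select * from NEW_WEATHER_DATA limit 10;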
Next we create the first Task to insert all new rows from the Stream into the landing table RAW_WEATHER_DATA as soon as new data is available.
create or replace task LOAD_RAW_DATA
warehouse = 'DEX_WH'
when
SYSTEM$STREAM_HAS_DATA('NEW_WEATHER_DATA')
as
declare
ROWS_LOADED number;
RESULT_STRING varchar;
begin
insert into RAW_WEATHER_DATA ( -- our landing table
ROW_ID,
INSERTED,
DS,
ZIPCODE,
MIN_TEMP_IN_F,
AVG_TEMP_IN_F,
MAX_TEMP_IN_F
)
select
ROW_ID,
INSERTED,
DS,
ZIPCODE,
MIN_TEMP_IN_F,
AVG_TEMP_IN_F,
MAX_TEMP_IN_F
from
NEW_WEATHER_DATA -- the stream on our source table
;
--- capture the number of rows loaded to show it in the UI
ROWS_LOADED := (select $1 from table(RESULT_SCAN(LAST_QUERY_ID())));
RESULT_STRING := :ROWS_LOADED||' rows loaded into RAW_WEATHER_DATA';
--- show result string as task return value
call SYSTEM$SET_RETURN_VALUE(:RESULT_STRING);
end;
We use the return value here to show the number of rows that were loaded for each run in the Snowsight UI.
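The same return value is also exposed in the RETURN_VALUE column of the TASK_HISTORY table function, in case you prefer checking it with SQL:
select NAME, STATE, RETURN_VALUE, COMPLETED_TIME
from table(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'LOAD_RAW_DATA'))
order by SCHEDULED_TIME desc
limit 10;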
Task 2: Transformation
This second task will run directly after the first task and simulate a transformation of the new dataset. In your case this might be much more complex. For our demo we keep it simple and just filter for the hot days with an average temperature over 68°F.
Once the new data is inserted into the target table CLEAN_WEATHER_DATA we empty the landing table again.
create or replace task TRANSFORM_DATA
warehouse = 'DEX_WH'
after
LOAD_RAW_DATA
as
begin
insert into CLEAN_WEATHER_DATA (
INSERTED,
DS,
ZIPCODE,
MIN_TEMP_IN_F,
AVG_TEMP_IN_F,
MAX_TEMP_IN_F
)
select
INSERTED,
DS,
ZIPCODE,
MIN_TEMP_IN_F,
AVG_TEMP_IN_F,
MAX_TEMP_IN_F
from
RAW_WEATHER_DATA
where
AVG_TEMP_IN_F > 68
;
delete from RAW_WEATHER_DATA;
end;
-- let's just add one more to indicate the potential for further steps
create or replace task MORE_TRANSFORMATION
warehouse = 'DEX_WH'
after
TRANSFORM_DATA
as
select
count(*)
from
CLEAN_WEATHER_DATA
;

-- resume all Tasks of the graph
select SYSTEM$TASK_DEPENDENTS_ENABLE('LOAD_RAW_DATA');
Let’s switch to the Task Graph view of our Tasks to see the graph we created and check the run history to see if everything runs as expected.
Step 3: Assigning quality checks to the landing table
First let’s have a look at all system Data Metric Functions that are already available by default. We can see them in Snowsight as Functions under the SNOWFLAKE.CORE schema, or alternatively list all DMFs in the account that our role is allowed to see by running:
show data metric functions in account;
System DMFs on the left in the Object Explorer and all DMFs combined in the query result
Now, for our specific demo dataset, we also want to add a range check to make sure our temperature values are plausible, so that downstream data analysis by consumers is not skewed by unrealistic values from faulty sensors.
For that we can write a UDMF (user-defined Data Metric Function) defining a range of plausible Fahrenheit values:
create or replace data metric function CHECK_FARENHEIT_PLAUSIBLE(
TABLE_NAME table(
COLUMN_VALUE number
)
)
returns NUMBER
as
$$
select
count(*)
from
TABLE_NAME
where
COLUMN_VALUE is not NULL
and COLUMN_VALUE not between -40 and 140
$$
;
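Before assigning it, we can test-run the UDMF manually against the landing table using the standard DMF invocation syntax (the result should be 0 unless implausible values have already landed):
select CHECK_FARENHEIT_PLAUSIBLE(select MAX_TEMP_IN_F from RAW_WEATHER_DATA) as IMPLAUSIBLE_ROWS;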
Now we can assign our UDMF together with a few system DMFs to columns from our landing table:
-- always set the schedule first
alter table RAW_WEATHER_DATA
set DATA_METRIC_SCHEDULE = 'TRIGGER_ON_CHANGES';
--- assign DMFs to our RAW_WEATHER_DATA
alter table RAW_WEATHER_DATA
add data metric function SNOWFLAKE.CORE.DUPLICATE_COUNT on (ROW_ID);

alter table RAW_WEATHER_DATA
add data metric function SNOWFLAKE.CORE.NULL_COUNT on (DS);

alter table RAW_WEATHER_DATA
add data metric function SNOWFLAKE.CORE.NULL_COUNT on (ZIPCODE);

-- add a custom DMF
alter table RAW_WEATHER_DATA
add data metric function CHECK_FARENHEIT_PLAUSIBLE on (MAX_TEMP_IN_F);
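Once the scheduled checks start running, their measurements are also logged centrally. Assuming your role can access the SNOWFLAKE database, you can review them in the built-in monitoring view (it can take a few minutes for results to appear):
select *
from SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS
where TABLE_NAME = 'RAW_WEATHER_DATA'
order by MEASUREMENT_TIME desc;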
Step 4: Run DMFs as “Quality Gate” of the pipeline
We want our quality-check Task to run all DMFs that are assigned to our landing table, even if we add or remove some DMFs later on, so we don’t want to call them explicitly from within the Task.
Instead we build a helper function to modularize our code for scalability.
The function (UDTF) will accept a table name as argument and return all DMFs that are currently assigned to a column of this table.
create or replace function GET_ACTIVE_QUALITY_CHECKS("TABLE_NAME" VARCHAR)
returns table(DMF VARCHAR, COL VARCHAR)
language SQL
as
$$
select
t1.METRIC_DATABASE_NAME||'.'||METRIC_SCHEMA_NAME||'.'||METRIC_NAME as DMF,
REF.value:name::string as COL
from
table(
INFORMATION_SCHEMA.DATA_METRIC_FUNCTION_REFERENCES(
REF_ENTITY_NAME => TABLE_NAME,
REF_ENTITY_DOMAIN => 'table'
)) as t1,
table(flatten(input => parse_json(t1.REF_ARGUMENTS))) as REF
where
SCHEDULE_STATUS = 'STARTED'
$$
;
Before we call it within the Task, let’s test run it first:
select DMF, COL from table(GET_ACTIVE_QUALITY_CHECKS('RAW_WEATHER_DATA'));
Now we can define a new Task that gets all DMFs from this function and then runs each of them. We store the result of each check in a TEST_RESULT variable and sum all test results up in a RESULTS_SUMMARY variable. This gives us the total number of issues found across all checks, which we pass on to the return value as the output of this Task.
If our RESULTS_SUMMARY remains ‘0’ then we know all checks have passed.
-- suspend the graph so we can make changes
alter task LOAD_RAW_DATA suspend;
-- new task to run all DMFs on the landing table
create or replace task CHECK_DATA_QUALITY
warehouse = 'DEX_WH'
after
LOAD_RAW_DATA
as
declare
TEST_RESULT number;
RESULTS_SUMMARY number default 0;
RESULT_STRING varchar;
c1 CURSOR for
--- get all DMFs and columns for active quality checks on this table by using the custom function
select DMF, COL from table(GET_ACTIVE_QUALITY_CHECKS('DEX_DB.DEMO.RAW_WEATHER_DATA'));
begin
OPEN c1;
--- loop through all DMFs assigned to the table
for REC in c1 do
--- manually run the DMF
execute immediate 'select '||REC.DMF||'(select '||REC.COL||' from RAW_WEATHER_DATA);';
--- get the test result
TEST_RESULT := (select $1 from table(RESULT_SCAN(LAST_QUERY_ID())));
-- Construct the results summary: if check did not pass then add issues to the counter
if (:TEST_RESULT != 0)
then RESULTS_SUMMARY := (:RESULTS_SUMMARY + :TEST_RESULT);
end if;
end for;
CLOSE c1;
--- construct the result string to act as a condition for downstream tasks and to show the number of quality issues found
RESULT_STRING := (:RESULTS_SUMMARY||' separate quality issues found in table RAW_WEATHER_DATA');
case when :RESULTS_SUMMARY = 0
then
call SYSTEM$SET_RETURN_VALUE('✅ All quality checks on RAW_WEATHER_DATA passed');
else
call SYSTEM$SET_RETURN_VALUE(:RESULT_STRING);
end;
end;
Now we just have to update our downstream transformation task to run AFTER the new quality-check task, and add a condition so that it runs ONLY if all quality checks have passed. For that we can use the Task return value as the condition.
-- change the transformation task to run after the quality checks and only if all checks passed
alter task TRANSFORM_DATA remove after LOAD_RAW_DATA;
alter task TRANSFORM_DATA add after CHECK_DATA_QUALITY;
alter task TRANSFORM_DATA modify when SYSTEM$GET_PREDECESSOR_RETURN_VALUE('CHECK_DATA_QUALITY') = '✅ All quality checks on RAW_WEATHER_DATA passed';

-- resume all Tasks of the graph
select SYSTEM$TASK_DEPENDENTS_ENABLE('LOAD_RAW_DATA');
Okay, let’s switch back to the “Graph” tab of our Task to see the updated graph and check the run history:
Step 5: Isolate datasets with quality issues
Now we could just completely ignore the new dataset, clear the landing table and wait for the next one. More likely though we want to analyze that dataset and try to fix the data quality issues. To do that later we will first isolate this batch into our quarantine table.
So we add another Task to our graph and invert ( != ) the condition so that it only runs when a quality check failed:
-- suspend the graph so we can make changes
alter task LOAD_RAW_DATA suspend;
create or replace task ISOLATE_DATA_ISSUES
comment = 'isolate bad rows and clear landing table'
warehouse = 'DEX_WH'
after
CHECK_DATA_QUALITY
when
SYSTEM$GET_PREDECESSOR_RETURN_VALUE('CHECK_DATA_QUALITY') != '✅ All quality checks on RAW_WEATHER_DATA passed'
as
begin
insert into QUARANTINED_WEATHER_DATA (
INSERTED,
DS,
ZIPCODE,
MIN_TEMP_IN_F,
AVG_TEMP_IN_F,
MAX_TEMP_IN_F
)
select
INSERTED,
DS,
ZIPCODE,
MIN_TEMP_IN_F,
AVG_TEMP_IN_F,
MAX_TEMP_IN_F
from
RAW_WEATHER_DATA
;
delete from RAW_WEATHER_DATA;
end;
-- resume all Tasks of the graph
select SYSTEM$TASK_DEPENDENTS_ENABLE('LOAD_RAW_DATA');
Let’s check again our Graph view to confirm:
Now we can let this run, knowing that all batches with quality issues will be isolated and all good batches will be transformed further. Since we cannot predict if and when this might happen, we want to finish this demo by adding a notification in case of quality issues.
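If you want to peek at what has been isolated so far, an ad-hoc query on the quarantine table is enough:
select count(*) as QUARANTINED_ROWS, min(DS) as OLDEST_DAY, max(DS) as NEWEST_DAY
from QUARANTINED_WEATHER_DATA;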
Step 6: Add notification about quality issues
Let us add another Task to our graph to send a notification when quality issues have been detected and rows were isolated. But maybe we know our data is not perfect and we don’t want to get a notification every single time.
So let’s use DMFs one more time to define a threshold and notify only when more than 1% of our total weather data was quarantined. First we create a new UDMF to compare the number of rows in the quarantine table to those in the target table:
create or replace data metric function DEMO.OVER_1PCT_ISOLATED_ROWS(
TABLE_NAME table(
DS date
)
)
returns NUMBER
as
$$
select
case
when (select count(*) from QUARANTINED_WEATHER_DATA) > (select count(*) from CLEAN_WEATHER_DATA)
then 1
else
case when
(select count(*) from QUARANTINED_WEATHER_DATA) * 100.0 /
(select count(*) from CLEAN_WEATHER_DATA) > 1
then 1
else 0
end
end
$$
;
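As before, we can test-run the new UDMF manually (note that the division inside it errors out if both tables are still empty, so run it after at least one batch has been processed):
select OVER_1PCT_ISOLATED_ROWS(select DS from QUARANTINED_WEATHER_DATA) as OVER_1_PERCENT;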
Now we assign it to the quarantine table:
-- always set the schedule first
alter table QUARANTINED_WEATHER_DATA
set DATA_METRIC_SCHEDULE = 'TRIGGER_ON_CHANGES';
-- assign UDMF to QUARANTINED_WEATHER_DATA
alter table QUARANTINED_WEATHER_DATA
add data metric function OVER_1PCT_ISOLATED_ROWS on (DS);

-- add a row-count system DMF for additional context
alter table QUARANTINED_WEATHER_DATA
add data metric function SNOWFLAKE.CORE.ROW_COUNT on ();
And now we can create another task that runs only if new rows were isolated, checks whether they surpass the 1% threshold, and only then sends us a notification.
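The Task below references a placeholder email notification integration ('YOUR_EMAIL_NOTIFICATION_INTEGRATION'). If you don’t have one yet, a minimal sketch for creating it could look like this (the integration name and recipient are placeholders, and the recipient must be a verified email address of a user in your account):
create notification integration if not exists YOUR_EMAIL_NOTIFICATION_INTEGRATION
type = EMAIL
enabled = TRUE
allowed_recipients = ('YOUR_EMAIL_HERE');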
create or replace task NOTIFY_ABOUT_QUALITY_ISSUE
warehouse = 'DEX_WH'
after
ISOLATE_DATA_ISSUES
as
declare
TEST_RESULT integer;
begin
TEST_RESULT := (
select OVER_1_PERCENT from (
select OVER_1PCT_ISOLATED_ROWS(select DS from QUARANTINED_WEATHER_DATA) as OVER_1_PERCENT
)
);
case when :TEST_RESULT > 0 then
call SYSTEM$SEND_SNOWFLAKE_NOTIFICATION(
SNOWFLAKE.NOTIFICATION.TEXT_HTML(
'More than 1 percent of new weather data was quarantined due to data quality issues.' -- my html message for emails
),
SNOWFLAKE.NOTIFICATION.EMAIL_INTEGRATION_CONFIG(
'YOUR_EMAIL_NOTIFICATION_INTEGRATION', -- email integration
'Snowflake DEMO Pipeline Alert', -- email header
ARRAY_CONSTRUCT('YOUR_EMAIL_HERE') -- validated user email addresses
)
);
call SYSTEM$SET_RETURN_VALUE('Over 1% bad rows. Notification sent via YOUR_EMAIL_NOTIFICATION_INTEGRATION');
else
call SYSTEM$SET_RETURN_VALUE('Less than 1% bad rows. No notification sent.');
end;
end;
-- resume all Tasks of the graph
select SYSTEM$TASK_DEPENDENTS_ENABLE('LOAD_RAW_DATA');
With this dependency setup we are also reducing redundant notifications, as they will only trigger when new quality issues are detected and the percentage of bad rows is still above 1%.
Let’s switch back to the Task Graph view to see all pieces coming together:
Once our Task Graph has had a few runs, we can also see the two different paths that can occur.
Navigate to Monitoring / Task History and filter to our DEX_DB/DEMO schema and our LOAD_RAW_DATA root task to see the history of graph runs. We can see they are all successful, as they are handling both cases (quality checks passed or failed).
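If you prefer SQL over the UI, the COMPLETE_TASK_GRAPHS table function returns a similar per-run view of recently completed graph runs:
select ROOT_TASK_NAME, STATE, FIRST_ERROR_TASK_NAME, SCHEDULED_TIME, COMPLETED_TIME
from table(INFORMATION_SCHEMA.COMPLETE_TASK_GRAPHS(ROOT_TASK_NAME => 'LOAD_RAW_DATA'))
order by SCHEDULED_TIME desc;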
Selecting a run from the History list we will mostly see this result (checks passed and data was processed):
With a few occasional runs that did detect quality issues and isolated the dataset instead.
…and our email inbox should see a few of these:
Now make it yours!
While this setup should be generic enough to apply to your existing ELT Task graphs, there are many opportunities to further customize and automate it according to your needs.
- You can start by writing and running your own DMFs.
- You can customize the notifications logic and message content.
- Or you can automatically process the isolated rows by adding more Tasks to the isolated data branch of the graph that can delete, sanitize or extrapolate data and then merge it back into the clean-data table.
- Or you could add a Streamlit app with a data editor that lets a data expert manually review and correct the isolated rows before merging them back…