
Commit d0a6c76

Mike Uzan committed v1
1 parent 3040a78 commit d0a6c76

7 files changed: +239 -1 lines

1-setup-vignette.sql

+202
@@ -0,0 +1,202 @@
/*--------------------------------------------------------------------------------
Full GitHub project available here : https://github.com/kromozome2003/Snowflake-Json-DataPipeline

Building Data Pipelines using Snowflake Streams and Tasks
Online documentation : https://docs.snowflake.net/manuals/user-guide/data-pipelines-intro.html

Data pipelines automate many of the manual steps involved in transforming and optimizing continuous data loads.
Frequently, the “raw” data is first loaded temporarily into a staging table used for interim storage and then
transformed using a series of SQL statements before it is inserted into the destination reporting tables.
The most efficient workflow for this process involves transforming only data that is new or modified.

In this tutorial we are going to create the following data pipeline :
1- Manually ingest JSON files into a raw_json_table table without any transformation (only 1 VARIANT column)
2- Extract the raw JSON data from raw_json_table and structure it into a destination table (transformed_json_table)
3- Aggregate data from transformed_json_table into a final destination table (final_table)

We will use 2 warehouses :
- load_wh for loading the JSON data
- task_wh to run the stored procedures that extract and transform the data between tables

This continuous data pipeline is incremental (no need to reload the entire table on every update) thanks to the Change Data Capture (CDC) feature :
- changes to raw_json_table are tracked by the stream raw_data_stream
- changes to transformed_json_table are tracked by the stream transformed_data_stream

All extract and transform operations are processed through stored procedures called by scheduled tasks :
- The root task extract_json_data is scheduled every minute and calls stored_proc_extract_json()
- This stored procedure looks for INSERTs (via raw_data_stream) on raw_json_table and consumes them to extract the JSON data into transformed_json_table
- The second task (aggregate_final_data) is not scheduled ; as part of the workflow it runs right after extract_json_data and calls stored_proc_aggregate_final()
- This stored procedure looks for INSERTs (via transformed_data_stream) on transformed_json_table and consumes them to load the aggregated data into final_table

Mike Uzan - Senior SE (EMEA/France)
+33621728792
--------------------------------------------------------------------------------*/
-- Context setting
CREATE DATABASE CDPST;
USE DATABASE CDPST;
USE SCHEMA CDPST.PUBLIC;
CREATE WAREHOUSE IF NOT EXISTS load_wh WITH warehouse_size = 'medium' auto_suspend = 60 initially_suspended = true;
CREATE WAREHOUSE IF NOT EXISTS task_wh WITH warehouse_size = 'medium' auto_suspend = 60 initially_suspended = true;
USE WAREHOUSE load_wh;
/*--------------------------------------------------------------------------------
CLEAN UP
--------------------------------------------------------------------------------*/
DROP STREAM IF EXISTS raw_data_stream;
DROP STREAM IF EXISTS transformed_data_stream;
DROP TABLE IF EXISTS raw_json_table;
DROP TABLE IF EXISTS transformed_json_table;
DROP TABLE IF EXISTS final_table;
DROP PROCEDURE IF EXISTS stored_proc_extract_json();
DROP PROCEDURE IF EXISTS stored_proc_aggregate_final();
ALTER TASK IF EXISTS extract_json_data SUSPEND;
ALTER TASK IF EXISTS aggregate_final_data SUSPEND;
DROP TASK IF EXISTS extract_json_data;
DROP TASK IF EXISTS aggregate_final_data;
DROP FILE FORMAT IF EXISTS CDPST.PUBLIC.JSON;
DROP STAGE IF EXISTS CDPST.PUBLIC.cdpst_json_files;
-- Create a FILE FORMAT to parse JSON files
CREATE FILE FORMAT CDPST.PUBLIC.JSON
  TYPE = 'JSON'
  COMPRESSION = 'AUTO'
  ENABLE_OCTAL = FALSE
  ALLOW_DUPLICATE = FALSE
  STRIP_OUTER_ARRAY = FALSE
  STRIP_NULL_VALUES = FALSE
  IGNORE_UTF8_ERRORS = FALSE;
-- Create a STAGE where to put our JSON files
CREATE STAGE CDPST.PUBLIC.cdpst_json_files;

-- Upload the JSON files (from the json-files folder) to the stage using SnowSQL or the GUI, e.g. :
-- PUT 'file:///Path/to/your/github/project/json-files/weather*.json.gz' @CDPST.PUBLIC.cdpst_json_files;
-- Let's list the JSON files we uploaded
LIST @CDPST.PUBLIC.cdpst_json_files;
SELECT $1 FROM @CDPST.PUBLIC.cdpst_json_files/weather1.json.gz (FILE_FORMAT => CDPST.PUBLIC.JSON) LIMIT 5;
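-- Optionally, preview a few attributes straight from the staged file to check the JSON structure
-- (a quick sketch ; the attribute paths are the same ones used later by stored_proc_extract_json()) :
SELECT $1:city.name::string, $1:main.temp::float
FROM @CDPST.PUBLIC.cdpst_json_files/weather1.json.gz (FILE_FORMAT => CDPST.PUBLIC.JSON)
LIMIT 5;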
-- Create the RAW, TRANSFORMED & FINAL tables
CREATE OR REPLACE TABLE raw_json_table (v variant);
CREATE OR REPLACE TABLE transformed_json_table (
  date timestamp_ntz,
  country string,
  city string,
  id string,
  temp_kel float,
  temp_min_kel float,
  temp_max_kel float,
  conditions string,
  wind_dir float,
  wind_speed float
);
CREATE OR REPLACE TABLE final_table (
  date timestamp_ntz,
  country string,
  city string,
  id string,
  temp_cel float,
  temp_min_cel float,
  temp_max_cel float,
  conditions string,
  wind_dir float,
  wind_speed float
);
-- Create the Stream to monitor Data Changes against the RAW table
CREATE OR REPLACE STREAM raw_data_stream ON TABLE raw_json_table;

-- Create the Stream to monitor Data Changes against the TRANSFORMED table
CREATE OR REPLACE STREAM transformed_data_stream ON TABLE transformed_json_table;

-- List the streams
SHOW STREAMS; -- see the streams in the list

-- Read the content of the newly created streams (should be empty as no data has been inserted yet)
SELECT * FROM raw_data_stream;
SELECT * FROM transformed_data_stream;
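-- Once rows start flowing, each stream exposes CDC metadata columns alongside the table columns ;
-- a quick sketch of the filter the stored procedures below rely on (newly inserted rows only) :
SELECT v, metadata$action, metadata$isupdate
FROM raw_data_stream
WHERE metadata$action = 'INSERT';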
-- Create a stored procedure that will read the content of raw_data_stream and extract the inserted JSON data to transformed_json_table
create or replace procedure stored_proc_extract_json()
returns string
language javascript
strict
execute as owner
as
$$
var sql_command = "INSERT INTO transformed_json_table (date,country,city,id,temp_kel,temp_min_kel,temp_max_kel,conditions,wind_dir,wind_speed)";
sql_command += " SELECT";
sql_command += " convert_timezone('UTC', 'Europe/Paris', v:time::timestamp_ntz) date,";
sql_command += " v:city.country::string country,";
sql_command += " v:city.name::string city,";
sql_command += " v:city.id::string id,";
sql_command += " v:main.temp::float temp_kel,";
sql_command += " v:main.temp_min::float temp_min_kel,";
sql_command += " v:main.temp_max::float temp_max_kel,";
sql_command += " v:weather[0].main::string conditions,";
sql_command += " v:wind.deg::float wind_dir,";
sql_command += " v:wind.speed::float wind_speed";
sql_command += " FROM raw_data_stream";
sql_command += " WHERE metadata$action = 'INSERT';";
try {
    snowflake.execute ({sqlText: sql_command});
    return "JSON extracted."; // Return a success/error indicator.
}
catch (err) {
    return "Failed: " + err; // Return a success/error indicator.
}
$$
;
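-- Once some data has been loaded into raw_json_table (see 2-load-vignette.sql), you can also test the extraction
-- manually by calling the procedure yourself ; note that the stream offset only advances when the stream is
-- consumed by a DML statement (the INSERT inside the procedure) :
CALL stored_proc_extract_json();
SELECT * FROM transformed_json_table;   -- extracted rows
SELECT * FROM raw_data_stream;          -- empty again once its content has been consumed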
-- Create a stored procedure that will read the content of transformed_data_stream and aggregate the inserted data into final_table
CREATE OR REPLACE PROCEDURE stored_proc_aggregate_final()
returns string
language javascript
strict
execute as owner
as
$$
var sql_command = "INSERT INTO final_table (date,country,city,id,temp_cel,temp_min_cel,temp_max_cel,conditions,wind_dir,wind_speed)";
sql_command += " SELECT";
sql_command += " date,";
sql_command += " country,";
sql_command += " city,";
sql_command += " id,";
sql_command += " temp_kel-273.15 temp_cel,";
sql_command += " temp_min_kel-273.15 temp_min_cel,";
sql_command += " temp_max_kel-273.15 temp_max_cel,";
sql_command += " conditions,";
sql_command += " wind_dir,";
sql_command += " wind_speed";
sql_command += " FROM transformed_data_stream";
sql_command += " WHERE metadata$action = 'INSERT';";
try {
    snowflake.execute ({sqlText: sql_command});
    return "TRANSFORMED JSON - AGGREGATED."; // Return a success/error indicator.
}
catch (err) {
    return "Failed: " + err; // Return a success/error indicator.
}
$$
;
-- Create a task to look for newly inserted RAW data every 1 minute
CREATE OR REPLACE TASK extract_json_data
  warehouse = task_wh
  SCHEDULE = '1 minute'
  WHEN system$stream_has_data('raw_data_stream')
AS CALL stored_proc_extract_json();

-- Create a sub-task that runs after the RAW data has been extracted (AFTER dependency)
CREATE OR REPLACE TASK aggregate_final_data
  warehouse = task_wh
  AFTER extract_json_data
  WHEN system$stream_has_data('transformed_data_stream')
AS CALL stored_proc_aggregate_final();

-- Resume the tasks to make them run (resume the dependent task first, then the root task)
ALTER TASK aggregate_final_data RESUME;
ALTER TASK extract_json_data RESUME;
SHOW TASKS;
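-- To verify that the scheduled runs actually happen, you can query the task history, for example :
SELECT name, state, scheduled_time, completed_time
FROM TABLE(information_schema.task_history())
ORDER BY scheduled_time DESC
LIMIT 20;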
-- SWITCH TO "Stream & Tasks (DEMO)" VIGNETTE

2-load-vignette.sql

+36
@@ -0,0 +1,36 @@
/*--------------------------------------------------------------------------------
Full GitHub project available here : https://github.com/kromozome2003/Snowflake-Json-DataPipeline

In this vignette you will insert data into the Continuous Data Pipeline and demonstrate that both extraction and aggregation are automatic and incremental.

Mike Uzan - Senior SE (EMEA/France)
+33621728792
--------------------------------------------------------------------------------*/
-- Context setting
USE DATABASE CDPST;
USE SCHEMA CDPST.PUBLIC;
CREATE WAREHOUSE IF NOT EXISTS load_wh WITH warehouse_size = 'medium' auto_suspend = 60 initially_suspended = true;
CREATE WAREHOUSE IF NOT EXISTS task_wh WITH warehouse_size = 'medium' auto_suspend = 60 initially_suspended = true;
USE WAREHOUSE load_wh;
-- Show that 2 tasks exist to run this pipeline (1 root task, the second one dependent on it)
SHOW TASKS;

-- How long do we have to wait for the next run ?
SELECT timestampdiff(second, CURRENT_TIMESTAMP, scheduled_time) AS next_run, scheduled_time, CURRENT_TIMESTAMP, name, state
FROM TABLE(information_schema.task_history()) WHERE state = 'SCHEDULED' ORDER BY completed_time DESC;
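-- Optionally (a sketch ; this assumes your role has been granted the EXECUTE TASK privilege and that your
-- Snowflake version supports manual task execution), trigger the root task instead of waiting for the schedule :
EXECUTE TASK extract_json_data;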
-- Insert some data into raw_json_table (you can repeat the COPY with the other weather JSON files in the json-files folder, as shown below)
COPY INTO raw_json_table FROM (SELECT $1 FROM @CDPST.PUBLIC.cdpst_json_files/weather1.json.gz (FILE_FORMAT => CDPST.PUBLIC.JSON) LIMIT 10);
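-- For example, the same COPY pointed at another staged file (weather2.json.gz is also provided in the json-files folder) :
COPY INTO raw_json_table FROM (SELECT $1 FROM @CDPST.PUBLIC.cdpst_json_files/weather2.json.gz (FILE_FORMAT => CDPST.PUBLIC.JSON) LIMIT 10);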
-- Read the content of the streams (raw_data_stream should contain the 10 inserted records ; transformed_data_stream stays empty until the first task run populates transformed_json_table)
SELECT * FROM raw_data_stream;
SELECT * FROM transformed_data_stream;
-- Check those 2 tables : they are populated automatically once the scheduled tasks have run
SELECT * FROM transformed_json_table;
SELECT * FROM final_table;
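-- Once the tasks have run, the streams should be empty again (their offsets advance when the data is consumed)
-- and the task history should show the successful runs :
SELECT * FROM raw_data_stream;
SELECT * FROM transformed_data_stream;
SELECT name, state, scheduled_time, completed_time
FROM TABLE(information_schema.task_history())
WHERE state = 'SUCCEEDED'
ORDER BY completed_time DESC;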

README.md

+1-1
@@ -4,7 +4,7 @@ Building Json data pipeline within Snowflake using Streams and Tasks
 Online documentation : https://docs.snowflake.net/manuals/user-guide/data-pipelines-intro.html

 # Prerequisites
-This tutorial use a citibike dataset available [here]()
+None, everything you need is in this github repo

 # Introduction
 Data pipelines automate many of the manual steps involved in transforming and optimizing continuous data loads.

json-files/weather1.json.gz

8.66 MB
Binary file not shown.

json-files/weather2.json.gz

8.63 MB
Binary file not shown.

json-files/weather3.json.gz

16.2 MB
Binary file not shown.

json-files/weather4.json.gz

11.7 MB
Binary file not shown.
