ETL/ELT Strategies

Demo Video

Before you Begin

Capture the following parameters from the launched CloudFormation template as you will use them in the demo. Use the link below to access your CloudFormation dashboard.

https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks?filteringText=&filteringStatus=active&viewNested=true&hideStacks=false
  • GlueExternalDatabaseName
  • TaskDataBucketName

Client Tool

The screenshots for this demo use SQL Workbench/J as the SQL client tool. Be sure to complete the SQL Client Tool setup before proceeding. Other SQL client tools can be used as alternatives to SQL Workbench/J.

Challenge

The data analyst Miguel built product review reports for his business users from the Data Lake. They use the reports to evaluate product reviews, star ratings, and other metrics, which helps them plan new product offerings. Because these reports are gaining in popularity, Miguel reaches out to Marie to see if there is a way to make the reports run faster and save costs.

Marie decides to build a data pipeline so that the report data can be loaded into Redshift attached storage. To solve this challenge, Marie will build a data pipeline that sources the data and performs an ETL (extract, transform, and load) operation to populate a dimensional model. Further, she will set up the data pipeline so that the refresh process is repeatable. The steps Marie will follow are described in the sections below.

Here is the architecture of this setup:

Create and Load the Staging Environment

SayDoShow
Marie defines a staging table to load the new product review data. This staging table mimics the structure of the raw source data, except that it has two additional columns at the end (MONTH and DAY) that are not available in the S3 files. She names the Redshift staging table product_reviews_staging.

Using your SQL Client Tool, execute the following command:

DROP TABLE IF EXISTS public.product_reviews_staging;
CREATE TABLE public.product_reviews_staging
(
  marketplace         VARCHAR(2),
  customer_id         VARCHAR(32),
  review_id           VARCHAR(24),
  product_id          VARCHAR(24),
  product_parent      VARCHAR(32),
  product_title       VARCHAR(512),
  star_rating         INT,
  helpful_votes       INT,
  total_votes         INT,
  vine                CHAR(1),
  verified_purchase   CHAR(1),
  review_headline     VARCHAR(256),
  review_body         VARCHAR(MAX),
  review_date         DATE,
  YEAR                INT,
  MONTH               INT,
  DAY                 INT
)
DISTSTYLE KEY DISTKEY (customer_id) SORTKEY (marketplace,review_date);

To load the data from S3 to the Redshift staging tables, Marie will use the COPY command. The COPY command is the fastest way to ingest bulk data to Amazon Redshift.

Amazon Redshift now supports attaching the default IAM role. If you have enabled the default IAM role in your cluster, you can use the default IAM role as follows. Follow the blog for details.

Using your SQL Client Tool, execute the following command to load review data.

You will get an error because the target table has more columns than the file being copied: the file does not have the MONTH and DAY fields.

COPY product_reviews_staging
FROM 's3://amazon-reviews-pds/parquet/product_category=Apparel/'
iam_role default PARQUET;
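
If you want to see the failure detail beyond the client error message, Redshift's load error system views can be inspected. A minimal check is sketched below; depending on the error type, Parquet COPY details may surface in SVL_S3LOG rather than STL_LOAD_ERRORS.

-- Most recent load errors recorded by the cluster (may be empty for this
-- particular error; Parquet load details can land in SVL_S3LOG instead).
SELECT starttime, filename, err_reason
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 5;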

To address the issue and load the data from S3 into the Redshift staging table successfully, Marie will now use the FILLRECORD parameter in the COPY command. FILLRECORD populates the trailing columns that are missing from the file with NULLs, allowing the data to load into the target table.

For more information on FILLRECORD refer to this doc and blog.

Amazon Redshift now supports attaching the default IAM role. If you have enabled the default IAM role in your cluster, you can use the default IAM role as follows. Follow the blog for details.

Using your SQL Client Tool, execute the following commands. They will load the Apparel (5,906,460 rows) and Automotive (3,516,476 rows) data successfully.

COPY product_reviews_staging
FROM 's3://amazon-reviews-pds/parquet/product_category=Apparel/'
iam_role default PARQUET FILLRECORD;

COPY product_reviews_staging
FROM 's3://amazon-reviews-pds/parquet/product_category=Automotive/'
iam_role default PARQUET FILLRECORD;

To double check that the data was loaded correctly, Marie will query the staging table to do a sanity check on the number of rows loaded.

Using your SQL Client Tool, execute the following command. You should expect it to return 9,422,936 records.

SELECT COUNT(*)
FROM product_reviews_staging;

Trigger based load of CSVs to Staging Environment

SayDoShow
Marie has already deployed a Lambda function that watches an S3 bucket folder for new data files and loads them into Redshift. Before wiring up the trigger, she must first create the folder in that S3 bucket.

The Lambda function and its associated code were deployed when the CloudFormation template was run to set up the demo. You can find it in the stack's Resources tab and review its code.

In the AWS Management Console, open the S3 bucket named «TaskDataBucketName» and create a folder named trigger-based-load.
Now Marie will add a trigger for this folder to her Lambda function. Go to Lambda and look for the function with TriggerETLLambda in its name.

Click the Add trigger button

Select S3 from the drop-down

Enter the details as shown below:

Bucket: «TaskDataBucketName»
Event type: PUT
Prefix: trigger-based-load/
Acknowledge the Recursive invocation warning

Click Add

Marie will then create target tables in Redshift that match her file structure, so the Lambda function can load into them when triggered. Run the SQL statements below to create the tables.

  create table mge_stg
    (
      col_1 varchar(50),
      col_2 varchar(50),
      col_3 varchar(50),
      col_4 varchar(50),
      proc_ts timestamp DEFAULT GETDATE(),
      input_file_name varchar(50) DEFAULT 'mge.csv'
    );
  create table dte_stg
    (
      dte_col_1 varchar(50),
      dte_col_2 varchar(50),
      dte_col_3 varchar(50),
      proc_ts timestamp DEFAULT GETDATE(),
      input_file_name varchar(50) DEFAULT 'dte.csv'
    );
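
The deployed Lambda's actual loading logic lives in its function code (see the Resources tab). Conceptually, when a file such as mge.csv lands under trigger-based-load/, it runs a COPY along the lines sketched below. This is illustrative only: the bucket placeholder and the CSV options are assumptions, not the function's exact statement.

-- Illustrative sketch of the kind of COPY the trigger Lambda might issue.
-- «TaskDataBucketName» is a placeholder; the CSV options are assumed.
-- The proc_ts and input_file_name columns are omitted so their defaults apply.
COPY mge_stg (col_1, col_2, col_3, col_4)
FROM 's3://«TaskDataBucketName»/trigger-based-load/mge.csv'
iam_role default
FORMAT AS CSV;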

Marie will now upload files into the S3 folder she created to test the Lambda trigger, and then verify the data in Redshift. Upload the files below into the trigger-based-load S3 folder created above:

Marie then runs queries to validate the data in the Redshift data warehouse. This confirms that simple uploads to the S3 bucket can now trigger a load into her Redshift cluster going forward. Run the queries below and verify you see the same data that was uploaded in the CSV files.

select * from mge_stg;
select * from dte_stg;

Create the Reporting Model

SayDoShow
Based on the reporting requirements from Miguel, Marie needs to create a dimensional model that will be used by the business users. She will create a fact table in Redshift called daily_product_reviews_fact, along with the product_dim and date_dim dimension tables.

Here is the dimensional model for the setup:

Using your SQL Client Tool, execute the following command:

DROP TABLE IF EXISTS public.daily_product_reviews_fact;
CREATE TABLE public.daily_product_reviews_fact
(
  marketplace           VARCHAR(2),
  product_id            VARCHAR(24),
  count_rating          INT,
  sum_rating            INT,
  total_helpful_votes   INT,
  total_votes           INT,
  review_date           DATE
)
DISTSTYLE EVEN SORTKEY (review_date);

DROP TABLE IF EXISTS public.product_dim;
CREATE TABLE public.product_dim
(
  product_id       VARCHAR(24),
  product_parent   VARCHAR(32),
  product_title    VARCHAR(512),
  effective_date   DATE,
  current_flag     CHAR(1)
)
DISTSTYLE ALL SORTKEY (product_id);

DROP TABLE IF EXISTS public.date_dim;
CREATE TABLE public.date_dim
(
  d_date_sk             INTEGER NOT NULL,
  d_date_id             CHAR(16) NOT NULL,
  d_date                DATE,
  d_month_seq           INTEGER,
  d_week_seq            INTEGER,
  d_quarter_seq         INTEGER,
  d_year                INTEGER,
  d_dow                 INTEGER,
  d_moy                 INTEGER,
  d_dom                 INTEGER,
  d_qoy                 INTEGER,
  d_fy_year             INTEGER,
  d_fy_quarter_seq      INTEGER,
  d_fy_week_seq         INTEGER,
  d_day_name            CHAR(9),
  d_quarter_name        CHAR(6),
  d_holiday             CHAR(1),
  d_weekend             CHAR(1),
  d_following_holiday   CHAR(1),
  d_first_dom           INTEGER,
  d_last_dom            INTEGER,
  d_same_day_ly         INTEGER,
  d_same_day_lq         INTEGER,
  d_current_day         CHAR(1),
  d_current_week        CHAR(1),
  d_current_month       CHAR(1),
  d_current_quarter     CHAR(1),
  d_current_year        CHAR(1),
  PRIMARY KEY (d_date_sk)
)
diststyle ALL;

The DATE_DIM table will only need to be loaded once and is not part of the daily process. Marie will load that table now.

Amazon Redshift now supports attaching the default IAM role. If you have enabled the default IAM role in your cluster, you can use the default IAM role as follows. Follow the blog for details.

Using your SQL Client Tool, execute the following command:

COPY date_dim
FROM 's3://redshift-downloads/TPC-DS/3TB/date_dim/'
iam_role default
GZIP DELIMITER '|' EMPTYASNULL REGION 'us-east-1';
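
Optionally, a quick sanity check in the same spirit as the staging-table check confirms the dimension loaded. No specific row count is assumed here; treat the output as a plausibility check on the date range.

-- Row count and covered date range of the freshly loaded date dimension.
SELECT COUNT(*) AS row_count,
       MIN(d_date) AS min_date,
       MAX(d_date) AS max_date
FROM public.date_dim;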

Create Materialized Views

SayDoShow
Marie decides to leverage materialized views to precompute the result sets for the transformations, which will be reused in the different steps of the dimensional model refresh. In addition, materialized views provide a performance improvement for aggregation operations.

The first view will implement the business rule for daily fact table aggregation.

Using your SQL Client Tool, execute the following command:

DROP MATERIALIZED VIEW IF EXISTS public.daily_product_review_fact_staging_mv CASCADE;
CREATE MATERIALIZED VIEW public.daily_product_review_fact_staging_mv
AS
SELECT marketplace,
       product_id,
       COUNT(1) as count_rating,
       SUM(star_rating) as sum_rating,       
       SUM(helpful_votes) AS total_helpful_votes,
       SUM(total_votes) AS total_votes,
       review_date
FROM public.product_reviews_staging
GROUP BY marketplace,
         product_id,
         review_date;
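
To see what this materialized view precomputes, an optional peek at a few rows can be run; the output will vary with the data loaded into the staging table.

-- Sample of the precomputed daily aggregates, newest review dates first.
SELECT *
FROM public.daily_product_review_fact_staging_mv
ORDER BY review_date DESC
LIMIT 10;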

The next set of views will implement the business rules to:

  1. Get the latest product dimension attributes.
  2. Get product dimension attributes that have changed and need to be updated.
  3. Get new products that need to be inserted.

Using your SQL Client Tool, execute the following commands:
DROP MATERIALIZED VIEW IF EXISTS public.product_dim_staging_mv CASCADE;
CREATE MATERIALIZED VIEW public.product_dim_staging_mv
AS
SELECT product_id,
       product_parent,
       product_title,
       review_date,
       CHECKSUM(CAST(product_parent || product_title AS VARCHAR(550)))
FROM (SELECT product_id,
             product_parent,
             product_title,
             review_date,
             ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY review_date DESC,product_parent,product_title) AS rn
      FROM public.product_reviews_staging)
WHERE rn = 1;

DROP VIEW IF EXISTS public.product_dim_scd2_update_vw;
CREATE VIEW public.product_dim_scd2_update_vw
AS
SELECT s.*
FROM public.product_dim p
  INNER JOIN public.product_dim_staging_mv s
          ON p.product_id = s.product_id
         AND CHECKSUM (CAST (p.product_parent || p.product_title AS VARCHAR (550))) <> s.checksum;

DROP VIEW IF EXISTS public.product_dim_scd2_new_vw;
CREATE VIEW public.product_dim_scd2_new_vw
AS
SELECT s.*
FROM public.product_dim p
  RIGHT OUTER JOIN public.product_dim_staging_mv s ON p.product_id = s.product_id
WHERE p.product_id IS NULL;
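
Optionally, a quick count shows what these views identify at this point. Because product_dim has not been loaded yet, the update view should return no rows and every product should surface as new; exact counts depend on the staged data.

-- Changed vs. new products identified by the SCD2 views (product_dim is still
-- empty at this point, so everything appears as new).
SELECT 'changed' AS change_type, COUNT(*) AS product_count FROM public.product_dim_scd2_update_vw
UNION ALL
SELECT 'new', COUNT(*) FROM public.product_dim_scd2_new_vw;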

Create Stored Procedures

SayDoShow
Marie will use stored procedures to execute the transformations for the ETL process. Stored procedures provide a convenient way to encapsulate the business logic. The first procedure will update the PRODUCT_DIM table by performing the following steps:

  1. Update existing records that are flagged for update, setting current_flag = 0.
  2. Insert records that are flagged for update with their new attributes and current_flag = 1.
  3. Insert records that are flagged as new products.

Using your SQL Client Tool, execute the following command:
CREATE OR REPLACE PROCEDURE public.sp_merge_product_dim (loaddate DATE)
AS
$$
BEGIN
IF loaddate IS NULL THEN
  RAISE EXCEPTION 'input cannot be null';
END IF;

REFRESH MATERIALIZED VIEW public.product_dim_staging_mv;

-- Expire the current version of changed products by setting current_flag to 0
UPDATE public.product_dim
   SET current_flag = 0
FROM public.product_dim_scd2_update_vw s
WHERE product_dim.product_id = s.product_id
AND   product_dim.current_flag = 1;

INSERT INTO public.product_dim
SELECT product_id,
       product_parent,
       product_title,
       CURRENT_DATE,
       1
FROM public.product_dim_scd2_update_vw
WHERE review_date >= loaddate
UNION

-- New products
SELECT product_id,
       product_parent,
       product_title,
       CURRENT_DATE,
       1
FROM public.product_dim_scd2_new_vw
WHERE review_date >= loaddate;

END;
$$ LANGUAGE plpgsql;

The second procedure will insert new aggregated data into the daily_product_reviews_fact table. Using your SQL Client Tool, execute the following command:

CREATE OR REPLACE PROCEDURE public.sp_merge_daily_product_reviews_fact (loaddate DATE)
AS
$$
BEGIN
IF loaddate IS NULL THEN
  RAISE EXCEPTION 'input cannot be null';
END IF;

REFRESH MATERIALIZED VIEW public.daily_product_review_fact_staging_mv;

INSERT INTO public.daily_product_reviews_fact
SELECT s.marketplace,
       s.product_id,
       s.count_rating,
       s.sum_rating,       
       s.total_helpful_votes,
       s.total_votes,
       s.review_date
FROM public.daily_product_review_fact_staging_mv s
  LEFT OUTER JOIN public.daily_product_reviews_fact f
               ON s.product_id = f.product_id
              AND s.review_date = f.review_date
WHERE f.product_id IS NULL
AND   s.review_date >= loaddate;

END;
$$ LANGUAGE plpgsql;

To test this pipeline, Marie will execute the stored procedures and expects the data in the dimensional model to be refreshed. She will pass in the parameter ‘1900-01-01’ to trigger a load of the full historical data. To trigger an incremental load, she can instead pass in the date of the last load. Using your SQL Client Tool, execute the following commands:

call public.sp_merge_product_dim ('1900-01-01'::DATE);
call public.sp_merge_daily_product_reviews_fact ('1900-01-01'::DATE);
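
For a later incremental refresh, the same procedures would be called with the date of the last load instead; the date below is purely illustrative.

-- Hypothetical incremental run: only staged rows with review_date on or after
-- the supplied date are merged.
call public.sp_merge_product_dim ('2015-08-01'::DATE);
call public.sp_merge_daily_product_reviews_fact ('2015-08-01'::DATE);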

To do a quick sanity check on the data, Marie executes the following count queries.

You should expect 3,068,610 records in the product_dim table and 9,084,801 records in the daily_product_reviews_fact table.

Using your SQL Client Tool, execute the following command:

SELECT COUNT(*)
FROM public.product_dim;

SELECT COUNT(*)
FROM public.daily_product_reviews_fact;

Perform Data Quality Checks

SayDoShow
Marie also wants to incorporate a data quality check to ensure the source data and the final fact table match. Since the data is available in our Data Lake, the Amazon Redshift Spectrum external table demo.parquet can be used to compare against the target fact table. Let’s now connect our Redshift cluster to our Data Lake.

Replace the value for <GlueExternalDatabaseName> with the value previously determined.

Amazon Redshift now supports attaching the default IAM role. If you have enabled the default IAM role in your cluster, you can use the default IAM role as follows. Follow the blog for details.

Using your SQL Client Tool, execute the following command:

DROP SCHEMA IF EXISTS demo;
CREATE EXTERNAL SCHEMA IF NOT EXISTS demo
FROM DATA CATALOG DATABASE '<GlueExternalDatabaseName>'
IAM_ROLE default;
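
Before running the comparison, it can be worth confirming that the external schema is wired to the Glue database; a minimal check against the Spectrum catalog view is sketched below.

-- List the external tables now visible through the demo schema.
SELECT schemaname, tablename
FROM svv_external_tables
WHERE schemaname = 'demo';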

Marie will implement a strategy of aggregating the source data and comparing the aggregate values with the ones loaded into the data warehouse. The expected output is that each pair of corresponding parquet_* and dm_* columns matches. Using your SQL Client Tool, execute the following command:

SELECT
  parquet_year, dm_year, parquet_avg_star_rating, dm_avg_star_rating, parquet_total_helpful_votes, dm_total_helpful_votes, parquet_total_votes, dm_total_votes
FROM (
    SELECT
      year AS parquet_year,
      AVG(star_rating) AS parquet_avg_star_rating,
      SUM(helpful_votes) AS parquet_total_helpful_votes,
      SUM(total_votes) AS parquet_total_votes
    FROM demo.parquet
    WHERE
      product_category IN ('Automotive', 'Apparel')
    GROUP BY
      year
  ) p
LEFT JOIN (
    SELECT
      dt_dim.d_year AS dm_year,
      SUM(fct.sum_rating)/SUM(fct.count_rating) AS dm_avg_star_rating,   
      SUM(fct.total_helpful_votes) AS dm_total_helpful_votes,
      SUM(fct.total_votes) AS dm_total_votes
    FROM public.daily_product_reviews_fact fct
    JOIN public.product_dim prdt_dim ON (fct.product_id = prdt_dim.product_id)
    JOIN public.date_dim dt_dim ON (fct.review_date = dt_dim.d_date)
    GROUP BY
      dt_dim.d_year
  ) dm ON dm.dm_year = p.parquet_year
ORDER BY
  dm.dm_year;

The dimensional data is now ready for end user consumption. Miguel can build a report similar to the one below to analyze the data. Using your SQL Client Tool, execute the following command:

SELECT         
  substring(prdt_dim.product_title,1,10) as prd_title ,
  SUM(fct.sum_rating)/SUM(fct.count_rating) AS avg_star_rating,   
  SUM(fct.total_helpful_votes) as total_helpful_votes,
  SUM(fct.total_votes) as total_votes
FROM public.daily_product_reviews_fact fct
JOIN public.product_dim prdt_dim ON (fct.product_id = prdt_dim.product_id)
JOIN public.date_dim dt_dim ON (fct.review_date = dt_dim.d_date)
GROUP BY
  substring(prdt_dim.product_title,1,10)
ORDER BY SUM(fct.total_votes) DESC
LIMIT 10;

Before you Leave

If you want to re-run the demo, make sure you delete the Redshift tables/views, stored procedures and materialized views using the following SQL:

DROP PROCEDURE public.sp_merge_product_dim(date);
DROP PROCEDURE public.sp_merge_daily_product_reviews_fact(date);
DROP VIEW IF EXISTS public.product_dim_scd2_update_vw;
DROP VIEW IF EXISTS public.product_dim_scd2_new_vw;
DROP MATERIALIZED VIEW IF EXISTS public.product_dim_staging_mv CASCADE;
DROP MATERIALIZED VIEW IF EXISTS public.daily_product_review_fact_staging_mv CASCADE;
DROP TABLE IF EXISTS public.date_dim;
DROP TABLE IF EXISTS public.daily_product_reviews_fact;
DROP TABLE IF EXISTS public.product_dim;
DROP TABLE IF EXISTS public.product_reviews_staging;

If you are done using your cluster, consider deleting the CFN stack. Alternatively, to avoid paying for unused resources, do these tasks:

  • pause your Redshift Cluster
  • stop the Oracle database
  • stop the DMS task