Streaming Data Ingestion - Redshift Streaming Ingestion

Before you Begin

Use the following CFN template to launch the resources needed in your AWS account.

Launch

The stack will deploy a Redshift Serverless endpoint (workgroup-xxxxxxx) and a provisioned cluster (consumercluster-xxxxxxxxxx). This demo works on both the serverless endpoint and the provisioned cluster.

Additionally, run the stack below to set up the Kinesis Data Generator. Provide a user ID and password of your choice for the data generator.

Launch

Capture the following output parameter from the CloudFormation stack launched above, as you will use it in the demo.

  • KinesisDataGeneratorUrl

Client Tool

This demo uses the web-based Redshift Query Editor v2. Navigate to Query Editor v2.

Use the credentials below for both the Serverless endpoint (workgroup-xxxxxxx) and the provisioned cluster (consumercluster-xxxxxxxxxx).

User Name: awsuser
Password: Awsuser123

Challenge

The data analyst Miguel needs real-time energy consumption and usage information from electric vehicle charging stations so he can build a dashboard showing energy consumption and the number of connected users over time. He asks Marie to build an ingestion pipeline to load the latest data into the Redshift data warehouse. The architecture of the new pipeline will look like the following:

Architecture

To solve this challenge Marie will do the following:

  1. Create a Kinesis data stream
  2. Generate streaming data with the Kinesis Data Generator
  3. Load reference data
  4. Create a materialized view
  5. Query the stream

Create Kinesis data stream

First, Marie must create a Kinesis data stream to receive the streaming data.

  1. On the Amazon Kinesis console, choose Data streams.
  2. Choose Create data stream.
  3. For Data stream name, enter ev_stream_data.
  4. For Capacity mode, select On-demand.
  5. Provide the remaining configurations as needed to create your data stream.

Load Reference Data

Marie also wants to load reference data about the electric vehicle charging stations into the Redshift cluster. She runs the following CREATE TABLE command using Redshift Query Editor v2.

CREATE TABLE ev_station
  (
     siteid                INTEGER,
     station_name          VARCHAR(100),
     address_1             VARCHAR(100),
     address_2             VARCHAR(100),
     city                  VARCHAR(100),
     state                 VARCHAR(100),
     postal_code           VARCHAR(100),
     no_of_ports           SMALLINT,
     pricing_policy        VARCHAR(100),
     usage_access          VARCHAR(100),
     category              VARCHAR(100),
     subcategory           VARCHAR(100),
     port_1_connector_type VARCHAR(100),
     voltage               VARCHAR(100),
     port_2_connector_type VARCHAR(100),
     pricing               VARCHAR(100),
     power_select          VARCHAR(100)
  ) 
distkey (siteid)
sortkey (siteid) ;

Next, Marie uses the Redshift COPY command to load the reference data from an S3 bucket.

COPY ev_station FROM 's3://redshift-immersionday-labs/data/charging-station/Charging-Station-Network.csv' 
IAM_ROLE default
region 'us-west-2' CSV IGNOREHEADER 1  COMPUPDATE PRESET;
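
As an optional sanity check, Marie can confirm the load completed. The count below is a minimal example; any quick query against ev_station would do.

-- Optional: verify the reference data loaded from S3
SELECT COUNT(*) AS station_count FROM ev_station;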

Amazon Kinesis Data Generator

Marie wants to generate synthetic data simulating electric vehicle charging station usage and send it into the stream using the Kinesis Data Generator (KDG). Launch the KDG tool by clicking the KinesisDataGeneratorUrl output of the ‘Kinesis-Data-Generator-Cognito-User’ CloudFormation stack, and sign in with the username and password you provided.

Now that Marie can log in to the KDG tool, she wants to start streaming EV data to the Kinesis data stream ‘ev_stream_data’.

Marie selects the region and the data stream she created. She copies the EV data template below, pastes it into the KDG, and selects Send Data. Each {{...}} expression is a KDG template placeholder that generates a new random value for every record.

{
    "_id": "{{random.uuid}}",
    "clusterID": "{{random.number(
        {   "min":1,
            "max":50
        }
    )}}",
    "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}",
    "kWhDelivered": "{{commerce.price}}",
    "stationID": "{{random.number(
        {   "min":1,
            "max":467
        }
    )}}",
    "spaceID": "{{random.word}}-{{random.number(
        {   "min":1,
            "max":20
        }
    )}}",
    "timezone": "America/Los_Angeles",
    "userID": "{{random.number(
        {   "min":1000,
            "max":500000
        }
    )}}"
}

Create a materialized view

Amazon Redshift streaming ingestion uses a materialized view, which is updated directly from the stream when REFRESH is run. The materialized view maps to the stream data source.

As a next step, Marie defines a schema in Amazon Redshift with CREATE EXTERNAL SCHEMA to reference the Kinesis Data Streams resource.

Using your Redshift query editor v2, execute the following statement.

CREATE EXTERNAL SCHEMA evdata FROM KINESIS
IAM_ROLE default;
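
Optionally, Marie can confirm the schema was created by querying the SVV_EXTERNAL_SCHEMAS system view; this sanity check is not required for the demo.

-- Optional: verify the external schema is registered
SELECT * FROM svv_external_schemas WHERE schemaname = 'evdata';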

Now Marie will create a materialized view to consume the stream data. The materialized view is auto-refreshed as long as there is new data on the Kinesis data stream. She can also disable auto-refresh and run a manual refresh, or schedule one using the Redshift console UI.

Using your Query editor v2, execute the following statement.

CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) SORTKEY(1) AUTO REFRESH YES AS
    SELECT
    refresh_time,
    approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'stationID',true)::DECIMAL(10,2) as stationID,
    json_parse(from_varbyte(kinesis_data, 'utf-8')) as payload
    FROM evdata."ev_stream_data"
    WHERE CAN_JSON_PARSE(kinesis_data);
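
If auto-refresh is disabled, the view can instead be refreshed on demand, as mentioned above; a minimal example:

-- Manual refresh; only needed when AUTO REFRESH is off
REFRESH MATERIALIZED VIEW ev_station_data_extract;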

Query the stream

With the SUPER data type and the PartiQL language, Amazon Redshift expands data warehouse capabilities to natively ingest, store, transform, and analyze semi-structured data. You can use PartiQL dynamic typing and lax semantics to run your queries and discover the deeply nested data you need, without imposing a schema before querying.

Marie now wants to validate that data is loading into Redshift from the “ev_stream_data” stream. First she needs to set enable_case_sensitive_identifier to TRUE to handle uppercase or mixed-case JSON fields.

Using your Redshift query editor v2, execute the following statement.

SET enable_case_sensitive_identifier to TRUE;  

Next, Marie runs the following query to see the payload data stored as the SUPER data type in the materialized view.

Using your Redshift query editor v2, execute the following statement.

SELECT payload
from ev_station_data_extract
limit 10;
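
Because payload is a SUPER column, individual attributes can also be pulled out with PartiQL dot notation and a cast. The field names below come from the KDG template; the INT cast for stationID is an illustrative assumption.

-- Navigate the SUPER payload with PartiQL dot notation
SELECT payload."stationID"::INT AS station_id,
       payload."kWhDelivered"::DECIMAL(10,2) AS kwh_delivered
from ev_station_data_extract
limit 10;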

Marie can also query the refreshed materialized view to get usage statistics. She can re-run the query multiple times to see new data published to the Kinesis data stream.

Using your Redshift query editor v2, execute the following statement.

SELECT date_trunc('minute', payload."connectionTime"::timestamp) as connectiontime
,SUM(payload."kWhDelivered"::DECIMAL(10,2)) AS Energy_Consumed
,count(distinct payload."userID") AS "#Users"
from ev_station_data_extract
where refresh_time > current_timestamp - interval '5 minutes'
group by 1
order by 1 desc;

Marie also wants to add reference data for Miguel’s analysis. She can get it by joining the materialized view with the ev_station reference table.

Using your Redshift query editor v2, execute the following statement. This will return charging station consumption data for the last 5 minutes and break it down by station category.

SELECT date_trunc('minute', payload."connectionTime"::timestamp) as connectiontime
,SUM(payload."kWhDelivered"::DECIMAL(10,2)) AS Energy_Consumed
,count(distinct payload."userID") AS "#Users"
,category
from ev_station_data_extract
join ev_station on stationID = siteid
where refresh_time > current_timestamp - interval '5 minutes'
group by 1, category
order by 1 desc;

Before you Leave

If you are done using your cluster, please consider deleting the CFN stack or pausing your Redshift cluster to avoid paying for unused resources.