Redshift Integrations with Apache Spark

Demo Video

Before you Begin

Use the following CloudFormation (CFN) template to launch the resources needed in your AWS account.

Launch

The stack deploys a Redshift Serverless endpoint (workgroup-xxxxxxx) as well as a provisioned cluster (consumercluster-xxxxxxxxxx). This demo works on both the serverless endpoint and the provisioned cluster.

Client Tool

This demo uses the web-based Redshift Query Editor v2. Navigate to Query Editor v2 in the Amazon Redshift console.

Use the credentials below for both the Serverless endpoint (workgroup-xxxxxxx) and the provisioned cluster (consumercluster-xxxxxxxxxx).

User Name: awsuser
Password: Awsuser123

Prerequisites:

  1. An EMR cluster running release 6.9.0 or higher with the following applications: Pig 0.17.0 or higher, JupyterHub 1.4.1 or higher, JupyterEnterpriseGateway 2.6.0 or higher, and Spark 3.3.0 or higher.
  2. An EMR notebook (PySpark kernel) with the Livy configuration updated to connect to the EMR cluster.
  3. A Redshift Serverless instance.
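The notebook cells in this demo reference a redshiftOptions dictionary along with jdbc_iam_url, temp_dir, and aws_role connection variables. Below is a minimal sketch of an initial setup cell that defines them, assuming placeholder values that you would replace with the endpoint, S3 bucket, and IAM role from your own environment.

# Setup cell (sketch): replace the placeholder values with those from your environment
# JDBC URL of the Redshift Serverless endpoint, using IAM authentication (placeholder)
jdbc_iam_url = "jdbc:redshift:iam://workgroup-xxxxxxx.123456789012.us-east-1.redshift-serverless.amazonaws.com:5439/dev"

# S3 location the connector uses to stage data between Spark and Redshift (placeholder)
temp_dir = "s3://your-bucket/spark-redshift-temp/"

# IAM role that Redshift assumes to read and write the staging location (placeholder)
aws_role = "arn:aws:iam::123456789012:role/redshift-spark-role"

# Connection options reused by every read in this demo
redshiftOptions = {
    "url": jdbc_iam_url,
    "tempdir": temp_dir,
    "aws_iam_role": aws_role
}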

Test pushdown for join, filter, aggregate & sort

Say: Create Spark DataFrames to test pushdown for join, filter, aggregate, and sort.

Do: Create a new cell in your EMR notebook and execute the code below.

# col is used in the filter, aggregate, and sort expressions below
from pyspark.sql.functions import col

# Create the sales DataFrame from the Redshift table
sales_df = (
    spark.read
    .format("io.github.spark_redshift_community.spark.redshift")
    .options(**redshiftOptions)
    .option("dbtable", "tickit.sales")
    .load()
)

# Create the date DataFrame from the Redshift table
date_df = (
    spark.read
    .format("io.github.spark_redshift_community.spark.redshift")
    .options(**redshiftOptions)
    .option("dbtable", "tickit.date")
    .load()
)

# Issue the join/filter/aggregate/sort query to Redshift and show the results
(
    sales_df.join(date_df, sales_df.dateid == date_df.dateid, "inner")
    .where(col("year") == 2008)
    .groupBy("qtr")
    .sum("qtysold")
    .select(col("qtr"), col("sum(qtysold)"))
    .sort(["qtr"], ascending=[1])
    .withColumnRenamed("sum(qtysold)", "total_quantity_sold")
    .show()
)
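As an optional local check (not part of the original steps), Spark's explain() prints the physical plan for a DataFrame, which can help confirm which operations the connector pushes down before you query Redshift's history. A minimal sketch:

# Optional: inspect the physical plan to see what gets pushed down to Redshift
sales_df.where(col("qtysold") > 1).explain()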

Get the last query executed

Say: Get the last query executed on Amazon Redshift with query_label = 'spark-redshift'.

Do: Create a new cell in your EMR notebook and execute the code below.

def getLastQueryIssuedToRedshift():
    redshiftOptions = {
        "url": jdbc_iam_url,
        "tempdir": temp_dir,
        "aws_iam_role": aws_role
    }

    # Read the most recent query with query_label = 'spark-redshift' from Redshift's query history
    test_df = (
        spark.read
        .format("io.github.spark_redshift_community.spark.redshift")
        .options(**redshiftOptions)
        .option("query", "select query_text from SYS_QUERY_HISTORY where query_label = 'spark-redshift' order by start_time desc limit 1")
        .load()
    )

    # Collect the single-row result to the driver and return the query text
    def GetValueFromDataframe(_df, columnName):
        for row in _df.rdd.collect():
            return row[columnName].strip()

    return GetValueFromDataframe(test_df, "query_text")

print(getLastQueryIssuedToRedshift())

Test pushdown for distinct & count

Say: Use the Spark DataFrames to test pushdown for distinct and count.

Do: Create a new cell in your EMR notebook and execute the code below.

sales_df.select(col("buyerid")).distinct().count()
 
print(getLastQueryIssuedToRedshift())