Step Function Tutorial with Real-Time Example

Posted by Sumit Kumar

Hey,

In this video we are going to learn the AWS Step Functions service with a real-time scenario.

We will trigger the Step Function from a Lambda as soon as a file drops into an S3 bucket. We have configured two Lambdas in our Step Function's state machine: the first truncates the Redshift table, and the second copies the data from S3 into the Redshift table. So in this video we will learn an end-to-end real-time scenario using production-ready code.
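For reference, the state machine itself can be described in Amazon States Language. The sketch below is a minimal definition of this two-step flow; the Lambda ARNs are placeholders, not the actual functions used in the video.

=====state_machine.json (illustrative)=====Start
{
  "Comment": "Truncate the Redshift table, then copy the new S3 file into it",
  "StartAt": "TruncateTable",
  "States": {
    "TruncateTable": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111111111111:function:truncate_table",
      "Next": "CopyS3ToRedshift"
    },
    "CopyS3ToRedshift": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111111111111:function:copy_s3_redshift",
      "End": true
    }
  }
}
=====state_machine.json (illustrative)=====END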

I recommend watching my two videos below first:

1) https://youtu.be/HSCcqXauY-s

2) https://youtu.be/wgGNiI-Q3Pk

 

Code used in this video:-
https://deltafrog.net/step-function-tutorial-with-realtime-example/

Next video link:-
https://www.youtube.com/watch?v=B1NV9H-N_yQ

 

The code used in the video is below:-

=====config.py=====Start

# Redshift connection details used by both Lambdas
credential = {
    'dbname': 'dev',
    'port': '5439',
    'user': 'awsuser',
    'password': 'Deltafrog#123',
    'host_url': 'redshift-cluster-1.cgymtibgpcfw.us-east-1.redshift.amazonaws.com'
}

# IAM role ARN per environment, used by the Redshift COPY command
REDSHIFT_ROLE = {
    'dev': 'arn:aws:iam::222161883511:role/copy_s3_redshift3',
    'test': 'arn:aws:iam::222161883511:role/copy_s3_redshift'
}

=====config.py=====END
####env.py#########Start
# environment key used to pick values from the config dictionaries
ENV = 'dev'
####env.py#########End
#########################
import psycopg2

from config import credential

# Lambda 1: truncate the target Redshift table before the fresh load
def lambda_handler(event, context):
    # build the Redshift connection string from config.py
    conn_string = "dbname='{}' port='{}' user='{}' password='{}' host='{}'"\
        .format(credential['dbname'], credential['port'], credential['user'],
                credential['password'], credential['host_url'])
    print(conn_string)
    con = psycopg2.connect(conn_string)
    cur = con.cursor()
    print(con)
    cur.execute("truncate table dev.public.annual")
    con.commit()  # commit so the truncate is not rolled back when the connection closes
    # return the event unchanged so the next state receives the same payload
    return event
#########################

###########KickOfStateMachine###########
import boto3
import json

from settings import STATE_MACHINE_ARN
from env import ENV

# skeleton of the execution input; 'id' and 'resources' are filled per invocation
INVOCATION_JSON = {"resources": []}

sfn = boto3.client('stepfunctions')

# triggered by the S3 put event; kicks off the state machine execution
def lambda_handler(event, context):
    print(context)
    # pull the bucket name and object key out of the S3 event record
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # use the Lambda request id as a unique execution id
    INVOCATION_JSON["id"] = str(context.aws_request_id)
    # pass the bucket and key through to the state machine
    INVOCATION_JSON["resources"] = [
        {"bucket": bucket, "key": key}
    ]

    response = sfn.start_execution(
        stateMachineArn=STATE_MACHINE_ARN[ENV],
        name=str(context.aws_request_id),
        input=json.dumps(INVOCATION_JSON)
    )

    print(response)
=====settings.py=====Start

# state machine ARN per environment
STATE_MACHINE_ARN = {
    'dev': 'arn:aws:states:us-east-1:222161883511:stateMachine:MyStateMachine',
    'test': 'arn:aws:states:us-east-1:222161883511:stateMachine:step_function_test',
    'prod': 'arn:aws:states:us-east-1:222161883511:stateMachine:step_function_test'
}

=====settings.py=====END
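To wire the trigger itself, the S3 bucket needs an event notification that invokes the KickOfStateMachine Lambda on object creation. A rough sketch of the notification JSON for `aws s3api put-bucket-notification-configuration` (the Lambda ARN is a placeholder):

=====notification.json (illustrative)=====Start
{
  "LambdaFunctionConfigurations": [
    {
      "Id": "kick-off-state-machine-on-upload",
      "LambdaFunctionArn": "arn:aws:lambda:us-east-1:111111111111:function:KickOfStateMachine",
      "Events": ["s3:ObjectCreated:*"]
    }
  ]
}
=====notification.json (illustrative)=====END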

#########################
import psycopg2

from env import ENV
from config import credential, REDSHIFT_ROLE

# Lambda 2: COPY the dropped S3 file into the Redshift table
def lambda_handler(event, context):
    conn_string = "dbname='{}' port='{}' user='{}' password='{}' host='{}'"\
        .format(credential['dbname'], credential['port'], credential['user'],
                credential['password'], credential['host_url'])
    print(conn_string)
    con = psycopg2.connect(conn_string)
    cur = con.cursor()
    print(con)
    print(event)
    # in production, read the bucket and key from the state machine input:
    # src_bucket = event['resources'][0]['bucket']
    # filepath = event['resources'][0]['key']
    src_bucket = 'test-deltafrog-out'
    filepath = 'annual_final0910.csv'
    print(filepath)
    print(src_bucket)
    s3_file = 's3://' + src_bucket + '/' + filepath
    # COPY from S3 into Redshift using the environment's IAM role
    sql = """copy dev.public.annual
    from '{0}'
    iam_role '{1}'
    delimiter ','
    IGNOREHEADER as 1
    csv;""".format(s3_file, REDSHIFT_ROLE[ENV])
    print(sql)
    cur.execute(sql)
    con.commit()  # commit so the COPY is persisted

    return {"Status": "success"}

#################################
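Once the commented event-driven lines are switched back on, the payload this Lambda receives is the execution input built by KickOfStateMachine, passed through unchanged by the truncate step. Roughly (the id is the triggering Lambda's request id):

=====execution input (illustrative)=====Start
{
  "resources": [
    { "bucket": "test-deltafrog-out", "key": "annual_final0910.csv" }
  ],
  "id": "<aws-request-id>"
}
=====execution input (illustrative)=====END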
Posted in AWS.
