Step Functions Tutorial with Real-Time Example
Posted by Sumit Kumar

Hey,
In this video we are going to learn the AWS Step Functions service with a real-time scenario.
We will trigger the Step Function from a Lambda as soon as a file drops into an S3 bucket. We have configured two Lambdas in our Step Function's state machine: the first truncates the Redshift table and the second copies the data from S3 into the Redshift table. So in this video we will learn an end-to-end real-time scenario using production-ready code, starting from the state machine itself, sketched below.
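The video builds the state machine in the AWS console, but purely as an illustration, here is a minimal sketch of how the same two-state machine could be created with boto3. All ARNs and state names below are placeholders for this example, not the exact values from the video:

===== create_state_machine.py (example) ===== Start

import json
import boto3

# Placeholder ARNs; substitute your own Lambda and IAM role ARNs
TRUNCATE_LAMBDA_ARN = 'arn:aws:lambda:us-east-1:111111111111:function:truncate_table'
COPY_LAMBDA_ARN = 'arn:aws:lambda:us-east-1:111111111111:function:copy_s3_to_redshift'
SFN_ROLE_ARN = 'arn:aws:iam::111111111111:role/step_function_role'

# Two task states chained together: truncate first, then copy
definition = {
    "Comment": "Truncate the Redshift table, then COPY the new S3 file into it",
    "StartAt": "TruncateTable",
    "States": {
        "TruncateTable": {
            "Type": "Task",
            "Resource": TRUNCATE_LAMBDA_ARN,
            "Next": "CopyS3ToRedshift"
        },
        "CopyS3ToRedshift": {
            "Type": "Task",
            "Resource": COPY_LAMBDA_ARN,
            "End": True
        }
    }
}

sfn = boto3.client('stepfunctions')
response = sfn.create_state_machine(
    name='MyStateMachine',
    definition=json.dumps(definition),
    roleArn=SFN_ROLE_ARN
)
print(response['stateMachineArn'])

===== create_state_machine.py ===== End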
Before diving in, I recommend watching my two videos below first.
1) https://youtu.be/HSCcqXauY-s
2) https://youtu.be/wgGNiI-Q3Pk
Code used in this video:
https://deltafrog.net/step-function-tutorial-with-realtime-example/
Next video link:
https://www.youtube.com/watch?v=B1NV9H-N_yQ
Code used in the video is as below:

===== config.py ===== Start

credential = {
    'dbname': 'dev',
    'port': '5439',
    'user': 'awsuser',
    'password': 'Deltafrog#123',
    'host_url': 'redshift-cluster-1.cgymtibgpcfw.us-east-1.redshift.amazonaws.com'
}

REDSHIFT_ROLE = {
    'dev': 'arn:aws:iam::222161883511:role/copy_s3_redshift3',
    'test': 'arn:aws:iam::222161883511:role/copy_s3_redshift'
}

===== config.py ===== End

===== env.py ===== Start

ENV = 'dev'

===== env.py ===== End

===== Lambda 1: truncate the Redshift table ===== Start

import psycopg2
from config import credential

def lambda_handler(event, context):
    # Build the Redshift connection string from config.py
    conn_string = "dbname='{}' port='{}' user='{}' password='{}' host='{}'"\
        .format(credential['dbname'], credential['port'], credential['user'],
                credential['password'], credential['host_url'])
    con = psycopg2.connect(conn_string)
    cur = con.cursor()
    # Empty the target table so the next state can reload it from scratch
    cur.execute("truncate table dev.public.annual")
    con.commit()
    # Pass the incoming event through unchanged to the next state
    return event

===== Lambda 1 ===== End

===== KickOfStateMachine Lambda (triggered by S3) ===== Start

import json
import boto3
from settings import STATE_MACHINE_ARN
from env import ENV

INVOCATION_JSON = {"resources": []}
sfn = boto3.client('stepfunctions')

def lambda_handler(event, context):
    # Bucket and key of the file that was just dropped into S3
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    # Use the Lambda request id as a unique execution name
    INVOCATION_JSON["id"] = str(context.aws_request_id)
    INVOCATION_JSON["resources"] = [{"bucket": bucket, "key": key}]
    response = sfn.start_execution(
        stateMachineArn=STATE_MACHINE_ARN[ENV],
        name=str(context.aws_request_id),
        input=json.dumps(INVOCATION_JSON)
    )
    print(response)

===== KickOfStateMachine Lambda ===== End

===== settings.py ===== Start

STATE_MACHINE_ARN = {
    'dev': 'arn:aws:states:us-east-1:222161883511:stateMachine:MyStateMachine',
    'test': 'arn:aws:states:us-east-1:222161883511:stateMachine:step_function_test',
    'prod': 'arn:aws:states:us-east-1:222161883511:stateMachine:step_function_test'
}

===== settings.py ===== End

===== Lambda 2: copy data from S3 to Redshift ===== Start

import psycopg2
from env import ENV
from config import credential, REDSHIFT_ROLE

def lambda_handler(event, context):
    conn_string = "dbname='{}' port='{}' user='{}' password='{}' host='{}'"\
        .format(credential['dbname'], credential['port'], credential['user'],
                credential['password'], credential['host_url'])
    con = psycopg2.connect(conn_string)
    cur = con.cursor()
    # The KickOfStateMachine Lambda passes the bucket and key in the event
    src_bucket = event['resources'][0]['bucket']
    filepath = event['resources'][0]['key']
    # Hard-coded values used for testing in the video:
    # src_bucket = 'test-deltafrog-out'
    # filepath = 'annual_final0910.csv'
    s3_file = 's3://' + src_bucket + '/' + filepath
    # COPY the CSV file from S3 into Redshift, skipping the header row
    sql = """copy dev.public.annual from '{0}' iam_role '{1}'
             delimiter ',' IGNOREHEADER as 1 csv;""".format(s3_file, REDSHIFT_ROLE[ENV])
    print(sql)
    cur.execute(sql)
    con.commit()
    return {"Status": "success"}

===== Lambda 2 ===== End
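If you want to exercise the KickOfStateMachine handler before wiring up the real S3 event notification, you can call it directly with a hand-built event, as in the sketch below. The module name lambda_function and the FakeContext stub are assumptions made for this example, and the bucket and file names are just the test values from the video. Note this really does call start_execution, so point it at your dev state machine.

===== local test for KickOfStateMachine (example) ===== Start

import uuid
from lambda_function import lambda_handler  # assumed module name for the KickOfStateMachine code

# Minimal stub exposing the only context attribute the handler reads
class FakeContext:
    aws_request_id = str(uuid.uuid4())

# Minimal S3 put-event shape, with example bucket and key
test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "test-deltafrog-out"},
                "object": {"key": "annual_final0910.csv"}
            }
        }
    ]
}

lambda_handler(test_event, FakeContext())

===== local test ===== End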