[AWS] Twitter Analysis ~Part12: Notification by Amazon SNS triggered by Lambda~

Purpose

In Part 12, I would like to explain how to send a notification through Amazon SNS from a Lambda function.
The scope of this part is shown below.

Prerequisite
S3 bucket creation
First, create a new S3 bucket. Only negative tweets will be stored in this bucket for notification.
Please refer to the previous article for the procedure.
https://eeengineer.com/aws-how-to-create-s3-bucket/
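If you prefer to create the bucket from code instead of the console, the step can be sketched roughly as follows. This is only a minimal sketch: the function name create_notification_bucket is hypothetical, and the S3 client is passed in as an argument (built with boto3.client('s3') in real use) so that the logic can be tried without AWS credentials.

```python
def create_notification_bucket(s3_client, bucket_name, region):
    """Create bucket_name in region with an already-built boto3 S3 client."""
    params = {"Bucket": bucket_name}
    # us-east-1 is the default location and must not be sent as a constraint;
    # every other region has to be named explicitly.
    if region != "us-east-1":
        params["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return s3_client.create_bucket(**params)


# Usage (needs boto3 and AWS credentials; replace the placeholders):
#   import boto3
#   s3 = boto3.client("s3", region_name="<your-region>")
#   create_notification_bucket(s3, "twitter-XXXX", "<your-region>")
```

Bucket names are global, so the call fails if the name is already taken by another account.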

Define Lambda function to execute Amazon Athena

Next, create a new Lambda function. It extracts only the negative data with Amazon Athena and stores the result in the S3 bucket created in the previous step.

Create a new .py file and write the Python code in it.

The code below shows how to get only the negative tweets.
You need to specify the SNS topic ARN in the code, so please check it in advance.
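One way to check the topic ARN in advance is to list the topics in your account and pick the one with the expected name. The sketch below assumes a hypothetical helper name, find_topic_arn, and takes the SNS client as an argument (boto3.client('sns') in real use). It relies only on the list_topics call, which returns a "Topics" list and, when there are more pages, a "NextToken".

```python
def find_topic_arn(sns_client, topic_name):
    """Return the ARN of the topic named topic_name, or None if not found."""
    kwargs = {}
    while True:
        page = sns_client.list_topics(**kwargs)
        for topic in page.get("Topics", []):
            arn = topic["TopicArn"]
            # The topic name is the last colon-separated field of the ARN.
            if arn.rsplit(":", 1)[-1] == topic_name:
                return arn
        token = page.get("NextToken")
        if not token:
            return None
        # Ask for the next page of topics.
        kwargs = {"NextToken": token}


# Usage (needs boto3 and AWS credentials; the topic name is a placeholder):
#   import boto3
#   print(find_topic_arn(boto3.client("sns"), "<your-topic-name>"))
```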

lambda_function.py
—————————————————————————-
import time
import boto3

S3_OUTPUT = 's3://twitter-XXXX'
S3_BUCKET = 'twitter-XXXX'
DATABASE = 'twitter_timeline_dashboard'
TABLE = 'tweet_sentiments'

# maximum number of status checks before giving up
RETRY_COUNT = 30

# main
def lambda_handler(event, context):

    # Extract only negative tweets
    query = "SELECT * FROM %s.%s WHERE sentiment='NEGATIVE' LIMIT 100;" % (DATABASE, TABLE)

    print(query)

    client = boto3.client('athena')

    # Start the query; Athena writes the result CSV to S3_OUTPUT
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,
        }
    )

    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)

    for i in range(1, 1 + RETRY_COUNT):

        # get query execution
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break

        # FAILED and CANCELLED are both terminal states, so stop polling
        if query_execution_status in ('FAILED', 'CANCELLED'):
            raise Exception("STATUS:" + query_execution_status)

        else:
            # still queued or running: wait a little longer each time
            print("STATUS:" + query_execution_status)
            time.sleep(i)
    else:
        # the loop ended without SUCCEEDED: cancel the query and give up
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')

    # Athena names the result file <QueryExecutionId>.csv
    QUERY_FILE = query_execution_id + ".csv"

    QUERY1_FILE = "out.csv"

    # copy the result csv file to a fixed file name
    s3 = boto3.client('s3')
    s3.copy_object(Bucket=S3_BUCKET, Key=QUERY1_FILE, CopySource={'Bucket': S3_BUCKET, 'Key': QUERY_FILE})

    # Generate a pre-signed URL for the result object (valid for 120 seconds)
    presigned_url = s3.generate_presigned_url(
        ClientMethod='get_object',
        Params={'Bucket': S3_BUCKET, 'Key': QUERY_FILE},
        ExpiresIn=120,
        HttpMethod='GET')
    print(presigned_url)

    # Send email by SNS
    sns = boto3.client('sns')
    TOPIC_ARN = 'arn:aws:sns:ap-northeast-XXXX'
    msg = presigned_url
    subject = 'Email was sent by lambda triggered by Athena'

    request = {
        'TopicArn': TOPIC_ARN,
        'Message': msg,
        'Subject': subject
    }
    response = sns.publish(**request)
    return 'Email was sent by lambda triggered by Athena'

—————————————————————————
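The polling part of the listing can also be pulled out into a small helper, which makes it easier to try without running a real query. The following is a sketch under a few assumptions: wait_for_query is a hypothetical name, the Athena client and the sleep function are passed in as arguments, and the result location is read from the "ResultConfiguration" of the get_query_execution response instead of being rebuilt from the query ID.

```python
import time

# Athena states after which the query will never succeed.
TERMINAL_FAILURES = ("FAILED", "CANCELLED")


def wait_for_query(athena_client, query_execution_id, retries=30, sleep=time.sleep):
    """Poll Athena until the query succeeds and return its S3 output location."""
    for attempt in range(1, retries + 1):
        execution = athena_client.get_query_execution(
            QueryExecutionId=query_execution_id)["QueryExecution"]
        status = execution["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            return execution["ResultConfiguration"]["OutputLocation"]
        if state in TERMINAL_FAILURES:
            reason = status.get("StateChangeReason", "no reason given")
            raise RuntimeError("%s: %s" % (state, reason))
        # QUEUED or RUNNING: wait one second longer on every attempt.
        sleep(attempt)
    # Out of attempts: cancel the query so it does not keep running.
    athena_client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise TimeoutError("query %s did not finish in time" % query_execution_id)
```

With the default of 30 attempts the waits add up to several minutes in the worst case, so the Lambda timeout setting has to be at least that long, or the retries value should be lowered.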

Next, set the trigger for the Lambda function.
I selected the S3 bucket and the sentiment/ prefix that you created in chapter 2.
With this trigger, the Lambda function runs whenever a Twitter analysis result is written under the sentiment/ prefix.
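When the function is started by this S3 trigger, the event argument carries the bucket name and object key of the file that was written. The listing in this article does not use the event, but if you want to log or check which file started the run, something like the sketch below may help. The helper name uploaded_objects is hypothetical; the field names follow the standard S3 event notification format, in which object keys are URL-encoded.

```python
from urllib.parse import unquote_plus


def uploaded_objects(event, prefix="sentiment/"):
    """Return (bucket, key) pairs from an S3 event whose key starts with prefix."""
    found = []
    for record in event.get("Records", []):
        s3_info = record.get("s3")
        if s3_info is None:
            # Not an S3 record, for example a manual test event.
            continue
        # Keys arrive URL-encoded, with spaces sent as "+".
        key = unquote_plus(s3_info["object"]["key"])
        if key.startswith(prefix):
            found.append((s3_info["bucket"]["name"], key))
    return found


# Usage inside the handler:
#   def lambda_handler(event, context):
#       for bucket, key in uploaded_objects(event):
#           print("triggered by s3://%s/%s" % (bucket, key))
```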

That’s all for this Twitter Analysis series. I hope it helps with your development work. Thank you very much for reading!

Reference
Basically, I followed the procedure provided on the AWS website. Please refer to the URL below if necessary, though it is written in Japanese.