In this part 8 section, I would like to explain how to develop a Lambda function that sends data to Amazon Comprehend. Amazon Comprehend analyzes each tweet and judges whether its content is Positive, Negative, or Neutral. The scope of this section is shown below.







Enter a Function name of your choice.
I selected Python 3.8 this time, but you can choose any runtime you like.
Then, choose the IAM role that you created in the previous step.
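As a reference, the IAM role needs permissions for everything the script in this section touches: reading the object from S3, calling Comprehend, writing to Firehose, and logging. The bucket name and resource ARNs below are placeholders, so this is only a minimal sketch of what such a policy might look like:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::YOUR_BUCKET/raw/*"
    },
    {
      "Effect": "Allow",
      "Action": ["comprehend:DetectSentiment", "comprehend:DetectEntities"],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": ["firehose:PutRecord"],
      "Resource": "arn:aws:firehose:*:*:deliverystream/*"
    },
    {
      "Effect": "Allow",
      "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
      "Resource": "*"
    }
  ]
}
```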
Finally, click the Create function button.
Next, click the + Add trigger button.
The Lambda function will be triggered each time a new object is created in the S3 bucket.
Choose the S3 bucket that stores the raw/ data.
Then click the Add button.
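When the trigger fires, Lambda passes the function an event describing the new S3 object. The bucket name and object key below are made-up examples, but the field names follow the S3 event notification format that the handler in this section reads:

```python
# Simplified S3 "ObjectCreated" event payload as delivered to Lambda.
# Only the fields the handler actually reads are shown; the names
# "my-tweet-bucket" and the key are placeholders.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-tweet-bucket"},
                "object": {"key": "raw/2020/01/01/tweets"}
            }
        }
    ]
}

# The handler walks every record and pulls out the bucket/key pair:
for record in sample_event["Records"]:
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]
    print(bucket, key)  # my-tweet-bucket raw/2020/01/01/tweets
```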
Next, deploy the script in the Lambda console.
Click File > New File.
Write the Python script in the new file, then click the Deploy button.
After deployment, a "Changes deployed" message appears.
Let me show you the sample code. It is almost the same as AWS's sample code.
lambda_function.py
————————————————————————————
import json
import boto3
import os
import re

s3 = boto3.resource('s3')
comprehend = boto3.client('comprehend')
firehose = boto3.client('firehose')
# Matches entities that are just a single digit, '#', or '@'.
entity_should_be_filtered = re.compile(r'^[\d#@]$')

def lambda_handler(event, context):
    print(event)
    for record in event['Records']:
        s3_bucket = record['s3']['bucket']['name']
        s3_key = record['s3']['object']['key']
        obj = s3.Object(s3_bucket, s3_key)
        tweets_as_string = obj.get()['Body'].read().decode('utf-8')
        # Split by newline and parse each line as JSON.
        tweets = tweets_as_string.split('\n')
        for tweet_string in tweets:
            if len(tweet_string) < 1:
                continue
            tweet = json.loads(tweet_string)
            # Request tweet sentiment analysis from Comprehend.
            sentiment_response = comprehend.detect_sentiment(
                Text=tweet['text'],
                LanguageCode=tweet['lang']
            )
            print(sentiment_response)
            sentiment_record = {
                'tweetid': tweet['id'],
                'text': tweet['text'],
                'sentiment': sentiment_response['Sentiment'],
                'sentimentposscore': sentiment_response['SentimentScore']['Positive'],
                'sentimentnegscore': sentiment_response['SentimentScore']['Negative'],
                'sentimentneuscore': sentiment_response['SentimentScore']['Neutral'],
                'sentimentmixedscore': sentiment_response['SentimentScore']['Mixed']
            }
            firehose.put_record(
                DeliveryStreamName=os.environ['SENTIMENT_STREAM'],
                Record={
                    'Data': json.dumps(sentiment_record) + '\n'
                }
            )
            # Request entity detection from Comprehend.
            entities_response = comprehend.detect_entities(
                Text=tweet['text'],
                LanguageCode=tweet['lang']
            )
            print(entities_response)
            seen_entities = []
            for entity in entities_response['Entities']:
                if entity_should_be_filtered.match(entity['Text']):
                    continue
                id = entity['Text'] + '-' + entity['Type']
                if id not in seen_entities:
                    entity_record = {
                        'tweetid': tweet['id'],
                        'entity': entity['Text'],
                        'type': entity['Type'],
                        'score': entity['Score']
                    }
                    firehose.put_record(
                        DeliveryStreamName=os.environ['ENTITY_STREAM'],
                        Record={
                            'Data': json.dumps(entity_record) + '\n'
                        }
                    )
                    seen_entities.append(id)
    return 'true'
——————————————————————————————-
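The entity loop does two things worth noting: the regular expression drops entities that are just a single digit, `#`, or `@`, and the seen_entities list deduplicates entities per tweet by a Text-Type key. Here is a standalone sketch of that logic using made-up entity data shaped like Comprehend's detect_entities output:

```python
import re

# Same pattern as the handler: matches a string that is exactly
# one digit, "#", or "@" character.
entity_should_be_filtered = re.compile(r"^[\d#@]$")

# Hypothetical entities, mimicking Comprehend's detect_entities output.
entities = [
    {"Text": "AWS", "Type": "ORGANIZATION", "Score": 0.99},
    {"Text": "#",   "Type": "OTHER",        "Score": 0.50},  # filtered by regex
    {"Text": "AWS", "Type": "ORGANIZATION", "Score": 0.97},  # duplicate, skipped
    {"Text": "7",   "Type": "QUANTITY",     "Score": 0.80},  # filtered by regex
]

seen_entities = []
kept = []
for entity in entities:
    if entity_should_be_filtered.match(entity["Text"]):
        continue
    entity_id = entity["Text"] + "-" + entity["Type"]
    if entity_id not in seen_entities:
        kept.append(entity)
        seen_entities.append(entity_id)

print([e["Text"] for e in kept])  # ['AWS']
```

Only the first "AWS" entity would be sent to Firehose; the rest are filtered or deduplicated.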




Finally, set a longer Timeout value just to be safe.
That's all for this topic. Please check that the analysis results are stored in your S3 bucket.
In my next article, let me show you how to run queries with Athena.
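Note that the script reads the Firehose delivery stream names from the SENTIMENT_STREAM and ENTITY_STREAM environment variables, so those must be configured on the function as well. Both the timeout and the environment variables can also be set from the AWS CLI; the function and stream names below are placeholders, so replace them with your own:

```shell
aws lambda update-function-configuration \
  --function-name tweet-sentiment-function \
  --timeout 300 \
  --environment "Variables={SENTIMENT_STREAM=my-sentiment-stream,ENTITY_STREAM=my-entity-stream}"
```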