[AWS] Twitter Analysis ~Part8: Develop Lambda function to send data to Amazon Comprehend~


In this Part 8 section, I would like to explain how to develop a Lambda function that sends data to Amazon Comprehend. Amazon Comprehend analyzes each tweet and judges whether it contains Positive/Negative/Neutral content. The scope of this section is as follows.

IAM Role creation
Lambda Creation
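Before starting, it may help to see the shape of the result that Comprehend's DetectSentiment API returns, since the Lambda script below reads these exact keys. This is an illustrative response only (the scores are made-up values; a real call is `comprehend.detect_sentiment(Text=..., LanguageCode=...)` and requires AWS credentials):

```python
# Illustrative DetectSentiment response -- the scores here are made-up
# values, but the key structure matches what the script below reads.
sentiment_response = {
    'Sentiment': 'POSITIVE',
    'SentimentScore': {
        'Positive': 0.95,
        'Negative': 0.01,
        'Neutral': 0.03,
        'Mixed': 0.01
    }
}

# The overall label plus one score per sentiment class
print(sentiment_response['Sentiment'])
print(sentiment_response['SentimentScore']['Positive'])
```

The four class scores sum to (approximately) 1, and `Sentiment` is the highest-scoring class.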
Firstly, open the Lambda console.
Then, click the Create function button.

Input a Function name as you like.
I selected Python 3.8 this time, but you can choose any language.
Then, choose the IAM role that you created in the previous procedure.
Finally, click the Create function button.

Next, click the + Add trigger button.
The Lambda function will be triggered whenever a new object is created in the S3 bucket.

Choose the S3 bucket that stores the raw/ data.
Then click the Add button.
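When an object lands in the bucket, Lambda receives an S3 event whose bucket name and object key the handler must dig out. Here is a minimal sketch of that extraction with an illustrative event (the bucket and key names are placeholders):

```python
# Illustrative S3 "object created" event -- bucket/key are placeholders,
# but the nesting matches what Lambda passes to the handler.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-tweet-bucket"},
                "object": {"key": "raw/2020/01/01/tweets.json"}
            }
        }
    ]
}

# Same extraction the handler performs for each record
for record in sample_event["Records"]:
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]
    print(bucket, key)
```

The script in the next step loops over `event['Records']` in exactly this way.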

Next, deploy the script in the Lambda code editor.
Click File > New File.

Write the Python script in the new file, then click the Deploy button.

After deployment, a Changes deployed message appears.

Let me show you the sample code. It is almost the same as AWS's sample code.


import json
import boto3
import os
import re

s3 = boto3.resource('s3')
comprehend = boto3.client('comprehend')
firehose = boto3.client('firehose')

# Filter out single-character entities that are just a digit, '#', or '@'
entity_should_be_filtered = re.compile(r'^[\d#@]$')

def lambda_handler(event, context):
    # Firehose delivery stream names, read from the environment
    # variables set later in this article
    sentiment_stream = os.environ['SENTIMENT_STREAM']
    entity_stream = os.environ['ENTITY_STREAM']

    for record in event['Records']:
        s3_bucket = record['s3']['bucket']['name']
        s3_key = record['s3']['object']['key']

        obj = s3.Object(s3_bucket, s3_key)
        tweets_as_string = obj.get()['Body'].read().decode('utf-8')

        # Split by newline and parse each line as JSON
        tweets = tweets_as_string.split('\n')
        for tweet_string in tweets:
            if len(tweet_string) < 1:
                continue

            tweet = json.loads(tweet_string)

            # Request tweet sentiment analysis from Comprehend
            sentiment_response = comprehend.detect_sentiment(
                Text=tweet['text'],
                LanguageCode='en'
            )

            sentiment_record = {
                'tweetid': tweet['id'],
                'text': tweet['text'],
                'sentiment': sentiment_response['Sentiment'],
                'sentimentposscore': sentiment_response['SentimentScore']['Positive'],
                'sentimentnegscore': sentiment_response['SentimentScore']['Negative'],
                'sentimentneuscore': sentiment_response['SentimentScore']['Neutral'],
                'sentimentmixedscore': sentiment_response['SentimentScore']['Mixed']
            }
            firehose.put_record(
                DeliveryStreamName=sentiment_stream,
                Record={
                    'Data': json.dumps(sentiment_record) + '\n'
                }
            )

            # Request entity detection from Comprehend
            entities_response = comprehend.detect_entities(
                Text=tweet['text'],
                LanguageCode='en'
            )

            # Send each distinct entity to Firehose, skipping noise tokens
            seen_entities = []
            for entity in entities_response['Entities']:
                if entity_should_be_filtered.match(entity['Text']):
                    continue

                id = entity['Text'] + '-' + entity['Type']
                if (id in seen_entities) == False:
                    entity_record = {
                        'tweetid': tweet['id'],
                        'entity': entity['Text'],
                        'type': entity['Type'],
                        'score': entity['Score']
                    }
                    firehose.put_record(
                        DeliveryStreamName=entity_stream,
                        Record={
                            'Data': json.dumps(entity_record) + '\n'
                        }
                    )
                    seen_entities.append(id)

    return 'true'
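One detail worth calling out: the `entity_should_be_filtered` regex only drops single-character tokens that are a lone digit, '#', or '@'; longer hashtags and mentions pass through. A quick check:

```python
import re

# Same pattern as in the Lambda script: exactly one digit, '#', or '@'
entity_should_be_filtered = re.compile(r'^[\d#@]$')

# Single-character noise tokens are filtered out...
assert entity_should_be_filtered.match('#') is not None
assert entity_should_be_filtered.match('7') is not None
# ...but real entity text, including multi-character hashtags, is kept
assert entity_should_be_filtered.match('AWS') is None
assert entity_should_be_filtered.match('#aws') is None
```

The `seen_entities` list then deduplicates the remaining entities per tweet by the `Text-Type` pair, so the same entity is not sent to Firehose twice.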

Then, edit the Environment variables as below,
and set the variables that are used in the Python script.
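For reference, the script reads these values with `os.environ` at run time. The variable names below are the ones the AWS sample uses for the Firehose delivery stream names; match them to whatever names your script actually reads. A minimal sketch (setting the variables in code here only to simulate what the Lambda console does):

```python
import os

# In Lambda, these are set in the console; setting them here only
# simulates that. The names and values are illustrative.
os.environ['SENTIMENT_STREAM'] = 'sentiment-delivery-stream'
os.environ['ENTITY_STREAM'] = 'entity-delivery-stream'

# This is all the script does to pick them up
sentiment_stream = os.environ['SENTIMENT_STREAM']
entity_stream = os.environ['ENTITY_STREAM']
print(sentiment_stream, entity_stream)
```

If a variable is missing, `os.environ[...]` raises a KeyError, so a misspelled name in the console shows up immediately in the function's error log.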

Finally, increase the Timeout setting, just to be safe.

That’s all for this topic. Please check that the analysis results are stored in your S3 bucket.
In my next article, let me show you how to run queries with Amazon Athena.

Basically, I followed the procedure provided on the AWS webpage. Please refer to the URL below if necessary, though it is written in Japanese…
If you think this article was beneficial for you, I would be glad if you clicked the icon below; it keeps me motivated.