Create Python consuming servers and deploy them to EC2

This is the third lesson of the tutorial Scalable EC2 consuming servers for SQS. In this lesson we will discuss consuming servers and how to deploy them to an EC2 instance. You should be familiar with how to create and connect to an EC2 instance.

By consuming server, we mean a serving application that consumes the queue and processes messages from it. In the ideal case, the consuming servers are just code that can be deployed to Lambda or EC2. We will deploy our code to EC2 in this tutorial. However, if you wish to use Lambda, you will have to define a triggering event for it, such as running once every minute to fetch requests and process them in batches. In my case, I am assuming that I can process one message at a time, that the processing is complicated, and that I therefore need an EC2 instance. Do not do this at work, as a server-less design is best.

Preparing the environment

I will be using boto3, the Python SDK for AWS. You can get boto3 by invoking:


pip install boto3

I will test the code on my PC first, then deploy it to the EC2 instance. For this, I cannot use policies to connect to the SQS service. I have to create a user, get the key and the secret key, and place them in the /home/user/.aws/credentials file. Alternatively, they can be passed to the AWS client factory in the code.
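
For reference, the credentials file uses a simple INI-style layout; a minimal example with placeholder values instead of real keys:

[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY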

The Serving Application

Since we do not have a real problem to process, we will make the server wait for 15 seconds for every message that is being served.

The following lines import the needed packages: boto3 to connect to AWS, time to put the thread to sleep, and finally sys, which is used to flush the output to stdout. We will see why we need this after we create the EC2 image and use it in an automated way.

import boto3
import time
import sys

sqs = boto3.client('sqs')

The final line of the previous snippet calls the client factory to create an SQS client. We can pass the user credentials and region to that call using the arguments defined in the boto3 reference.
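
For example, if you prefer to pass them explicitly rather than through the credentials file, boto3.client accepts region_name, aws_access_key_id, and aws_secret_access_key arguments; a sketch with placeholder values:

# Alternative: pass credentials and region explicitly (placeholder values shown)
sqs = boto3.client(
    'sqs',
    region_name='us-east-2',
    aws_access_key_id='YOUR_ACCESS_KEY_ID',
    aws_secret_access_key='YOUR_SECRET_ACCESS_KEY'
)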

queue_url = 'https://sqs.us-east-2.amazonaws.com/####/ScalingTestQueue.fifo'

That was the queue URL. It can be obtained from the SQS service in the AWS console.
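
If you prefer not to hard-code the URL, it can also be looked up by queue name; a minimal sketch using the queue name from the URL above:

# Alternative: fetch the queue URL by name instead of hard-coding it
queue_url = sqs.get_queue_url(QueueName='ScalingTestQueue.fifo')['QueueUrl']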

The following code snippet is the main core of the application:

  • Loop forever with while True to keep accepting requests.
  • Try to get an available message. If no messages are available, wait for five seconds and try again.
  • If a message is available, wait for 15 seconds. This acts as if the message is being processed.
  • After processing the message, you must delete it from the queue, so that other consuming servers will not get the same message again.

It is important to finish processing each message before the visibility timeout expires; otherwise the message becomes visible again and may be delivered to another consumer. The visibility timeout for the queue is a configuration we set when we created the queue. If you find that the processing time is longer than the visibility timeout, you can increase it at any time.
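
If that turns out to be necessary, the timeout can also be changed from code; a minimal sketch, assuming a new value of 60 seconds:

# Hypothetical example: raise the queue's visibility timeout to 60 seconds.
# SQS expects attribute values as strings.
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={'VisibilityTimeout': '60'}
)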

The code will look like the following:

while True:
    # Receive a message from the SQS queue
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,
        MessageAttributeNames=['All']
    )

    # No message available: wait five seconds and poll again
    if 'Messages' not in response:
        time.sleep(5)
        continue

    message = response['Messages'][0]
    receipt_handle = message['ReceiptHandle']

    print('processing new one')
    sys.stdout.flush()
    time.sleep(15)  # simulate 15 seconds of processing

    # Delete the processed message from the queue
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle
    )
    print('Received and deleted message: %s' % message['Body'])
    sys.stdout.flush()
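
As a side note, instead of sleeping for five seconds between empty polls, SQS also supports long polling: passing WaitTimeSeconds (up to 20) to receive_message makes the call wait until a message arrives or the wait expires. A minimal variation of the receive call:

# Optional variation: long polling instead of sleeping between empty polls
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=1,
    MessageAttributeNames=['All'],
    WaitTimeSeconds=20
)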

The Client Code

But what about adding new messages to SQS? We will use another automated application that sends messages. Since our queue is a FIFO queue, every message must carry a MessageGroupId, and a MessageDeduplicationId unless content-based deduplication is enabled on the queue. You can use something like this 🙂

import boto3
import time

# Client and queue URL, same as in the consuming server
sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-2.amazonaws.com/####/ScalingTestQueue.fifo'

delay_between_messages = 8

count = 1
while True:
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody='This is the %d message I sent today' % count,
        MessageGroupId=str(count + 965),
        MessageDeduplicationId=str(count + 965)
    )
    print('This is the %d message I sent today' % count)
    count += 1
    time.sleep(delay_between_messages)

With this, we have completed the applications needed for our tutorial. Now all that is left is to create an EC2 instance and upload the consuming server code to it. Please note that if you use the policy created in IAM, then you will not need to add user credentials.

Deploy the Application into an EC2 instance

After the code is ready, we will deploy it to an EC2 instance. We will do the following:

  1. Connect to the EC2 instance using SSH.
  2. Upload the code to the EC2 instance. I will use WinSCP to do so. Feel free to use any method you like (a command-line alternative is sketched after this list).
  3. Install boto3.
  4. Create a cron job to run the code at system reboot.
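
If you prefer the command line over WinSCP, steps 1 to 3 can be done with scp and ssh; a sketch assuming an Ubuntu instance, a key pair file named mykey.pem, and a hypothetical public DNS name:

scp -i mykey.pem pythonfile.py ubuntu@ec2-xx-xx-xx.us-east-2.compute.amazonaws.com:~/
ssh -i mykey.pem ubuntu@ec2-xx-xx-xx.us-east-2.compute.amazonaws.com
pip install boto3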

Cron jobs are a way to define tasks that Linux will run on a defined schedule. We can define a job to run every minute, hour, or week. For our problem, we can define a task that runs at system startup, i.e. after a reboot.
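
For illustration, a crontab entry consists of five scheduling fields (minute, hour, day of month, month, day of week) followed by the command; a few hypothetical examples:

# m   h   dom mon dow   command
*/5   *   *   *   *     echo "every five minutes"
0     *   *   *   *     echo "every hour"
0     0   *   *   0     echo "every week, on Sunday at midnight"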

Type “crontab -e” in the SSH session and pick the text editor you like; I pick nano. At the bottom of the file, define the cron job you want. The job is split into two parts: the scheduling part, then the command to run. There are a lot of options to consider. To save time, I will introduce you to the command that you need in this case.


@reboot python pythonfile.py >> log.txt 2>&1

  • @reboot instructs the job to run once at startup.
  • python pythonfile.py : is the command to run. Please note that pythonfile.py is stored in the home directory, i.e. /home/ubuntu/pythonfile.py.
  • >> log.txt : redirects stdout to log.txt, after opening this file in append mode. This file is in the home directory as well.
  • 2>&1 redirects stderr to stdout. This makes the errors available in the log file, as stdout is redirected there.
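
Once the crontab is saved and the instance rebooted, you can check that the job is registered and follow the consumer's output, assuming the files live in /home/ubuntu:

crontab -l                      # list the registered cron jobs
tail -f /home/ubuntu/log.txt    # follow the consumer's log output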

If you created the instance without assigning the policy, you will see a missing-credentials exception, unless you have written the user keys in the code.

Next is creating the image and the scaling policy 🙂
