Last active
August 19, 2023 05:00
-
-
Save sweemeng/d666810c56bdad4ed52cf4602d9178ce to your computer and use it in GitHub Desktop.
Example code for SQS worker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
class ApplicationSpecificError(Exception): | |
pass | |
# You probably want to wrap this in a function, or put in a main() function | |
sqs = boto3.client('sqs') | |
while True: | |
response = sqs.receive_message( | |
QueueUrl=queue_url_, | |
MaxNumberOfMessages=1, | |
VisibilityTimeout=30, | |
WaitTimeSeconds=0 | |
) | |
if 'Messages' in response: | |
message = response['Messages'][0] | |
logger.debug(message) | |
receipt_handle = message['ReceiptHandle'] | |
# create keep_alive function to update visibiltiy timeout | |
# keep_alive(response['Messages'][0]['ReceiptHandle'], queue_url) | |
# put in a thread | |
def keep_alive(receipt_handle, queue_url): | |
while True: | |
try: | |
sqs.change_message_visibility( | |
QueueUrl=queue_url, | |
ReceiptHandle=receipt_handle, | |
VisibilityTimeout=20 | |
) | |
time.sleep(10) | |
except ClientError as e: | |
logger.debug("killed") | |
break | |
# create thread to use keep_alive function | |
thread = threading.Thread(target=keep_alive, args=(receipt_handle, queue_url_)) | |
thread.start() | |
# get message from queue | |
body = message['Body'] | |
logger.debug(body) | |
data = json.loads(body) | |
logger.debug(f"working") | |
try: | |
# Process message here, for example process(data) | |
# callback may also be called here, that depends on your application | |
logger.debug(f"finished") | |
# delete message from queue. do not delete if fail, allow for recovery | |
sqs.delete_message( | |
QueueUrl=queue_url_, | |
ReceiptHandle=receipt_handle | |
) | |
except ApplicationSpecificError as e: | |
logger.error(e) | |
# Delete queue if you think that this error should not be recovered. i.e Other worker should not try to process this message | |
sqs.delete_message( | |
QueueUrl=queue_url_, | |
ReceiptHandle=receipt_handle | |
) | |
continue | |
except Exception as e: | |
# Stop worker if this is a code error. | |
logger.error(e) | |
break # stop the worker, so that failure does not overwhelm the queue | |
# Be nice to all service | |
time.sleep(1) | |
else: | |
# logger.info("No messages in queue") | |
time. Sleep(5) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment