Created
August 14, 2024 06:02
-
-
Save holly/ab16c4a200165ad381068a1dc70071fd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# vim:fileencoding=utf-8 | |
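""" Forward SES mail stored in S3 to LINE Notify
AWS Lambda handler: for each S3 event record, fetch the stored mail, skip it unless the SES spam and virus verdicts are both PASS, and send the subject, sender, recipient and body to LINE Notify.
"""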
import os, sys, io | |
import json | |
import boto3 | |
import email | |
from pyline_notify import PyLINENotify | |
from decimal import Decimal | |
from email.header import decode_header | |
from email.utils import parsedate_to_datetime | |
# Module metadata.
__author__ = 'holly'
__version__ = '1.0'
DESCRIPTION = 'this is description'
# Notification text template. lambda_handler fills the placeholders with
# str.format(**data): the event fields (eventTime, eventSource, eventName,
# arn, S3URI) come from the S3 event record, and Subject/From/To/Content
# come from the parsed mail.
MESSAGE = """
{Subject}
---
AWS SES to S3 notification
eventTime: {eventTime}
eventSource: {eventSource}
eventName: {eventName}
arn: {arn}
S3URI: {S3URI}
---
From: {From}
To: {To}
{Content}
"""
def get_header(msg, key):
    """Return header *key* of *msg* as a single decoded string.

    RFC 2047 encoded words are decoded with their declared charset. Mail
    headers are untrusted input, so decoding is lenient: undecodable bytes
    become U+FFFD and an unknown charset name falls back to UTF-8 instead
    of raising.

    Args:
        msg: Parsed mail supporting ``msg[key]`` header lookup.
        key: Name of the header to read.

    Returns:
        The decoded header text, or ``""`` when the header is missing or
        empty.
    """
    raw = msg[key]
    if not raw:
        return ""

    parts = []
    for value, charset in decode_header(raw):
        if isinstance(value, bytes):
            try:
                parts.append(value.decode(charset or "utf-8", errors="replace"))
            except LookupError:
                # The declared charset is not a codec Python knows.
                parts.append(value.decode("utf-8", errors="replace"))
        elif isinstance(value, str):
            parts.append(value)
    return "".join(parts)
def get_content(msg):
    """Return the decoded body of *msg* as text.

    The transfer-decoded payload is decoded with the charset declared in
    the Content-Type header, defaulting to UTF-8. Decoding is lenient so
    the result is always ``str``: undecodable bytes become U+FFFD and an
    unknown charset name falls back to UTF-8.

    Args:
        msg: Parsed mail message.

    Returns:
        The body text, or ``""`` when there is no payload.
        NOTE: ``get_payload(decode=True)`` yields ``None`` for multipart
        messages, so those also produce ``""``.
    """
    payload = msg.get_payload(decode=True)
    if not payload:
        return ""

    charset = msg.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # The declared charset is not a codec Python knows.
        return payload.decode("utf-8", errors="replace")
def lambda_handler(event, context):
    """Notify LINE about each mail referenced by an S3 event.

    For every entry in ``event["Records"]`` the mail object is fetched
    from S3 and parsed. Mails whose ``X-SES-Spam-Verdict`` or
    ``X-SES-Virus-Verdict`` header is not ``PASS`` are skipped; for the
    rest, MESSAGE is filled in and sent through PyLINENotify.

    Args:
        event: S3 event notification dict with a ``"Records"`` list.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': 'done'}`` once all records are
        processed.

    Raises:
        KeyError: If the ``LINE_TOKEN`` environment variable is missing or
            a record lacks an expected field.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import unquote_plus

    line = PyLINENotify(os.environ["LINE_TOKEN"])
    s3 = boto3.client("s3")

    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded; decode
        # before using the key against the S3 API.
        object_key = unquote_plus(record["s3"]["object"]["key"])

        data = {
            "eventTime": record["eventTime"],
            "eventName": record["eventName"],
            "eventSource": record["eventSource"],
            "S3URI": "s3://{0}/{1}".format(bucket, object_key),
            "arn": record["s3"]["bucket"]["arn"],
        }

        body = s3.get_object(Bucket=bucket, Key=object_key)["Body"].read()
        msg = email.message_from_bytes(body)

        # A missing verdict header yields None, which is != "PASS", so
        # such mails are skipped as well.
        if msg["X-SES-Spam-Verdict"] != "PASS":
            print("{0} is spam mail. skip.".format(data["S3URI"]))
            continue
        if msg["X-SES-Virus-Verdict"] != "PASS":
            print("{0} is virus mail. skip.".format(data["S3URI"]))
            continue

        for name in ("From", "Subject", "To"):
            data[name] = get_header(msg, name)

        content = get_content(msg)
        if isinstance(content, bytes):
            # get_content may hand back raw bytes when it could not decode.
            # The declared charset can be absent or unknown, so guard both.
            charset = msg.get_content_charset() or "utf-8"
            try:
                content = content.decode(charset, errors="replace")
            except LookupError:
                content = content.decode("utf-8", errors="replace")
        data["Content"] = content

        message = MESSAGE.format(**data)

        # Optional sticker settings are forwarded only when configured.
        kwargs = {}
        if os.environ.get("LINE_STICKER_PACKAGE_ID"):
            kwargs["stickerPackageId"] = os.environ.get("LINE_STICKER_PACKAGE_ID")
        if os.environ.get("LINE_STICKER_ID"):
            kwargs["stickerId"] = os.environ.get("LINE_STICKER_ID")

        res = line.notify(message, **kwargs)
        print("=========================")
        print(res.status_line())
        print("=========================")

    return {
        'statusCode': 200,
        'body': 'done'
    }
if __name__ == '__main__':
    # Local run: feed a saved S3 event from ./event.json to the handler.
    if not os.path.isfile("event.json"):
        print("event.json is not exists. skip")
        sys.exit(0)
    # Context manager closes the file even if the JSON is malformed.
    with open("event.json", "r", encoding="utf-8") as fp:
        event = json.load(fp)
    context = {}
    res = lambda_handler(event, context)
    print(res)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment