Last active
June 25, 2021 07:19
-
-
Save Ivlyth/d8a9780b146625526a54aa74c5e225c8 to your computer and use it in GitHub Desktop.
命令行邮件工具,用于快速发送邮件及录制 pcap 用于协议解析测试
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding:utf8 -*- | |
""" | |
Author : Myth | |
Date : 2021/3/9 | |
Email : email4myth at gmail.com | |
""" | |
from __future__ import unicode_literals | |
import sys | |
reload(sys) | |
sys.setdefaultencoding('utf-8') | |
import mimetypes | |
from cStringIO import StringIO | |
import argparse | |
import smtplib | |
from email.mime.application import MIMEApplication | |
from email.mime.audio import MIMEAudio | |
from email.mime.image import MIMEImage | |
from email.mime.text import MIMEText | |
from email.mime.multipart import MIMEMultipart | |
from email.header import Header | |
from email.quoprimime import header_encode | |
import os | |
import time | |
# Optional third-party dependencies. Each name is bound to None when the
# module cannot be imported so that later code can test for availability.
try:
    import qrcode
except ImportError:
    # Only a missing module is tolerated; a bare ``except`` would also hide
    # KeyboardInterrupt/SystemExit raised while importing.
    qrcode = None
try:
    import magic
except ImportError:
    print("you can install filemagic module by execute `pip2 install filemagic` for intelligent file type detect")
    magic = None

# Parsed command-line options; assigned by define_and_parse_args().
options = None
# Installation instructions for the python-magic package and the libmagic
# C library, per platform.
# NOTE(review): this constant is not referenced by the code visible in this
# file, and the import fallback above recommends `filemagic` instead --
# confirm which package is actually intended.
# NOTE(review): the second CentOS command (">>> sudo") looks truncated.
INSTALL = '''\
Installation
The current stable version of python-magic is available on PyPI and can be installed by running `pip install python-magic`.
This module is a simple wrapper around the libmagic C library, and that must be installed as well:
# Debian/Ubuntu
>>> sudo apt-get install libmagic1
# Centos
>>> sudo yum -y install epel-release
>>> sudo
# Windows
## You'll need DLLs for libmagic. @julian-r maintains a pypi package with the DLLs, you can fetch it with:
>>> pip install python-magic-bin
# OSX
## When using Homebrew:
>>> brew install libmagic
## When using macports:
>>>port install file
'''
# Extra MIME type table in ``mime.types`` format (one type per line followed
# by its file extensions); main() feeds it to mimetypes.MimeTypes.readfp().
# The real Photoshop (psd) and StuffIt (sit) extensions are listed next to
# the historical spellings "pnd" and "sif", which are kept so nothing that
# relied on them stops working.
MIME_TYPES = '''\
application/font-woff woff
application/java-archive jar
application/javascript js
text/javascript js
application/ms-msi msi
application/msword doc
application/vnd.ms-powerpoint ppt pps pot
application/vnd.ms-excel xls xlm xla xlc xlt xlw
application/pdf pdf
application/pkcs7-signature p7
application/vnd.font-fontforge-sfd sfd
application/vnd.ms-cab-compressed cab
application/vnd.ms-fontobject eot
application/vnd.ms-opentype otf
application/vnd.openxmlformats-officedocument.presentationml.presentation pptx
application/vnd.openxmlformats-officedocument.spreadsheetml.sheet xlsx
application/vnd.openxmlformats-officedocument.wordprocessingml.document docx
application/vnd.tcpdump.pcap pcap
application/warc warc
application/x-7z-compressed 7z
application/x-arc arc
application/x-arj arj
application/x-compress z
application/x-coredump core
application/x-cpio cpio
application/x-dmg dmg
application/x-dosexec exe
application/x-eet eet
application/x-font-ttf ttf
application/x-gzip gz
application/x-java-jnlp-file jnlp
application/x-lha lha
application/x-lrzip lrz
application/x-lz4 lz4
application/x-lzh lzh
application/x-mif mif
application/x-pem pem
application/x-rar rar
application/x-rpm rpm
application/x-shockwave-flash swf
application/x-stuffit sit sif
application/x-tar tar
application/x-xar pkg
application/x-xz xz
application/x-zoo zoo
application/xml xml
application/zip zip
audio/mpeg mp3
image/gif gif
image/jpeg jpg
image/png png
image/tiff tiff
image/vnd.adobe.photoshop psd pnd
image/x-cursor cur
image/x-icon ico
image/x-ms-bmp bmp
text/html html
text/json json
text/plain txt
text/rss rss
text/rtf rtf
text/x-awk awk
text/x-lua lua
text/x-perl pl
text/x-php php
text/x-python py
text/x-ruby rb
text/x-shellscript sh
video/3gpp 3gp
video/h264 264
video/matroska mkv
video/mj2 mj2
video/mp4 mp4
video/mpeg mpg
video/mpv mpv
video/quicktime qt
video/webm webm
video/x-flc flc
video/x-fli fli
video/x-flv flv
video/x-jng jng
video/x-mng mng
video/x-sgi-movie sgi
'''
class ArgumentDefaultsHelpFormatter(argparse.HelpFormatter):
    """Help message formatter which adds default values to argument help.

    The default is appended only when it is neither ``None`` nor
    ``argparse.SUPPRESS`` and the help text does not already mention
    ``%(default)``.

    Only the name of this class is considered a public API. All the methods
    provided by the class are considered an implementation detail.
    """

    def _get_help_string(self, action):
        """Return the help template for *action*, with its default appended.

        :param action: the argparse action being documented
        :return: help text, possibly ending in `` (default: %(default)s)``
        """
        # ``help`` may legitimately be None; treat it as empty so the
        # membership test below cannot raise TypeError.
        help_text = action.help or ''
        if '%(default)' in help_text:
            return help_text
        if action.default is argparse.SUPPRESS or action.default is None:
            return help_text
        defaulting_nargs = (argparse.OPTIONAL, argparse.ZERO_OR_MORE)
        # Positionals only carry a usable default when they are optional.
        if action.option_strings or action.nargs in defaulting_nargs:
            help_text += ' (default: %(default)s)'
        return help_text
def define_and_parse_args(args=None):
    """Build the command-line parser, parse *args* and check attachments.

    The resulting namespace is stored in the module-level ``options`` and is
    also returned. ``parser.error`` is invoked for the first attachment path
    that is not an existing regular file.

    :param args: argument list handed to ``parse_args`` (``None`` by default)
    :return: the parsed ``argparse.Namespace``
    """
    global options

    def _register(section, specs):
        # Declare every (flags, keyword-settings) pair on the given group,
        # in order, so the help output keeps its layout.
        for flags, settings in specs:
            section.add_argument(*flags, **settings)

    cli = argparse.ArgumentParser(
        prog="PROG",
        formatter_class=ArgumentDefaultsHelpFormatter,
    )

    # ---- Mail: headers, body and attachments
    _register(cli.add_argument_group('Mail'), (
        (('-s', '--subject'), dict(default='【测试】邮件主题', help='测试的邮件主题, 可以包含占位符: xxx')),
        (('-f', '--from-user'), dict(help='发件人')),
        (('-t', '--to-list'), dict(required=True, nargs='+', help='收件人列表')),
        (('-c', '--cc-list'), dict(nargs='+', help='抄送人列表')),
        (('-b', '--bcc-list'), dict(nargs='+', help='秘送人列表')),
        (('-C', '--content'), dict(help='邮件正文, 可以指定一个文件路径, 以文件内容作为正文')),
        (('-a', '--attachments'), dict(nargs='+', help='附件路径, 可以指定多个')),
        (('-r', '--repeat-name-n-times'), dict(type=int, default=0, help='重复文件名称指定次数, 模拟长文件名称')),
        (('-q', '--qr-code-content'), dict(nargs='+', help='二维码形式附件的内容')),
    ))

    # ---- Operation: how the message is sent
    _register(cli.add_argument_group(title='Operation'), (
        (('-i', '--interactive-mode'), dict(action='store_true', help='交互模式, 主要用于通过 wireshark 观测发包')),
        (('-m', '--message-id'), dict(default='mk-id-%(pid)s-%(ts)s', help='message ID 格式')),
        (('-T', '--reply-to'), dict(help='自定义的 reply-to 地址')),
        (('-M', '--add-message-id'), dict(action='store_true', help='添加 message ID')),
        (('-R', '--repeat-times'), dict(type=int, default=1, help='将当前邮件重复发送指定次数')),
        (('-H', '--custom-headers'), dict(nargs='+', help='自定义 header')),
    ))

    # ---- SMTP server and credentials
    _register(cli.add_argument_group(title='SMTP Server'), (
        (('-S', '--smtp-server'), dict(default='58.251.82.205', help='SMTP server 地址')),
        (('-P', '--smtp-server-port'), dict(type=int, default=25, help='SMTP server 非加密通信端口')),
        (('-u', '--user'), dict(default='[email protected]', help='发信人邮箱账号')),
        (('-p', '--password'), dict(default='xxxxxxxxxxx', help='发信人邮箱密码')),
    ))

    # ---- tcpdump hint
    _register(cli.add_argument_group(title='tcpdump hint'), (
        (('--tcpdump',), dict(action='store_true', help='展示 tcpdump 命令提示, 且需二次确认后才开始真正执行')),
    ))

    options = cli.parse_args(args)

    # Reject attachment paths that do not point at a regular file.
    for candidate in options.attachments or ():
        if not os.path.isfile(candidate):
            cli.error("attachment %s not found" % candidate)
    return options
def interactive_prompt(prompt_msg):
    """Pause for the user to press Enter, but only in interactive mode.

    :param prompt_msg: text shown while waiting for input
    """
    if not options.interactive_mode:
        return
    raw_input(prompt_msg)
def get_mail_content():
    """Return the mail body selected by ``options.content``.

    * If the option names an existing file, the file's raw bytes are returned.
    * If the option is any other non-empty value, that value is returned.
    * Otherwise a built-in sample body is returned.
    """
    body = options.content
    if not body:
        # The sample mixes Chinese and English text and ends with CRLF and
        # lone-dot sequences -- presumably to exercise SMTP dot-stuffing.
        return '''\
中文邮件内容.
english email content.
---------------
\r\n.\r\n..
'''
    if os.path.isfile(body):
        # The context manager closes the handle instead of leaking it.
        with open(body, 'rb') as fp:
            return fp.read()
    return body
def _read_binary(path, size=-1):
    """Return up to *size* bytes of *path* (everything by default), closing the file."""
    with open(path, 'rb') as fp:
        return fp.read(size)


def _close_smtp(client):
    """End the SMTP session with QUIT, falling back to a plain close on failure."""
    try:
        client.quit()
    except (smtplib.SMTPException, EnvironmentError):
        # The server may already have dropped the connection.
        client.close()


def main():
    """Parse the command line, build the MIME message and send it via SMTP.

    Steps: optionally print a tcpdump hint and wait for confirmation, build
    the headers and body, attach files and QR-code images, then connect,
    log in and send the message ``options.repeat_times`` times. The SMTP
    session is always closed, including on login or send failures.

    Exits with status 1 when an attachment's MIME type cannot be determined
    or is unsupported, or when QR content is requested without ``qrcode``.
    """
    global options
    options = define_and_parse_args()

    if options.tcpdump:
        print(u'打开新的 shell 窗口执行以下 tcpdump 命令进行抓包, 使用真实 网卡名称 替换 IFNAME')
        print(u'tcpdump -i IFNAME -w %s_%s.pcap "host %s and port %s"' % (
            options.smtp_server, options.smtp_server_port,
            options.smtp_server, options.smtp_server_port
        ))
        raw_input('tcpdump 命令开始执行后, 回车继续')

    # Display name defaults to the local part of the account; partition()
    # tolerates an account without '@' where index() would raise ValueError.
    from_user = options.from_user or options.user.partition('@')[0]
    frm = u'%s<%s>' % (from_user, options.user)
    to_list = list(options.to_list)

    content = MIMEText(get_mail_content(), _subtype=u'html', _charset=u'utf-8')
    message = MIMEMultipart()
    message['From'] = Header(frm, 'utf-8')
    # NOTE(review): addresses are joined with ';' although RFC 5322 address
    # lists use ',' -- confirm this is intended for the parser tests.
    message['To'] = Header(u';'.join(options.to_list), 'utf-8')
    message['Subject'] = Header(options.subject, 'utf-8')

    if options.add_message_id:
        message_id = options.message_id % {
            'pid': os.getpid(),
            'ts': int(time.time() * 1000000)
        }
        message["Message-ID"] = Header(message_id, "utf-8")
    if options.reply_to:
        message["Reply-To"] = Header(options.reply_to, "utf-8")

    if options.custom_headers:
        for header in options.custom_headers:
            # Each custom header is given as "Name: value".
            h_name, _, h_value = header.partition(':')
            h_name = h_name.strip()
            h_value = h_value.strip()
            if not (h_name and h_value):
                print("Warn: header name or value is empty, ignored: %s" % header)
                continue
            message[h_name] = Header(h_value, 'utf-8')

    if options.cc_list:
        to_list += options.cc_list
        message['Cc'] = Header(u';'.join(options.cc_list), 'utf-8')
    if options.bcc_list:
        to_list += options.bcc_list
        # NOTE(review): writing a Bcc header into the message exposes the
        # blind-copy recipients unless the server strips it -- confirm.
        message['Bcc'] = Header(u';'.join(options.bcc_list), 'utf-8')

    # Body part
    message.attach(content)

    if options.attachments:
        # Presumably set so that MimeTypes() does not load the system-wide
        # tables and only the defaults plus MIME_TYPES are consulted.
        mimetypes.inited = True
        mt = mimetypes.MimeTypes()
        mt.readfp(StringIO(MIME_TYPES))
        for attachment in options.attachments:
            basename = os.path.basename(attachment)
            name, ext = os.path.splitext(basename)
            if options.repeat_name_n_times > 0:
                # Simulate a very long file name by repeating the stem.
                name = name * options.repeat_name_n_times
                basename = u'%s%s' % (name, ext)

            if ext:
                mime_type, _ = mt.guess_type(attachment)
            elif magic is not None:  # no extension: sniff the content
                m = magic.Magic(flags=magic.MAGIC_MIME_TYPE)
                mime_type = m.id_buffer(_read_binary(attachment, 2048))
            else:
                mime_type = 'application/octet-stream'

            if not mime_type:
                print("Error: can not determine mime type for file %s" % attachment)
                sys.exit(1)

            subtype = mime_type.split('/')[-1]
            if mime_type.startswith('text/'):
                att = MIMEText(_read_binary(attachment), _subtype=subtype, _charset='utf-8')
            elif mime_type.startswith('application/'):
                att = MIMEApplication(_read_binary(attachment), _subtype=subtype)
            elif mime_type.startswith('image/'):
                att = MIMEImage(_read_binary(attachment), _subtype=subtype)
            elif mime_type.startswith('audio/'):
                att = MIMEAudio(_read_binary(attachment), _subtype=subtype)
            else:
                print("Error: unsupported file suffix: %s" % attachment)
                sys.exit(1)
            att.add_header('Content-Disposition', 'attachment',
                           filename=header_encode(basename.encode('utf-8'), charset='utf-8'))
            message.attach(att)

    if options.qr_code_content:
        if qrcode is None:
            print("Error: qrcode module not installed, please execute `pip2 install qrcode` to install it before use this script")
            sys.exit(1)
        for qr_content in options.qr_code_content:
            img = qrcode.make(qr_content)
            buf = StringIO()
            img.save(buf)
            basename = qr_content.replace('"', '_').replace("'", '_').replace(' ', '_').replace(':', '_').replace('/', '_').replace('.', '_')
            # The buffer holds binary PNG data, so it is attached as an
            # image part rather than as text/plain.
            att = MIMEImage(buf.getvalue(), _subtype='png')
            att.add_header('Content-Disposition', 'attachment',
                           filename=header_encode('%s.png' % basename, charset='utf-8'))
            message.attach(att)

    interactive_prompt("ready to construct SMTP instance")
    client = smtplib.SMTP()
    interactive_prompt("ready to connect to SMTP server %s:%s" % (options.smtp_server, options.smtp_server_port))
    client.connect(host=options.smtp_server, port=options.smtp_server_port)
    try:
        try:
            # NOTE(review): the prompt echoes the password in clear text.
            interactive_prompt("ready to login for user: %s with password: %s" % (options.user, options.password))
            client.login(options.user, options.password)
        except Exception as e:
            print("Error: login failed: %s" % e)
            return

        # Loop-invariant work: de-duplicate recipients and serialise once.
        recipients = list(set(to_list))
        payload = message.as_string()
        for i in range(options.repeat_times):
            try:
                interactive_prompt("ready to send mail for loop %s" % i)
                client.sendmail(frm, recipients, payload)
                interactive_prompt("send mail for loop %s done" % i)
            except Exception as e:
                print("Error: send mail failed: %s" % e)
                return
    finally:
        # Always end the session so the connection is not left open.
        _close_smtp(client)

    if options.tcpdump:
        print(u'可以在另一个窗口中停止 tcpdump 命令')
    print(u'邮件发送结束')
if __name__ == '__main__':
    # Debugging aid: uncomment the next line to run with a fixed argument
    # list instead of the real command line.
    # sys.argv = ['mail-maker.py', '--tcpdump', '-s', 'test subject', '-C', 'test content', '-t', '[email protected]']
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment