Created
June 1, 2020 11:07
-
-
Save ggosiang/24add457b0898cdc0bc135dd7a8dcd44 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ctypes import * | |
import math | |
import random | |
import os | |
import cv2 | |
import numpy as np | |
import time | |
import darknet | |
import matplotlib.pyplot as plt | |
import smtplib, os | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.base import MIMEBase | |
from email.mime.text import MIMEText | |
from email import encoders | |
class_color = { | |
'with_mask': [0, 255, 0], | |
'without_mask': [128, 0, 128], | |
'mask_weared_incorrect': [128, 128, 0] | |
} | |
def convertBack(x, y, w, h): | |
xmin = int(round(x - (w / 2))) | |
xmax = int(round(x + (w / 2))) | |
ymin = int(round(y - (h / 2))) | |
ymax = int(round(y + (h / 2))) | |
return xmin, ymin, xmax, ymax | |
def cvDrawBoxes(detections, img): | |
for detection in detections: | |
x, y, w, h = detection[2][0],\ | |
detection[2][1],\ | |
detection[2][2],\ | |
detection[2][3] | |
xmin, ymin, xmax, ymax = convertBack( | |
float(x), float(y), float(w), float(h)) | |
pt1 = (xmin, ymin) | |
pt2 = (xmax, ymax) | |
cv2.rectangle(img, pt1, pt2, class_color[detection[0].decode()], 1) | |
cv2.putText(img, | |
detection[0].decode() + | |
" [" + str(round(detection[1] * 100, 2)) + "]", | |
(pt1[0], pt1[1] - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, | |
class_color[detection[0].decode()], 2) | |
return img | |
def plotFig(w_counter, w_o_counter, incorrect_counter): | |
plt.title("Number of each object type") | |
plt.xlabel("Frame #") | |
plt.ylabel("Amount") | |
plt.ylim(0, 20) | |
plt.plot(range(len(w_counter)), w_counter, label='with_mask') | |
plt.plot(range(len(w_o_counter)), w_o_counter, label='without_mask') | |
plt.plot(range(len(incorrect_counter)), incorrect_counter, label='mask_weared_incorrect') | |
plt.legend() | |
plt.savefig('output.png') | |
def sendEmail(w_amount, w_o_amount, incorrect_amount): | |
smtp_server = "smtp.gmail.com" | |
smtp_port = 587 | |
from_address = "..." | |
from_password = "..." | |
to_address = "..." | |
subject = "Job completed: Object Detection" | |
mail_body = """Percentage of with_mask: %f\n | |
Percentage of (without_mask + mask_weared_incorrect): %f\n\n | |
View live camera: %s""" % ( | |
w_amount/(w_amount+w_o_amount+incorrect_amount), | |
(w_o_amount+incorrect_amount)/(w_amount+w_o_amount+incorrect_amount), | |
'https://www.youtube.com/watch?v=WHeYEZIUbZQ') | |
attachment_1 = r"output.png" | |
attachment_2 = r"output.mp4" | |
msg = MIMEMultipart() | |
msg['Subject'] = subject | |
msg['To'] = to_address | |
msg.attach(MIMEText(mail_body)) | |
files = [] | |
files.append(attachment_1) | |
files.append(attachment_2) | |
for file in files: | |
part = MIMEBase('application', "octet-stream") | |
part.set_payload(open(file, "rb").read()) | |
encoders.encode_base64(part) | |
part.add_header('Content-Disposition', 'attachment; filename="{0}"'.format(os.path.basename(file))) | |
msg.attach(part) | |
server = smtplib.SMTP(smtp_server, smtp_port) | |
server.starttls() | |
server.login(from_address, from_password) | |
server.sendmail(from_address, to_address, msg.as_string()) | |
server.quit() | |
print('Email sent!') | |
netMain = None | |
metaMain = None | |
altNames = None | |
def YOLO(): | |
global metaMain, netMain, altNames | |
configPath = "./cfg/yolo-obj.cfg" | |
weightPath = "./backup/yolo-obj_5000.weights" | |
metaPath = "./data/face-mask-detection/obj.data" | |
if not os.path.exists(configPath): | |
raise ValueError("Invalid config path `" + | |
os.path.abspath(configPath)+"`") | |
if not os.path.exists(weightPath): | |
raise ValueError("Invalid weight path `" + | |
os.path.abspath(weightPath)+"`") | |
if not os.path.exists(metaPath): | |
raise ValueError("Invalid data file path `" + | |
os.path.abspath(metaPath)+"`") | |
if netMain is None: | |
netMain = darknet.load_net_custom(configPath.encode( | |
"ascii"), weightPath.encode("ascii"), 0, 1) # batch size = 1 | |
if metaMain is None: | |
metaMain = darknet.load_meta(metaPath.encode("ascii")) | |
if altNames is None: | |
try: | |
with open(metaPath) as metaFH: | |
metaContents = metaFH.read() | |
import re | |
match = re.search("names *= *(.*)$", metaContents, | |
re.IGNORECASE | re.MULTILINE) | |
if match: | |
result = match.group(1) | |
else: | |
result = None | |
try: | |
if os.path.exists(result): | |
with open(result) as namesFH: | |
namesList = namesFH.read().strip().split("\n") | |
altNames = [x.strip() for x in namesList] | |
except TypeError: | |
pass | |
except Exception: | |
pass | |
cap = cv2.VideoCapture("stream.mp4") | |
print('FPS: ', cap.get(cv2.CAP_PROP_FPS)) | |
print('WIDTH: ', cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
print('HEIGHT: ', cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
out = cv2.VideoWriter( | |
"output.mp4", cv2.VideoWriter_fourcc(*"mp4v"), cap.get(cv2.CAP_PROP_FPS), | |
(int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))) | |
print("Starting the YOLO loop...") | |
# Create an image we reuse for each detect | |
darknet_image = darknet.make_image( | |
int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), | |
int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), | |
3) | |
ret, frame_read = cap.read() | |
w_mask_counter = [] | |
w_o_mask_counter = [] | |
incorrect_mask_counter = [] | |
index = 0 | |
while ret: | |
frame_rgb = cv2.cvtColor(frame_read, cv2.COLOR_BGR2RGB) | |
darknet.copy_image_from_bytes(darknet_image, frame_rgb.tobytes()) | |
detections = darknet.detect_image(netMain, metaMain, darknet_image, thresh=0.40) | |
w_mask_counter.append(len([d for d in detections if d[0].decode()=='with_mask'])) | |
w_o_mask_counter.append(len([d for d in detections if d[0].decode()=='without_mask'])) | |
incorrect_mask_counter.append(len([d for d in detections if d[0].decode()=='mask_weared_incorrect'])) | |
image = cvDrawBoxes(detections, frame_rgb) | |
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
out.write(image) | |
print('Processing frame: ', index) | |
index += 1 | |
ret, frame_read = cap.read() | |
if index == 300: | |
break | |
cap.release() | |
out.release() | |
print(sum(w_mask_counter)) | |
print(sum(w_o_mask_counter)) | |
print(sum(incorrect_mask_counter)) | |
plotFig(w_mask_counter, w_o_mask_counter, incorrect_mask_counter) | |
sendEmail(sum(w_mask_counter), sum(w_o_mask_counter), sum(incorrect_mask_counter)) | |
if __name__ == "__main__": | |
YOLO() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment