Created
September 10, 2020 18:55
-
-
Save jaltmayerpizzorno/935b021acdb3b562031f374b7deddfea to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2020, Juan Altmayer Pizzorno. All Rights Reserved. MIT license applies.
import os | |
import argparse | |
import tensorflow as tf | |
import tensorflow.keras as keras | |
import numpy as np | |
import cv2 | |
import time | |
import sys | |
from pathlib import Path | |
# The d2k package (https://github.com/jaltmayerpizzorno/d2k) is expected in a 'd2k' directory next to this script
d2k_path = Path(__file__).parent / 'd2k' | |
sys.path.append(str(d2k_path)) | |
import d2k | |
# Settings for the camera watcher.
config = {
    # Directory passed to FileLister when watching for new recordings.
    'camera_path': Path('/mnt/camera/samba/axis-00408CC5B6D8'),

    # Detection / clip timing, all in seconds.
    'search_secs': 2.0,     # search for detections every N seconds
    'before_secs': 1.5,     # extend clip this long before first detection
    'after_secs': 1.5,      # extend clip this long after last detection

    # Output clip and its "representative" still image.
    'clip_width': 320,      # clip width (height is automatic)
    'clip_format': 'mp4',
    'clip_codec': 'avc1',
    'image_width': 640,     # image width (height is automatic)

    # Notification email.
    'email_subject': 'entrance camera',
    'smtp_credentials': (Path(__file__).parent / 'smtp-credentials.json'),
}
# Command-line interface.  Giving one or more input files processes just those
# files; with none, the script keeps watching config['camera_path'].
argparser = argparse.ArgumentParser()
argparser.add_argument('input_files', type=str, nargs='*', help='file(s) to process')
argparser.add_argument('--send', help='send email', action='store_true')
argparser.add_argument('--mark', help='mark detections', action='store_true')
argparser.add_argument('--nodb', help='don\'t use axis\' index.db to find files', action='store_true')
argparser.add_argument('--gpu', help='use GPU', action='store_true')
argparser.add_argument('--float16', help='use float16', action='store_true')
args = argparser.parse_args()

if not args.gpu:
    # Hide all GPUs from TensorFlow unless --gpu was requested.
    tf.config.experimental.set_visible_devices([], 'GPU')
else:
    gpu = tf.config.experimental.list_physical_devices('GPU')
    # Only index gpu[0] when a GPU was actually found; with --gpu on a machine
    # without one, the list is empty and gpu[0] would raise IndexError.
    if gpu and tf.version.VERSION[:4] == '2.2.':
        tf.config.experimental.set_memory_growth(gpu[0], True)  # works around bug in TF 2.2
def make_model():
    """Builds the detection model from the yolov3 darknet files under d2k_path.

    If --float16 was given, Keras' default float type is switched to float16
    before the network is loaded.
    """
    if args.float16:
        keras.backend.set_floatx('float16')

    darknet_dir = d2k_path / 'darknet-files'
    cfg_text = (darknet_dir / 'yolov3.cfg').read_text()
    weights = (darknet_dir / 'yolov3.weights').read_bytes()

    network = d2k.network.load(cfg_text)
    network.read_darknet_weights(weights)
    return network.make_model()
# Class names, one per line of coco.names; indexed the same way as box.classes.
coco_class_names = (d2k_path / 'darknet-files' / 'coco.names').read_text().splitlines()

# Limit detections to classes we are interested in... this cuts down on false positives
interesting_classes = set(map(coco_class_names.index, (
    'person', 'bicycle', 'car', 'motorbike',
    'bus', 'truck', 'cat', 'dog',
)))
def is_interesting(box):
    """Returns whether the given YOLO box scores above zero for any interesting class."""
    return any(box.classes[class_id] > 0 for class_id in interesting_classes)
def dividing_line(img_dim):
    """Returns the (a,b) parameters of the detection boundary y = a*x + b.

    The boundary was measured on a 1920x1080 reference image; the result is
    scaled to pixel coordinates of an image with dimensions img_dim = (w,h).
    """
    ref_size = np.array([1920, 1080])                # reference image (w, h)
    ref_points = np.array([[0, 270], [1920, 740]])   # (x,y) measured on that image

    # Normalize both points to the 0..1 range, then fit the line through them.
    (x0, y0), (x1, y1) = ref_points / ref_size
    slope = (y1 - y0) / (x1 - x0)
    intercept = y0 - slope * x0

    width, height = img_dim[0], img_dim[1]
    return (slope * height / width, intercept * height)
def is_below(line, box):
    """Returns whether the bottom center of a box lies below a line.

    line is an (a, b) pair describing y = a*x + b; box supplies x, y and h.
    Image y coordinates start at the top, so "below" means a larger y.
    """
    slope, intercept = line[0], line[1]
    bottom_y = box.y + box.h/2          # bottom center of box
    boundary_y = slope * box.x + intercept
    return bottom_y > boundary_y
def pipeline_detect(model, frames):
    """Detects objects on the given frames, returning one list of boxes per frame.

    Boxes are thresholded at .8, de-duplicated with NMS (IoU .5) and then
    limited to interesting classes whose bottom center is below the detection
    boundary.  Boxes that are not below the boundary are printed as ignored.
    """
    img_dim = frames[0].shape[1::-1]                   # frame (w, h)
    net_dim = model.layers[0].input_shape[0][2:0:-1]   # presumably network input (w, h)
    boundary = dividing_line(img_dim)                  # detection boundary

    def output2boxes(output):
        candidates = d2k.network.boxes_from_output(output, net_dim, img_dim, thresh=.8)
        candidates = d2k.box.nms_boxes(candidates, iou_thresh=.5)

        # report everything that falls outside the boundary
        outside = [box for box in candidates if not is_below(boundary, box)]
        for box in outside:
            print({coco_class_names[i] for i, c in enumerate(box.classes) if c > 0.}, 'ignored')

        # filter out uninteresting or out-of-bounds boxes
        return [box for box in candidates if is_interesting(box) and is_below(boundary, box)]

    def pipeline():
        for frame in frames:
            # reverse the channel order and scale pixel values to 0..1
            pixels = frame[..., ::-1].astype('float32') / 255.0
            boxed = d2k.image.letterbox(pixels, *net_dim)
            yield np.reshape(boxed, (1, *boxed.shape))

    dataset = tf.data.Dataset.from_generator(pipeline, (keras.backend.floatx()),
                                             tf.TensorShape(model.layers[0].input_shape[0]))
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    output = model.predict(dataset)

    # output holds one array per network output; regroup it per frame
    frame_count = len(output[0])
    return [output2boxes([head[idx] for head in output]) for idx in range(frame_count)]
def boxes_names(boxes):
    """Returns the set of class names scoring above zero in any of the given boxes"""
    names = set()
    for box in boxes:
        for class_id, score in enumerate(box.classes):
            if score > 0.:
                names.add(coco_class_names[class_id])
    return names
def boxes_area(boxes):
    """Returns the sum of the given boxes' areas (w * h each); 0 for no boxes"""
    total = 0
    for box in boxes:
        total += box.w * box.h
    return total
def save_jpg(top_detection, image_name, x_dim):
    """Saves a frame as a JPEG x_dim pixels wide, optionally marking the detections.

    top_detection is a (frame, boxes) pair.  The boxes are drawn only when
    --mark was given; the height follows from the frame's aspect ratio.
    Returns image_name.
    """
    frame, boxes = top_detection

    # reverse the channel order and scale to 0..1 before converting to an image
    pixels = frame[..., ::-1].astype(np.float32) / 255.0
    image = keras.preprocessing.image.array_to_img(pixels, data_format='channels_last',
                                                   dtype='float32')
    if args.mark:
        d2k.box.draw_boxes(image, boxes, names=coco_class_names)

    y_dim = int(image.height/image.width*x_dim)
    image = image.resize((x_dim, y_dim))
    image.save(image_name, "jpeg")
    return image_name
def true_seq(iterable):
    """Iterator returning (start, end) ranges for sequences of true values in a boolean valued iterable.

    start is the index of the first true value of a run and end is one past
    its last true value, so range(start, end) covers the run.  A run that is
    still open when the input ends is closed at the number of items seen.
    Works on any iterable; it does not need len().
    """
    start = None    # index where the current run began, None when no run is open
    count = 0       # number of items consumed so far
    for i, value in enumerate(iterable):
        count = i + 1
        if value:
            # Compare against None: index 0 is a valid start but is falsy.
            if start is None:
                start = i
        elif start is not None:
            yield (start, i)
            start = None
    if start is not None:
        yield (start, count)
def processFile(model, file):
    """Processes a video file, returning a tuple (names, clips).

    The whole video is read into memory and one frame every
    config['search_secs'] seconds is run through the detector.  For each run
    of consecutive searched frames with detections, a clip (extended by
    config['before_secs'] / config['after_secs']) and a "representative" JPEG
    are written under names derived from the input file name.

    Returns:
        names: set of the detected class names.
        clips: list of (clip_name, image_name) tuples, one per clip written.
        Both are empty if the file cannot be opened or yields no frames.
    """
    print("processing", file)
    start_time = time.time()

    cap = cv2.VideoCapture(str(file))
    if not cap.isOpened():
        return set(), list()

    try:
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        video_dims = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))

        memory = []
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            memory.append(frame)
    finally:
        cap.release()

    if not memory:
        # Nothing decoded: pipeline_detect would fail on frames[0].
        return set(), list()

    # Count the frames actually read; the container's reported frame count
    # may differ, and clip ranges below index into memory.
    total_frames = len(memory)
    clip_dims = (config['clip_width'], int(video_dims[1]/video_dims[0]*config['clip_width']))

    # search for detections every N frames (at least 1, as it is a slice step)
    search_frames = max(1, int(config['search_secs'] * video_fps))
    before_frames = int(config['before_secs'] * video_fps)
    after_frames = int(config['after_secs'] * video_fps)

    clips_list = []
    clip_prefix = '_'.join(str(file.name).split('_')[:2])
    all_detections = set()

    boxes_lst = pipeline_detect(model, memory[::search_frames])

    for start, end in true_seq([len(boxes) > 0 for boxes in boxes_lst]):
        detections = set.union(*[boxes_names(boxes_lst[i]) for i in range(start, end)])
        all_detections |= detections
        clip_no = len(clips_list) + 1

        # Write clip with detection(s)
        clip_name = f"{clip_prefix}-{clip_no}-" + '-'.join(sorted(detections)) \
                    + '.' + config['clip_format']
        print(f"writing {clip_name}")

        # start/end index searched frames; convert them back to video frames
        clip_start = max(0, start * search_frames - before_frames)
        clip_end = min(total_frames, (end-1) * search_frames + after_frames)

        out = cv2.VideoWriter(clip_name, cv2.VideoWriter_fourcc(*config['clip_codec']),
                              video_fps, clip_dims)
        try:
            for frame in memory[clip_start:clip_end]:
                out.write(cv2.resize(frame, clip_dims))
        finally:
            out.release()

        # Write "representative" image for the clip, for quick viewing:
        # the searched frame whose boxes cover the largest area (first on ties)
        top_detection = max(range(start, end), key=lambda i: boxes_area(boxes_lst[i]))
        image_name = f"{clip_prefix}-{clip_no}.jpg"
        save_jpg((memory[top_detection * search_frames], boxes_lst[top_detection]),
                 image_name, config['image_width'])

        clips_list.append((clip_name, image_name))

    end_time = time.time()
    print(f"elapsed: {end_time - start_time:.1f}s -- {total_frames/(end_time - start_time):.1f} FPS")
    return (all_detections, clips_list)
def sendEmail(detections, clips):
    """Emails the detected class names with the clips and their images attached inline.

    Args:
        detections: iterable of detected class names, listed sorted in the body.
        clips: list of (video_name, image_name) file name pairs; each file's
            suffix must be one of the known types below (KeyError otherwise).

    The JSON file at config['smtp_credentials'] must provide the keys
    'server', 'user', 'password' and 'email_to'.  Sending is best effort:
    connection, login and delivery errors are printed, not raised.
    """
    import smtplib
    import email
    import email.encoders   # provides encode_base64; import it explicitly
    import email.mime.base
    import email.mime.text
    import email.mime.multipart
    import json

    mime_type = {'.mp4': ('video', 'mp4'),
                 '.avi': ('video', 'x-msvideo'),  # or x-motion-jpeg
                 '.mkv': ('video', 'x-matroska'),
                 '.jpg': ('image', 'jpeg'),
                 '.png': ('image', 'png')
                 }

    credentials = json.loads(config['smtp_credentials'].read_text())
    msg_from = credentials['user']  # assumes 'user' is a valid email address!

    def content_id(name):
        # One definition shared by the HTML references and the attachment
        # headers, so the two always match.
        return f'{name}.{msg_from}'

    msg = email.mime.multipart.MIMEMultipart()
    msg['To'] = credentials['email_to']
    msg['From'] = msg_from
    msg['Subject'] = config['email_subject']

    html = ['<!doctype html><html><body>']
    html.append(', '.join(sorted(detections)) + ' detected')
    html.append('<br/>')
    for video, image in clips:
        video_id = content_id(video)
        image_id = content_id(image)
        video_type = '/'.join(mime_type[Path(video).suffix])
        html.append(f'<video controls poster="cid:{image_id}">')
        html.append(f'<source src="cid:{video_id}" type="{video_type}">')
        # the image is nested in the video element as fallback content
        html.append(f'<img src="cid:{image_id}"/>')
        html.append('</video>')
    html.append('</body></html>')
    msg.attach(email.mime.text.MIMEText(''.join(html), 'html'))

    # Attach every clip followed by its image, in the order given.
    for name in [file for pair in clips for file in pair]:
        att = Path(name)
        part = email.mime.base.MIMEBase(*mime_type[att.suffix])
        part.add_header('Content-ID', f'<{content_id(name)}>')
        part.add_header('Content-Disposition', 'inline', filename=att.name)
        part.set_payload(att.read_bytes())
        email.encoders.encode_base64(part)
        msg.attach(part)

    try:
        # The context manager sends QUIT and closes the socket on exit, and
        # tolerates a connection that has already dropped.
        with smtplib.SMTP_SSL(credentials['server']) as server:
            # server.set_debuglevel(1)
            server.login(credentials['user'], credentials['password'])
            server.sendmail(msg_from, msg['To'], msg.as_string())
    except Exception as e:
        # Deliberately broad: a failed notification must not stop processing.
        print(e)
class FileLister:
    """Lists recordings under the camera directory, by glob or via the camera's index.db."""

    def __init__(self, camera_path):
        """camera_path: Path of the directory holding the recordings (and index.db)."""
        self._db = None             # sqlite3 connection, opened on first use
        self._path = camera_path

    def files_since(self, timestamp):
        """Returns the .mkv files whose name prefix sorts after timestamp.

        The first len(timestamp) characters of each file name are compared,
        as strings, against timestamp.

        With --nodb the directory tree is globbed and the newest file (by
        name) is left out, as it may still be being written.  Otherwise
        index.db is queried for blocks that have a stop time.  On an sqlite3
        error the error is printed, the connection is dropped so the next
        call reconnects, and an empty list is returned.
        """
        if args.nodb:
            # rglob yields files in no particular order; sort by name so that
            # dropping the last entry really drops the most recent recording.
            files = sorted(self._path.rglob('*.mkv'), key=lambda f: f.name)
            files = files[:-1]  # the last one may not be ready yet
            files = [f for f in files if f.name[:len(timestamp)] > timestamp]
            if len(files) > 1:
                print(len(files), "from glob")
            return files

        import sqlite3

        if not self._db:
            # open read-only
            self._db = sqlite3.connect('file:' + str(self._path / 'index.db') + '?mode=ro', uri=True)

        try:
            cursor = self._db.cursor()
            # Bind the timestamp as a parameter instead of formatting it into
            # the SQL text.
            cursor.execute(
                'select r.path, r.filename, b.path, b.filename from blocks as b'
                ' join recordings as r on recording_id=r.id'
                ' where substr(b.filename,1,?) > ?'
                ' and b.stoptime not null',
                (len(timestamp), timestamp))
            rows = cursor.fetchall()
            files = [self._path / ('/'.join(row) + '.mkv') for row in rows]
            if len(files) > 1:
                print(len(files), "from db")
            return files
        except sqlite3.Error as e:
            print("sqlite3 error:", e)
            self._db.close()
            self._db = None
            return []

    def wait_for_more(self):
        """Sleeps before the next poll: 60 seconds with --nodb, 3 seconds otherwise."""
        if args.nodb:
            time.sleep(60)
        else:
            time.sleep(3)  # if I only had an update hook...
model = make_model()

if args.input_files:
    # process individual files, mostly for manual testing
    for name in args.input_files:
        detections, clips = processFile(model, Path(name))
        if args.send and detections:
            sendEmail(detections, clips)
else:
    # keep processing files as they become available
    # NOTE: unlike the branch above, this mode emails without consulting --send
    checkpoint = Path('./checkpoint')
    fl = FileLister(config['camera_path'])
    while True:
        # resume after the last file recorded in the checkpoint, if any
        if checkpoint.exists():
            last_timestamp = checkpoint.read_text()
        else:
            last_timestamp = '20200101_000000'

        for f in fl.files_since(last_timestamp):
            detections, clips = processFile(model, f)
            if detections:
                sendEmail(detections, clips)
            # remember this file's timestamp prefix as processed
            checkpoint.write_text(f.name[:len(last_timestamp)])

        fl.wait_for_more()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment