# Copyright 2020, Juan Altmayer Pizzorno. All Rights Reserved. MIT license applies.
import numpy as np
import cv2
import time
from pathlib import Path

# This is https://github.com/jaltmayerpizzorno/d2k
d2k_path = Path(__file__).parent / 'd2k'
import sys
sys.path.append(str(d2k_path))
import d2k.box
config = {
    'camera_path': Path('/mnt/camera/samba/axis-00408CC5B6D8'),
    'search_secs': 2.0,     # search for detections every N seconds
    'before_secs': 1.5,     # extend clip this long before first detection
    'after_secs': 1.5,      # extend clip this long after last detection
    'clip_width': 320,      # clip width (height is automatic)
    'clip_format': 'mp4',
    'clip_codec': 'avc1',
    'image_width': 640,     # "representative" image width (height is automatic)
    'email_subject': 'entrance camera',
    'smtp_credentials': (Path(__file__).parent / 'smtp-credentials.json')
}
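
# sendEmail() below reads its SMTP settings from the JSON file configured above and
# looks up the keys 'user', 'password', 'server' and 'email_to'.  A hypothetical
# smtp-credentials.json (illustrative values only) might look like:
#
#   {
#       "user": "camera@example.com",
#       "password": "app-specific-password",
#       "server": "smtp.example.com",
#       "email_to": "you@example.com"
#   }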
def parse_args():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('input_files', type=str, nargs='*', help='file(s) to process')
    parser.add_argument('--send', help='send email', action='store_true')
    parser.add_argument('--mark', help='mark detections', action='store_true')
    parser.add_argument('--nodb', help="don't use axis' index.db to find files", action='store_true')
    parser.add_argument('--nordthread', help="don't use reading thread", action='store_true')
    parser.add_argument('--tflite', help='use TF Lite (requires .tflite model)', action='store_true')

    g = parser.add_mutually_exclusive_group()
    g.add_argument('--yolo', help='use YOLOv3', action='store_true')
    g.add_argument('--ssd', help='use MobileNetV2-SSD (implies --tflite)', action='store_true')
    g.add_argument('--dummy', help='use dummy detector', action='store_true')

    g = parser.add_argument_group('yolo', 'YOLO options')
    g.add_argument('--gpu', help='use GPU', action='store_true')
    g.add_argument('--float16', help='use float16', action='store_true')
    g.add_argument('--yolo_cfg', help='YOLO configuration/weights to use', type=str,
                   default=str(d2k_path / 'darknet-files' / 'yolov3.cfg'))

    g = parser.add_argument_group('tflite', 'TF Lite options')
    g.add_argument('--tpu', help='use Coral Edge TPU', action='store_true')

    args = parser.parse_args()
    args.tflite = args.tflite or args.ssd
    args.yolo = args.yolo or not args.ssd

    if args.tflite:
        if args.gpu: parser.error('--gpu invalid with --tflite')
    else:
        if args.tpu: parser.error('--tpu only valid for --tflite')
    if args.float16 and args.ssd: parser.error('--float16 only valid for --yolo')

    return args
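
# Sketches of typical invocations, based on the flags defined above (the file name
# 'detect.py' is hypothetical -- use whatever this script is saved as):
#
#   python3 detect.py                           # watch camera_path using YOLOv3 on CPU
#   python3 detect.py --gpu --mark clip.mkv     # one file, YOLOv3 on GPU, boxes drawn
#   python3 detect.py --ssd --tpu --send        # MobileNetV2-SSD on a Coral Edge TPU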
# FIXME load only when needed; refactor into class
coco_class_names = (d2k_path / 'darknet-files' / 'coco.names').read_text().splitlines()

# Limit detections to classes we are interested in... this cuts down on false positives
interesting_classes = {coco_class_names.index(c) for c in ['person', 'bicycle', 'car', 'motorbike',
                                                           'bus', 'truck', 'cat', 'dog']}
def is_interesting(box):
    """Returns whether the given YOLO box has an interesting detection."""
    return any(box.classes[i] > 0 for i in interesting_classes)
class DividingLine:
    """Implements a linear detection boundary, scaled to the given image dimensions (w,h)."""

    def __init__(self, img_dim):
        line_points = np.array([[0, 320], [1920, 820]])  # (x,y) measured on image
        orig_img_size = np.array([1920, 1080])           # that image's (w, h)
        line_points = line_points / orig_img_size        # normalize to [0,1]

        # fit y = a*x + b through the two normalized points
        x = line_points[:,0]
        y = line_points[:,1]
        a = (y[1] - y[0]) / (x[1] - x[0])
        b = y[0] - a * x[0]

        # rescale the normalized line back to this image's pixel coordinates
        self.a = a * img_dim[1]/img_dim[0]
        self.b = b * img_dim[1]

    def is_above(self, box):
        """Returns whether the bottom center of the box lies below the line, i.e. on the
           line's "in" side (note image y starts at the top and grows downward)"""
        return (box.y + box.h/2) > (self.a * box.x + self.b)
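
# Worked example, assuming boxes arrive in frame pixel coordinates (as the detectors
# below produce them): for a 1920x1080 frame the boundary runs from (0,320) to
# (1920,820), i.e. y = (500/1920)*x + 320.  A box whose bottom center sits at
# (960, 700) gives 700 > 0.26*960 + 320 = 570, so is_above() returns True: since
# image y grows downward, the detection is visually below the line, on the "in" side.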
def bgr2rgb(image):
    """Converts an image from BGR to RGB"""
    return image[...,::-1]   # there's also cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

def image_dim(image):
    """Given a HxWxC sized image array, returns (W,H) as an array"""
    return np.array(image.shape[1::-1])
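
# e.g. a 1080p frame has shape (1080, 1920, 3), and shape[1::-1] picks out (W, H):
# image_dim(np.zeros((1080, 1920, 3))) returns array([1920, 1080])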
class TfEngine:
    """Engine wrapper for TensorFlow"""
    def __init__(self, model):
        self.model = model

    def input_dim(self):
        # the input shape is (batch, H, W, C); slice out (W, H)
        return self.model.layers[0].input_shape[0][2:0:-1]

    def predict(self, image):
        output = self.model.predict(np.expand_dims(image, axis=0))
        return [np.squeeze(o) for o in output]
class TfliteEngine:
    """Engine wrapper for TensorFlow Lite"""
    def __init__(self, model_path, use_tpu=False):
        if use_tpu:
            import tflite_runtime.interpreter as tflite
            import platform
            EDGETPU_SHARED_LIB = {
                'Linux': 'libedgetpu.so.1',
                'Darwin': 'libedgetpu.1.dylib'
            }[platform.system()]
            model_path = model_path.parent / (model_path.stem + '_edgetpu' + model_path.suffix)
            if not model_path.exists():
                raise FileNotFoundError("Model file " + str(model_path) + " doesn't exist")
            self.interpreter = tflite.Interpreter(str(model_path),
                                                  experimental_delegates=[
                                                      tflite.load_delegate(EDGETPU_SHARED_LIB, {})])
        else:
            import tensorflow.lite as tflite
            self.interpreter = tflite.Interpreter(str(model_path))

        self.interpreter.allocate_tensors()

    def input_dim(self):
        # the input shape is (batch, H, W, C); slice out (W, H)
        return self.interpreter.get_input_details()[0]['shape'][2:0:-1]

    def _copy_input(self, image):
        tensor = self.interpreter.tensor(self.interpreter.get_input_details()[0]['index'])()[0]
        tensor.fill(0)   # padding, in case image isn't as big as input
        tensor[:image.shape[0], :image.shape[1]] = image

    def predict(self, image):
        self._copy_input(image)
        self.interpreter.invoke()
        return [np.squeeze(self.interpreter.tensor(o['index'])())
                for o in self.interpreter.get_output_details()]
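
# Minimal usage sketch for the engine wrappers above -- both expose the same
# input_dim()/predict() interface consumed by the detectors below (the model path
# matches YoloDetector.model_file(); treat the values as illustrative):
#
#   engine = TfliteEngine(Path(__file__).parent / 'tflite' / 'yolov3.tflite')
#   w, h = engine.input_dim()         # network input size as (W, H)
#   outputs = engine.predict(image)   # image: HxWxC array, letterboxed to (w, h)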
class YoloDetector:
    """Detects objects using YOLOv3"""
    def __init__(self, engine):
        self.engine = engine
        self.net_dim = engine.input_dim()

    def detect(self, frame):
        """Detects objects on a CV2 frame, returning a list of d2k compatible boxes."""
        import d2k
        image = bgr2rgb(frame).astype(np.float32) / 255.0
        image = d2k.image.letterbox(image, *self.net_dim)
        output = self.engine.predict(image)
        return d2k.network.boxes_from_output(output, self.net_dim, image_dim(frame), thresh=.8)

    @staticmethod
    def model_file(path):
        return path / 'yolov3.tflite'
class MobilenetSsdDetector:
    """Detects objects using MobileNetv2-SSD"""
    def __init__(self, engine):
        self.engine = engine
        self.net_dim = engine.input_dim()

    def detect(self, frame):
        """Detects objects on a CV2 frame, returning a list of d2k compatible boxes."""
        img_dim = image_dim(frame)
        letterbox_scale = min(self.net_dim / img_dim)
        image = bgr2rgb(cv2.resize(frame, tuple((img_dim * letterbox_scale).astype(int))))
        ssd_boxes, class_ids, scores, count = self.engine.predict(image)

        # SSD boxes are (ymin,xmin,ymax,xmax), normalized to the net input; convert them
        # to center format and rescale back to frame pixels
        scale_adjust = self.net_dim / letterbox_scale

        def make_box(i):
            ymin, xmin, ymax, xmax = ssd_boxes[i]
            w, h = (xmax - xmin, ymax - ymin)
            x, y = (xmin + w/2, ymin + h/2)
            x, y = (x, y) * scale_adjust
            w, h = (w, h) * scale_adjust
            return d2k.box.Box(x, y, w, h, scores[i],
                               [scores[i] if class_ids[i] == j else 0. for j in range(len(coco_class_names))])

        return [make_box(i) for i in range(int(count)) if scores[i] >= .5]

    @staticmethod
    def model_file(path):
        return path / 'ssd_mobilenet_v2_coco_quant_postprocess.tflite'
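
# Scaling example (assuming the usual 300x300 input of this SSD model): with a
# 1920x1080 frame, letterbox_scale = min(300/1920, 300/1080) = 0.15625, the frame is
# resized to 300x168 and zero-padded to 300x300 by the engine, and scale_adjust comes
# out as 1920 on both axes.  An SSD box (ymin,xmin,ymax,xmax) = (0.05, 0.25, 0.15, 0.50)
# thus becomes center (x,y) = (720, 192) with size (w,h) = (480, 192) in frame pixels.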
class DummyDetector:
    """Dummy detector to facilitate measuring non-detection overhead"""
    def __init__(self):
        pass

    def detect(self, frame):
        return []
class FrameDetections:
    """Holds a frame and its detections."""
    def __init__(self, frame, boxes):
        self.frame = frame
        self.boxes = boxes
        self.area = sum(b.w * b.h for b in boxes)

    def classes(self):
        """Returns the set of classes in these detections."""
        return {coco_class_names[i] for b in self.boxes for i, c in enumerate(b.classes) if c > 0.}

    def __lt__(self, other):
        """Defines how we'd like our detections sorted"""
        assert isinstance(other, FrameDetections)
        return (len(self.boxes), self.area) < (len(other.boxes), other.area)

    def write_jpg(self, filename, x_dim, mark=False):
        """Writes these detections' frame as a JPG, resizing and optionally marking the boxes"""
        from PIL import Image
        image = Image.fromarray(bgr2rgb(self.frame))
        if mark:
            d2k.box.draw_boxes(image, self.boxes, names=coco_class_names)
        image = image.resize((x_dim, int(image.height/image.width*x_dim)))
        image.save(filename, "jpeg")
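
# Ordering example: comparing FrameDetections favors the frame with the most boxes,
# breaking ties by total box area -- a frame with two small boxes outranks one with a
# single large box.  processFile() below relies on this to choose each clip's
# "representative" image.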
class VideoReader:
    """Reads from a video file; The end of the video is indicated by a 'None' frame."""
    def __init__(self, file):
        self.cap = cv2.VideoCapture(str(file))
        if not self.cap.isOpened():
            raise FileNotFoundError
        self.pos = 0

    def __del__(self):
        self.cap.release()

    def read(self):
        """Reads a frame; returns None at the end of the file"""
        ret, frame = self.cap.read()
        if not ret: return None
        self.pos += 1
        return frame

    def position(self):
        return self.pos

    def get(self, propName):
        """Returns a property of the underlying video capture stream"""
        return self.cap.get(propName)
class ThreadedVideoReader(VideoReader):
    """Reads from a video file by wrapping an OpenCV VideoCapture object and reading
       from it in another thread.  The end of the video is indicated by a 'None' frame."""
    def __init__(self, file):
        import queue
        import threading
        super().__init__(file)
        self.q = queue.SimpleQueue()
        threading.Thread(target=self._fill, daemon=True).start()

    def _fill(self):
        """Internal function to fill the queue"""
        while True:
            ret, frame = self.cap.read()
            if not ret: break
            self.q.put(frame)
        self.q.put(None)   # end marker

    def read(self):
        """Reads a frame; returns None at the end of the file"""
        if self.q is None: return None   # already saw the end marker
        frame = self.q.get()
        if frame is None:
            self.q = None
        else:
            self.pos += 1
        return frame
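
# Reading-loop sketch (either reader works; ThreadedVideoReader just overlaps decoding
# with detection; the file name is hypothetical):
#
#   video = ThreadedVideoReader(Path('20201229_120000_clip.mkv'))
#   while (frame := video.read()) is not None:
#       pass   # process frame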
def processFile(detector, file):
    """Processes a video file, returning a tuple (classes, clips): the set of detected
       class names and a list of the generated (clip, image) file names"""
    print("processing", file)
    start_time = time.time()

    video = VideoReader(file) if args.nordthread else ThreadedVideoReader(file)
    video_fps = video.get(cv2.CAP_PROP_FPS)
    video_dims = (int(video.get(cv2.CAP_PROP_FRAME_WIDTH)), int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    clip_dims = (config['clip_width'], int(video_dims[1]/video_dims[0]*config['clip_width']))

    # search for detections every N frames
    search_frames = int(config['search_secs'] * video_fps)

    clips_list = []
    clip_prefix = '_'.join(str(file.name).split('_')[:2])
    all_detected_classes = set()

    # ring buffer of recent frames, so clips can start before the first detection
    memory_secs = 20
    assert memory_secs >= config['before_secs']
    assert memory_secs >= config['after_secs']
    MEM_SIZE = int(memory_secs*video_fps)
    memory = [None] * MEM_SIZE

    dividing_line = DividingLine(video_dims)   # below the line it's "in", otherwise "out"

    def detect(frame):
        boxes = d2k.box.nms_boxes(detector.detect(frame), iou_thresh=.5)
        for b in [b for b in boxes if not dividing_line.is_above(b)]:
            print({coco_class_names[i] for i, c in enumerate(b.classes) if c > 0.}, 'ignored')
        return FrameDetections(frame, [b for b in boxes if is_interesting(b) and dividing_line.is_above(b)])

    while True:
        frame = video.read()
        if frame is None: break
        if sys.stdout.isatty():
            print(f"{video.position()/total_frames*100:.1f}%", end='\r')
        memory[(video.position()-1) % MEM_SIZE] = frame

        if (video.position() % search_frames) == 0:
            detection = detect(frame)
            if not detection.boxes: continue

            detected_classes = detection.classes()
            top_detection = detection

            clip_name = Path(f"{clip_prefix}-{len(clips_list)+1}.{config['clip_format']}")
            print(f"writing {clip_name}", detected_classes)
            out = cv2.VideoWriter(str(clip_name), cv2.VideoWriter_fourcc(*config['clip_codec']),
                                  video_fps, clip_dims)
            for i in range(max(0, video.position() - int(config['before_secs'] * video_fps)), video.position()):
                out.write(cv2.resize(memory[i % MEM_SIZE], clip_dims))
            next_write = video.position()   # next frame to write, one after last detection

            while (video.position() - next_write) < MEM_SIZE:
                frame = video.read()
                if frame is None: break
                if sys.stdout.isatty():
                    print(f"{video.position()/total_frames*100:.1f}%", end='\r')
                memory[(video.position()-1) % MEM_SIZE] = frame

                if (video.position() % search_frames) == 0:
                    detection = detect(frame)
                    if detection.boxes:
                        detected_classes |= detection.classes()
                        if detection > top_detection:
                            top_detection = detection
                        for i in range(next_write, video.position()):
                            out.write(cv2.resize(memory[i % MEM_SIZE], clip_dims))
                        next_write = video.position()

            for i in range(next_write, min(next_write + int(config['after_secs'] * video_fps), video.position())):
                out.write(cv2.resize(memory[i % MEM_SIZE], clip_dims))
            out.release()

            new_clip_name = str(clip_name.parent / (clip_name.stem + '-' + '-'.join(sorted(detected_classes))
                                                    + clip_name.suffix))
            clip_name.rename(new_clip_name)
            all_detected_classes |= detected_classes

            image_name = clip_name.stem + ".jpg"
            top_detection.write_jpg(image_name, config['image_width'], mark=args.mark)
            clips_list.append((new_clip_name, image_name))

    end_time = time.time()
    print(f"elapsed: {end_time - start_time:.1f}s -- {total_frames/(end_time - start_time):.1f} FPS")
    return (all_detected_classes, clips_list)
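
# Clip timeline for processFile() above: frames are searched every search_secs; a clip
# opens before_secs ahead of the first hit, keeps extending while new hits arrive
# within the memory_secs window, and closes after_secs past the last hit:
#
#   frames: ......[before_secs][hit].....[hit].......[hit][after_secs]......
#                 '--------------------- clip ----------------------'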
def sendEmail(detections, clips):
    """Sends (submits) an email to notify of the detections"""
    import smtplib
    import email
    import email.encoders
    import email.mime.base
    import email.mime.text
    import email.mime.multipart
    import json

    mime_type = {'.mp4': ('video', 'mp4'),
                 '.avi': ('video', 'x-msvideo'),   # or x-motion-jpeg
                 '.mkv': ('video', 'x-matroska'),
                 '.jpg': ('image', 'jpeg'),
                 '.png': ('image', 'png')
    }

    credentials = json.loads(config['smtp_credentials'].read_text())
    msg_from = credentials['user']   # assumes 'user' is a valid email address!

    msg = email.mime.multipart.MIMEMultipart()
    msg['To'] = credentials['email_to']
    msg['From'] = msg_from
    msg['Subject'] = config['email_subject']

    html = ['<!doctype html><html><body>']
    html.append(', '.join(sorted(detections)) + ' detected')
    html.append('<br/>')
    for video, image in clips:
        video_id = video + '.' + msg_from
        image_id = image + '.' + msg_from
        video_type = '/'.join(mime_type[Path(video).suffix])
        html.append(f'<video controls poster="cid:{image_id}">')
        html.append(f'<source src="cid:{video_id}" type="{video_type}">')
        html.append(f'<img src="cid:{image_id}"/>')
        html.append('</video>')
    html.append('</body></html>')
    msg.attach(email.mime.text.MIMEText(''.join(html), 'html'))

    for att in [Path(file) for iter_ in clips for file in iter_]:
        part = email.mime.base.MIMEBase(*mime_type[att.suffix])
        part.add_header('Content-ID', f'<{att}.{msg_from}>')
        part.add_header('Content-Disposition', 'inline', filename=att.name)
        part.set_payload(att.read_bytes())
        email.encoders.encode_base64(part)
        msg.attach(part)

    server = smtplib.SMTP_SSL(credentials['server'])
    try:
        # server.set_debuglevel(1)
        server.login(credentials['user'], credentials['password'])
        server.sendmail(msg_from, msg['To'], msg.as_string())
    except Exception as e:
        print(e)
    finally:
        server.quit()
class FileLister:
    """Looks for files to process saved by an Axis Q6035-E camera"""
    def __init__(self, camera_path, nodb=False):
        self._db = None
        self._path = camera_path
        self._nodb = nodb

    def files_since(self, timestamp):
        if self._nodb:
            files = sorted(self._path.rglob('*.mkv'))[:-1]   # the last one may not be ready yet
            files = [f for f in files if f.name[:len(timestamp)] > timestamp]
            if len(files) > 1:
                print(len(files), "from glob")
            return files

        import sqlite3
        if not self._db:
            self._db = sqlite3.connect('file:' + str(self._path / 'index.db') + '?mode=ro', uri=True)
        try:
            cursor = self._db.cursor()
            cursor.execute(
                'select r.path, r.filename, b.path, b.filename from blocks as b' +
                ' join recordings as r on recording_id=r.id' +
                f' where substr(b.filename,1,{len(timestamp)}) > ?' +
                ' and b.stoptime not null', (timestamp,))
            files = cursor.fetchall()
            files = [self._path / ('/'.join(row) + '.mkv') for row in files]
            if len(files) > 1: print(len(files), "from db")
            return files
        except sqlite3.Error as e:
            print("sqlite3 error:", e)
            self._db.close(); self._db = None
            return []

    def wait_for_more(self):
        if self._nodb:
            time.sleep(60)
        else:
            time.sleep(3)   # if I only had an update hook...
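
# Note: the index.db query above assumes the Axis camera's schema as observed here --
# 'recordings' and 'blocks' tables whose path/filename columns compose the on-disk
# '.mkv' path, with block filenames starting with a 'YYYYMMDD_HHMMSS' timestamp (cf.
# the '20200101_000000' checkpoint default below).  Inferred from the camera's files,
# not from published documentation.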
if __name__ == '__main__':
    args = parse_args()

    if args.dummy:
        detector = DummyDetector()
    elif args.tflite:
        tfl_path = Path(__file__).parent / 'tflite'
        if args.ssd:
            detector = MobilenetSsdDetector(TfliteEngine(MobilenetSsdDetector.model_file(tfl_path),
                                                         use_tpu=args.tpu))
        else:
            detector = YoloDetector(TfliteEngine(YoloDetector.model_file(tfl_path), use_tpu=args.tpu))
    else:
        import tensorflow as tf
        if not args.gpu:
            tf.config.experimental.set_visible_devices([], 'GPU')
        else:
            gpu = tf.config.experimental.list_physical_devices('GPU')
            if tf.version.VERSION[:4] == '2.2.':
                tf.config.experimental.set_memory_growth(gpu[0], True)   # works around bug in TF 2.2
        if args.float16:
            tf.keras.backend.set_floatx('float16')

        import d2k
        yolo_cfg = Path(args.yolo_cfg)
        network = d2k.network.load(yolo_cfg.read_text())
        network.read_darknet_weights((yolo_cfg.parent / (yolo_cfg.stem + '.weights')).read_bytes())
        detector = YoloDetector(TfEngine(network.make_model()))

    if len(args.input_files) > 0:
        # process individual files, mostly for manual testing
        for f in args.input_files:
            detections, clips = processFile(detector, Path(f))
            if args.send and len(detections) > 0:
                sendEmail(detections, clips)
    else:
        # keep processing files as they become available
        fl = FileLister(config['camera_path'], nodb=args.nodb)
        checkpoint = Path('./checkpoint')
        while True:
            last_timestamp = checkpoint.read_text() if checkpoint.exists() else '20200101_000000'
            for f in fl.files_since(last_timestamp):
                detections, clips = processFile(detector, f)
                if len(detections) > 0:
                    sendEmail(detections, clips)
                checkpoint.write_text(f.name[:len(last_timestamp)])
            if sys.stdout.isatty():
                import datetime
                print(datetime.datetime.now().strftime("%H:%M:%S..."), end='\r')
            else:
                sys.stdout.flush()
            fl.wait_for_more()