Last active
October 27, 2020 23:20
-
-
Save hshimamoto/6d1a607c1025b7f1d9a4e3a241faa909 to your computer and use it in GitHub Desktop.
Python OpenCV Multiple process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# vim: set sw=2 sts=2 expandtab:
# ref https://qiita.com/kakinaguru_zo/items/eda129635816ad871e9d
import cv2 | |
import sys | |
import time | |
import numpy | |
import socket | |
import multiprocessing | |
import multiprocessing.sharedctypes | |
# Command line: <cam> [<socket path> ...]
#   cam          - capture source handed to cv2.VideoCapture
#   socket paths - UNIX domain sockets of the processing servers that should
#                  receive every captured frame
if len(sys.argv) == 1:
  print("cam opts...")
  sys.exit()
cam = sys.argv[1]
sock = None
# Connect to every server given on the command line. Servers that cannot be
# reached are reported and skipped so the viewer still works without them.
socks = []
for path in sys.argv[2:]:
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  try:
    sock.connect(path)
  except OSError:
    # Only connection errors are expected here; a bare except would also
    # swallow KeyboardInterrupt and SystemExit.
    print("connect failed: " + path)
    sock.close()  # do not leak the descriptor of the failed socket
  else:
    socks.append(sock)
def capture_process(src, buf, size, req, ready, quit):
  """Read frames from *src* and publish one into *buf* whenever requested.

  Intended to run as the target of a multiprocessing.Process.

  Args:
    src: capture source passed to cv2.VideoCapture.
    buf: shared byte buffer that receives the resized frame.
    size: (width, height) the frame is resized to before it is copied.
    req: event set by the consumer to ask for a fresh frame; cleared here
      once the frame has been copied.
    ready: event set here when *buf* holds a complete frame.
    quit: event set by the consumer to stop the capture loop.

  Raises:
    IOError: when a frame cannot be read from the source.
  """
  cap = cv2.VideoCapture(src)
  try:
    fps = cap.get(cv2.CAP_PROP_FPS)
    # Sources that do not know their frame rate report 0, which used to
    # crash with ZeroDivisionError. Fall back to a 30 FPS pacing instead.
    delay = 1 / fps if fps > 0 else 1 / 30
    while not quit.is_set():
      cap_start = time.time()
      ret, frame = cap.read()
      if not ret:
        raise IOError("failed to read a frame from {}".format(src))
      if req.is_set():
        ready.clear()
        frame = cv2.resize(frame, size)
        # Raw byte copy of the frame into the shared buffer.
        memoryview(buf).cast('B')[:] = memoryview(frame).cast('B')[:]
        ready.set()
        req.clear()
      # Pace the loop to the source frame rate.
      while True:
        elapsed = time.time() - cap_start
        if elapsed >= delay:
          break
        time.sleep(delay - elapsed)
  except KeyboardInterrupt:
    pass
  finally:
    # Release the device on every exit path, including read failures.
    cap.release()
  print("done")
if __name__ == "__main__":
  # Viewer / dispatcher: pulls frames from the capture process through a
  # shared buffer, shows them with an FPS overlay and forwards the raw frame
  # to every connected processing server.
  width = 640
  height = 360
  buf = multiprocessing.sharedctypes.RawArray('B', width * height * 3)
  req = multiprocessing.Event()
  ready = multiprocessing.Event()
  quit = multiprocessing.Event()
  cap_proc = multiprocessing.Process(
      target=capture_process,
      args=(cam, buf, (width, height), req, ready, quit))
  cap_proc.start()
  image = numpy.empty((height, width, 3), dtype=numpy.uint8)
  lasttime = time.time()
  try:
    while True:
      # Request a copy. `ready` must be cleared first: it stays set from the
      # previous frame, so waiting on it would otherwise return immediately
      # and the buffer could be read while it is being rewritten.
      ready.clear()
      req.set()
      # Wait for the frame, but notice when the capture process has died
      # instead of blocking forever.
      while not ready.wait(0.5) and cap_proc.is_alive():
        pass
      if not ready.is_set():
        print("capture process exited")
        break
      image[:,:,:] = numpy.reshape(buf, (height, width, 3))
      curr = time.time()
      # show fps (guard against a zero interval on coarse clocks)
      elapsed = max(curr - lasttime, 1e-6)
      text = "{:.3f}FPS / {:.3f}ms".format(1.0/elapsed, elapsed*1000)
      cv2.putText(image, text,
          (16, 16), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1, cv2.LINE_AA)
      lasttime = curr
      cv2.imshow("test", image)
      # Forward the raw frame (without the overlay). sendall is required:
      # send may transmit only part of a frame this large.
      alive = []
      for s in socks:
        try:
          s.sendall(buf)
        except OSError:
          s.close()
        else:
          alive.append(s)
      socks[:] = alive
      if (cv2.waitKey(1) & 0xff) == 27:  # ESC
        break
      # Wait for the one-byte acknowledgement of each server. Survivors are
      # collected in a new list because removing from a list while
      # iterating over it skips elements.
      alive = []
      for s in socks:
        try:
          r = s.recv(1)
        except OSError:
          r = b""
        if r:
          alive.append(s)
        else:
          s.close()
      socks[:] = alive
  except KeyboardInterrupt:
    pass
  quit.set()
  # destroyAllWindows is safe even when no window was ever created.
  cv2.destroyAllWindows()
  for s in socks:
    s.close()
  cap_proc.join(1)
  if cap_proc.is_alive():
    # The capture loop may be blocked inside a read; do not leave it behind.
    cap_proc.terminate()
    cap_proc.join()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# vim: set sw=2 sts=2 expandtab:
# ref. https://towardsdatascience.com/face-detection-in-2-minutes-using-opencv-python-90f89d7c0f81
# curl -vLO https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_default.xml
import cv2 | |
import sys | |
import os | |
import numpy | |
import socket | |
import time | |
# Usage guard: the path of the UNIX socket to listen on is mandatory.
if len(sys.argv) == 1:
  print("face sock")
  sys.exit()
class Server:
  """Face-detection worker fed with raw frames over a UNIX stream socket.

  A client sends 640x360 frames of 3 bytes per pixel back to back. For each
  frame the server runs a Haar cascade, displays the annotated frame and
  answers with a single byte so the client can send the next frame.
  """

  def __init__(self, path):
    """Remember *path*, the filesystem path of the socket to listen on."""
    self._path = path

  def start(self):
    """Bind the socket and serve clients one after another until interrupted.

    The socket file is removed and the listening socket closed on exit.
    """
    s = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      s.bind(self._path)
    except OSError:
      # Nothing was created by this process, so only close the socket.
      s.close()
      raise
    try:
      s.listen(1)
      print("start listening")
      while True:
        conn, addr = s.accept()
        try:
          self.handle(conn)
        except OSError as err:
          # A client that goes away must not take the server down; go back
          # to accept() so the next client can connect.
          print("client disconnected: {}".format(err))
        finally:
          conn.close()
    finally:
      s.close()
      os.remove(self._path)

  @staticmethod
  def _recv_frame(conn, view):
    """Fill the writable 1-D memoryview *view* completely from *conn*.

    Raises:
      IOError: when the peer closes the connection before *view* is full.
    """
    got = 0
    total = len(view)
    while got < total:
      n = conn.recv_into(view[got:])
      if n == 0:
        raise IOError("connection closed by peer")
      got += n

  def handle(self, conn):
    """Process frames from *conn* until the connection breaks.

    Raises:
      IOError: when the client disconnects.
    """
    # face detection
    face_cascade = cv2.CascadeClassifier("haarcascade_frontalface_default.xml")
    width = 640
    height = 360
    scale = 2  # detection runs on a frame downscaled by this factor
    image = numpy.empty((height, width, 3), dtype=numpy.uint8)
    # Receive straight into the image buffer instead of concatenating bytes
    # objects and copying the result.
    pixels = memoryview(image).cast('B')
    lasttime = time.time()
    while True:
      self._recv_frame(conn, pixels)
      # gray for face detection
      gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
      gray = cv2.resize(gray, (width//scale, height//scale))
      faces = face_cascade.detectMultiScale(gray, 1.1, 4)
      for (x, y, w, h) in faces:
        # Map back to full-frame coordinates as plain ints for OpenCV.
        x, y, w, h = (int(v) * scale for v in (x, y, w, h))
        print(x, y, w, h)
        cv2.rectangle(image, (x, y), (x+w, y+h), (255, 0, 0), 2)
      curr = time.time()
      # Acknowledge the frame so the client sends the next one.
      conn.send(b" ")
      # show fps (guard against a zero interval on coarse clocks)
      elapsed = max(curr - lasttime, 1e-6)
      s = "{:.3f}FPS / {:.3f}ms".format(1.0/elapsed, elapsed*1000)
      cv2.putText(image, s,
          (16, 16), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1, cv2.LINE_AA)
      lasttime = curr
      cv2.imshow("face", image)
      cv2.waitKey(1)
if __name__ == "__main__":
  # The first argument is the path of the UNIX socket to listen on.
  socket_path = sys.argv[1]
  serv = Server(socket_path)
  serv.start()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# vim: set sw=2 sts=2 expandtab:
# https://storage.googleapis.com/download.tensorflow.org/models/tflite/coco_ssd_mobilenet_v1_1.0_quant_2018_06_29.zip
import cv2 | |
import sys | |
import os | |
import numpy | |
import socket | |
import time | |
import tensorflow as tf | |
# Usage guard: the path of the UNIX socket to listen on is mandatory.
if len(sys.argv) == 1:
  print("obj sock")
  sys.exit()
class Server:
  """Object-detection worker fed with raw frames over a UNIX stream socket.

  A client sends 640x360 frames of 3 bytes per pixel back to back. For each
  frame the server runs the TFLite model in "detect.tflite", displays the
  annotated frame and answers with a single byte so the client can send the
  next frame.
  """

  def __init__(self, path):
    """Load labels and model; *path* is the socket path to listen on."""
    self._path = path
    # load labels
    with open("labelmap.txt") as f:
      f.readline() # skip first
      self._labels = [line.strip() for line in f]
    # load and create interpreter
    self._interp = tf.lite.Interpreter(model_path="detect.tflite")
    self._input_details = self._interp.get_input_details()
    self._output_details = self._interp.get_output_details()
    self._interp.allocate_tensors()
    # Float models get normalised input; others are fed uint8 directly.
    self._fmodel = (self._input_details[0]['dtype'] == numpy.float32)
    self._in_h = self._input_details[0]['shape'][1]
    self._in_w = self._input_details[0]['shape'][2]
    print(self._in_h, self._in_w)

  def start(self):
    """Bind the socket and serve clients one after another until interrupted.

    The socket file is removed and the listening socket closed on exit.
    """
    s = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      s.bind(self._path)
    except OSError:
      # Nothing was created by this process, so only close the socket.
      s.close()
      raise
    try:
      s.listen(1)
      print("start listening")
      while True:
        conn, addr = s.accept()
        try:
          self.handle(conn)
        except OSError as err:
          # A client that goes away must not take the server down; go back
          # to accept() so the next client can connect.
          print("client disconnected: {}".format(err))
        finally:
          conn.close()
    finally:
      s.close()
      os.remove(self._path)

  @staticmethod
  def _recv_frame(conn, view):
    """Fill the writable 1-D memoryview *view* completely from *conn*.

    Raises:
      IOError: when the peer closes the connection before *view* is full.
    """
    got = 0
    total = len(view)
    while got < total:
      n = conn.recv_into(view[got:])
      if n == 0:
        raise IOError("connection closed by peer")
      got += n

  def handle(self, conn):
    """Process frames from *conn* until the connection breaks.

    Raises:
      IOError: when the client disconnects.
    """
    width = 640
    height = 360
    image = numpy.empty((height, width, 3), dtype=numpy.uint8)
    # Receive straight into the image buffer instead of concatenating bytes
    # objects and copying the result.
    pixels = memoryview(image).cast('B')
    lasttime = time.time()
    while True:
      self._recv_frame(conn, pixels)
      # resize to the input size of the model
      # NOTE(review): the frame is passed on in the channel order it was
      # received in; confirm that this matches what the model expects.
      resized = cv2.resize(image, (self._in_w, self._in_h))
      input_data = numpy.expand_dims(resized, axis=0)
      if self._fmodel:
        input_data = (numpy.float32(input_data) - 127.5) / 127.5
      self._interp.set_tensor(self._input_details[0]['index'], input_data)
      self._interp.invoke()
      boxes = self._interp.get_tensor(self._output_details[0]['index'])[0]
      classes = self._interp.get_tensor(self._output_details[1]['index'])[0]
      scores = self._interp.get_tensor(self._output_details[2]['index'])[0]
      for box, cls, score in zip(boxes, classes, scores):
        if score < 0.7:
          continue
        idx = int(cls)
        # Fall back to the numeric id when the label map is too short.
        label = self._labels[idx] if 0 <= idx < len(self._labels) else str(idx)
        # Box values are scaled by the frame size and clamped to the frame.
        y0 = int(max(1, box[0] * height))
        x0 = int(max(1, box[1] * width))
        y1 = int(min(height, box[2] * height))
        x1 = int(min(width, box[3] * width))
        print("label {} {} ({}, {})-({}, {})".format(label, score, x0, y0, x1, y1))
        cv2.rectangle(image, (x0, y0), (x1, y1), (255, 0, 0), 2)
      curr = time.time()
      # Acknowledge the frame so the client sends the next one.
      conn.send(b" ")
      # show fps (guard against a zero interval on coarse clocks)
      elapsed = max(curr - lasttime, 1e-6)
      s = "{:.3f}FPS / {:.3f}ms".format(1.0/elapsed, elapsed*1000)
      cv2.putText(image, s,
          (16, 16), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1, cv2.LINE_AA)
      lasttime = curr
      cv2.imshow("obj", image)
      cv2.waitKey(1)
if __name__ == "__main__":
  # The first argument is the path of the UNIX socket to listen on.
  socket_path = sys.argv[1]
  serv = Server(socket_path)
  serv.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment