Last active
August 14, 2019 17:36
-
-
Save crackwitz/5f48cd37d36d26e42c595383757ec224 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import time | |
import numpy as np | |
import cv2 as cv | |
import threading | |
# also acts (partly) like a cv.VideoCapture | |
class FreshestFrame(threading.Thread): | |
def __init__(self, capture, name='FreshestFrame'): | |
self.capture = capture | |
assert self.capture.isOpened() | |
# this lets the read() method block until there's a new frame | |
self.cond = threading.Condition() | |
# this allows us to stop the thread gracefully | |
self.running = False | |
# keeping the newest frame around | |
self.frame = None | |
# passing a sequence number allows read() to NOT block | |
# if the currently available one is exactly the one you ask for | |
self.latestnum = 0 | |
# this is just for demo purposes | |
self.callback = None | |
super().__init__(name=name) | |
self.start() | |
def start(self): | |
self.running = True | |
super().start() | |
def release(self, timeout=None): | |
self.running = False | |
self.join(timeout=timeout) | |
self.capture.release() | |
def run(self): | |
counter = 0 | |
while self.running: | |
# block for fresh frame | |
(rv, img) = self.capture.read() | |
assert rv | |
counter += 1 | |
# publish the frame | |
with self.cond: # lock the condition for this operation | |
self.frame = img if rv else None | |
self.latestnum = counter | |
self.cond.notify_all() | |
if self.callback: | |
self.callback(img) | |
def read(self, wait=True, seqnumber=None, timeout=None): | |
# with no arguments (wait=True), it always blocks for a fresh frame | |
# with wait=False it returns the current frame immediately (polling) | |
# with a seqnumber, it blocks until that frame is available (if it even needs to wait) | |
# with timeout argument, may return an earlier frame; | |
# may even be (0,None) if nothing received yet | |
with self.cond: | |
if wait: | |
if seqnumber is None: | |
seqnumber = self.latestnum+1 | |
if seqnumber < 1: | |
seqnumber = 1 | |
rv = self.cond.wait_for(lambda: self.latestnum >= seqnumber, timeout=timeout) | |
if not rv: | |
return (self.latestnum, self.frame) | |
return (self.latestnum, self.frame) | |
class Timecode(threading.Thread): | |
def __init__(self, name='Timecode', delay=10): | |
self.delay = delay | |
self.timecode = None | |
self.shape = (10,10) # 64 bits, 8x8 + white border | |
# TODO: orientation marker? upper 8 bits blank? | |
super().__init__(name=name) | |
self.name = name | |
self.start() | |
def _model(self): | |
tc_model = np.float32([ | |
[0,0], # top left | |
[0,1], # bottom left, appears to be the order of findContours | |
[1,1], | |
[1,0], | |
]) | |
tc_model *= self.shape | |
return tc_model | |
def _pattern(self): | |
# black quiet zone | |
# white rectangle | |
# inner part | |
# .... | |
# .. | |
nbits = 64 | |
nrows = 8 | |
ncols = nbits // nrows | |
timestamp = int(time.time() * 1e3) | |
code = timestamp | |
code = linear2gray(timestamp) | |
code_bits = tobits(code, nbits=nbits) | |
code_bits = np.uint8(code_bits) * 255 | |
code_bits.shape = (nrows, ncols) | |
im = np.zeros((4+nrows, 4 + nbits//nrows), dtype=np.uint8) | |
im[1:-1, 1:-1] = 255 # white rectangle | |
im[2:-2, 2:-2] = code_bits | |
return (timestamp, im) | |
def start(self): | |
self.running = True | |
super().start() | |
def release(self, timeout=None): | |
self.running = False | |
self.join(timeout=timeout) | |
def run(self): | |
cv.namedWindow(self.name, cv.WINDOW_NORMAL) | |
while self.running: | |
# show it now, so it's least jittery (synchronize to camera) | |
tc_value, tc_pattern = self._pattern() | |
self.timecode = tc_value | |
tc_resized = upscale(drawscale, tc_pattern) | |
cv.imshow(self.name, tc_resized) | |
key = cv.waitKey(self.delay) | |
if key == 27: | |
self.running = False | |
def linear2gray(num): | |
return num ^ (num >> 1) | |
def gray2linear(num, nbits=64): | |
while nbits >= 2: | |
nbits //= 2 | |
num ^= num >> nbits | |
return num | |
def tobits(num, nbits=64): | |
assert num >= 0 | |
assert num.bit_length() <= nbits | |
result = [(num >> k) & 1 for k in range(nbits)] | |
# little endian, index = position | |
return result | |
def frombits(bits): | |
return sum(bool(d) << p for p,d in enumerate(bits)) | |
def upscale(factor, im): | |
h,w = im.shape[:2] | |
return cv.resize(im, (w*factor, h*factor), interpolation=cv.INTER_NEAREST) | |
def simplify_contour(c): | |
length = cv.arcLength(c, True) | |
approx = cv.approxPolyDP(c, length * 0.01, True) | |
# length changed noticeably? | |
if length > 10 and abs(cv.arcLength(approx, True) / length - 1) > 0.1: | |
return None | |
return approx | |
def refine_corners(contour, image): | |
#return contour | |
contour = cv.cornerSubPix(image, | |
contour.astype(np.float32), | |
(5,5), | |
(1,1), | |
criteria=(cv.TERM_CRITERIA_COUNT*0 | cv.TERM_CRITERIA_EPS, 10, 0.1)) | |
return contour | |
def contour_sense(contour): | |
# sum angles. positive -> clockwise | |
# cross product of successive vectors | |
contour = contour.reshape((-1, 2)).astype(np.float32) | |
vectors = np.roll(contour, -1, axis=0) - contour | |
vectors /= np.linalg.norm(vectors, axis=1).reshape((-1, 1)) | |
crossed = np.arcsin(np.cross(vectors, np.roll(vectors, -1, axis=0))) | |
return crossed.sum() | |
def find_quads(monochrome, mask, minarea=100**2): | |
(contours, _) = cv.findContours(mask, cv.RETR_LIST, cv.CHAIN_APPROX_SIMPLE) | |
contours = [(c,simplify_contour(c)) for c in contours] | |
contours = [(c,s) for c,s in contours if s is not None and len(s) == 4] | |
contours = [(c,s) for c,s in contours if contour_sense(s) < -np.pi] | |
contours = [(c,refine_corners(s, monochrome)) for c,s in contours] | |
contours = [(c,s) for c,s in contours if cv.contourArea(s) >= minarea] | |
contours.sort(key=lambda c: -cv.contourArea(c[1])) | |
return contours | |
largest = max(contours, key=lambda c: cv.contourArea(c)) | |
return largest | |
def rotate_topleft(contour): | |
distances = np.linalg.norm(contour, axis=(1,2)) | |
shift = np.argmin(distances) | |
return np.vstack([ | |
contour[shift:], | |
contour[:shift] | |
]) | |
def fixn(value, shift): | |
factor = 1<<shift | |
if isinstance(value, (tuple, list)): | |
return type(value)(int(round(v * factor)) for v in value) | |
elif isinstance(value, np.ndarray): | |
return tuple(np.round(value * factor).astype(np.int)) | |
elif isinstance(value, (int, float)): | |
return int(round(value * factor)) | |
ft2 = cv.freetype.createFreeType2() | |
ft2.loadFontData("C:\\Windows\\Fonts\\times.ttf", 0) | |
#ft2.loadFontData("C:\\Windows\\Fonts\\consola.ttf", 0) | |
def centeredText(im, text, origin, fontScale, color, thickness, background=None, *args, **kwargs): | |
fontFace = cv.FONT_HERSHEY_SIMPLEX | |
((w,h), baseLine) = cv.getTextSize(text, fontFace, fontScale, thickness) | |
ox,oy = origin | |
if background is not None: | |
cv.rectangle(im, | |
fixn((ox - w/2 - 10, oy - h/2 - 10, w+20, h+20), 4), | |
color=background, | |
thickness=cv.FILLED, lineType=cv.LINE_AA, shift=4) | |
cv.putText(im, | |
text, | |
fixn((ox - w/2, oy + h/2), 0), | |
fontFace, fontScale, color, thickness) | |
#ft2.putText(im, | |
# text, | |
# (ox - w//2, oy + h//2), | |
# fontHeight=fontHeight, | |
# color=color, | |
# thickness=-1, | |
# line_type=cv.LINE_AA, | |
# bottomLeftOrigin=False) | |
pass | |
drawscale = 32 | |
timecode = Timecode() | |
tc_model = timecode._model() | |
tch,tcw = timecode.shape | |
cap = cv.VideoCapture(int(sys.argv[1]) if len(sys.argv) >= 2 else 0) | |
cap.set(cv.CAP_PROP_FPS, 30) | |
capw = int(cap.get(cv.CAP_PROP_FRAME_WIDTH)) | |
caph = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT)) | |
cap = FreshestFrame(cap) | |
cv.namedWindow("camera", cv.WINDOW_NORMAL) # resizable | |
cv.resizeWindow("camera", capw, caph) | |
maxwidgets = 2 | |
alpha = 0.02 | |
meanval = [0.0] * maxwidgets | |
meanerr = [0.0] * maxwidgets | |
while True: | |
if not timecode.running: break | |
if not cap.running: break | |
rv,im = cap.read() | |
frameh,framew = im.shape[:2] | |
now = time.time() | |
if not rv: break | |
# image analysis... | |
monochrome = cv.cvtColor(im, cv.COLOR_BGR2GRAY) | |
th_effective, mask = cv.threshold(monochrome, 0, 255, cv.THRESH_BINARY | cv.THRESH_OTSU) | |
cv.imshow("thresholded", mask) | |
quads = find_quads(monochrome, mask)[:maxwidgets] | |
print(f"{len(quads)} quads") | |
for index,(rawquad,quad) in enumerate(quads): | |
quad = rotate_topleft(quad) | |
quadf = np.float32(quad) | |
quadi = np.int0(quad) | |
for p,q in zip(quad, np.roll(quad, -1, axis=0)): | |
cv.line(im, | |
fixn(p[0], 4), fixn(q[0], 4), | |
color=(255,0,255), | |
thickness=2, | |
lineType=cv.LINE_AA, | |
shift=4 | |
) | |
#cv.drawContours(im, [quadi], -1, (255,0,255), 1, lineType=cv.LINE_AA) | |
#cv.drawContours(im, [quad], -1, (255,0,255), 2) | |
cv.circle(im, fixn(quadf[0,0], 4), fixn(10, 4), (255, 255, 0), thickness=2, lineType=cv.LINE_AA, shift=4) | |
cv.circle(im, fixn(quadf[1,0], 4), fixn(10, 4), (0, 255, 255), thickness=2, lineType=cv.LINE_AA, shift=4) | |
H = cv.getPerspectiveTransform(quadf, tc_model * drawscale) | |
#print(H) | |
if False: | |
straight = cv.warpPerspective(mask, H, (tcw*drawscale, tch*drawscale)) | |
cv.imshow(f"straight {index}", straight) | |
#cv.perspectiveTransform | |
rx = np.arange(tcw) + 0.5 | |
ry = np.arange(tch) + 0.5 | |
(mgx, mgy) = np.meshgrid(rx * drawscale, ry * drawscale) | |
coords = np.dstack([mgx, mgy]) | |
Hinv = np.linalg.inv(H) | |
sample_coords = cv.perspectiveTransform(coords.reshape((-1, 1, 2)).copy(), Hinv).astype(np.float32) | |
sample_coords.shape = (-1, 2) | |
for coord in sample_coords: | |
cv.circle(im, fixn(coord, 4), fixn(2, 4), (0,0,255), thickness=1, lineType=cv.LINE_AA, shift=4) | |
sx = sample_coords[:,0].astype(np.int) | |
sy = sample_coords[:,1].astype(np.int) | |
if (0 <= sx).all() and (sx < framew).all() and (0 <= sy).all() and (sy < frameh).all(): | |
sampled = (mask[sy,sx] > 0) | |
sampled.shape = (tch, tcw) | |
bits = sampled[1:-1, 1:-1].flatten() | |
codevalue = frombits(bits) | |
codevalue = gray2linear(codevalue) | |
timestamp = codevalue * 1e-3 | |
delta = timestamp - now | |
err = abs(delta - meanval[index]) | |
meanval[index] += (delta - meanval[index]) * alpha | |
meanerr[index] += (err - meanerr[index]) * alpha | |
if err > 1.0: | |
meanval[index] = meanerr[index] = 0 | |
print(f"#{index}: {timestamp:.3f} s, delta {delta:+.3f} s") | |
print(f"mean {meanval[index]:.3f}, mean err {meanerr[index]:.3f}") | |
centeredText(im, | |
f"{meanval[index]:.2f}s", | |
quad.mean(axis=(0,1)), | |
1.5, (255,255,255), 3, background=(0, 201, 106)) | |
print() | |
cv.imshow("camera", im) | |
k = cv.waitKey(1) | |
if k == 27: | |
break | |
timecode.release() | |
cap.release() | |
cv.destroyAllWindows() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment