Created
September 19, 2023 18:33
-
-
Save pietrocolombo/0769bdc3cc4e63686461c5ca052d460e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pip install opencv-python | |
import cv2 | |
import queue | |
import threading | |
import time | |
class Video(threading.Thread): | |
def __init__(self, camera, interval, path) -> None: | |
threading.Thread.__init__(self) | |
self._cam = cv2.VideoCapture(camera) | |
self.frame_width = int(self._cam.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
self.frame_height = int(self._cam.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
self.size = (self.frame_width, self.frame_height) | |
# set camera resolution | |
self._queue_frame = queue.Queue() | |
self._queue_timestamp = queue.Queue() | |
self._interval = interval | |
self.shutdown_flag = threading.Event() | |
self._path = path | |
def run(self): | |
timestamp = time.time() | |
while not self.shutdown_flag.is_set(): | |
ret, frame = self._cam.read() | |
timestamp = time.time() | |
if ret: | |
if self._queue_frame.empty(): | |
self._queue_frame.put(frame) | |
self._queue_timestamp.put(timestamp) | |
elif timestamp - self._queue_timestamp.queue[0] > self._interval: | |
# remove frame | |
self._queue_frame.get() | |
self._queue_timestamp.get() | |
# put frame | |
self._queue_frame.put(frame) | |
self._queue_timestamp.put(timestamp) | |
else: | |
# put frame | |
self._queue_frame.put(frame) | |
self._queue_timestamp.put(time.time()) | |
self._cam.release() | |
self.fps = int(self._queue_frame.qsize() / (timestamp - self._queue_timestamp.queue[0])) | |
print(self.fps) | |
print(self._queue_frame.qsize()) | |
self.save_video() | |
def save_video(self): | |
file = cv2.VideoWriter(self._path, cv2.VideoWriter_fourcc(*"mp4v"), self.fps, self.size) | |
while not self._queue_frame.empty(): | |
frame = self._queue_frame.get() | |
file.write(frame) | |
file.release() | |
if __name__ == '__main__': | |
print("Main") | |
video = Video(camera = 0, interval = 30, path = "./test.mp4") | |
print("Start") | |
video.start() | |
print("Started") | |
time.sleep(60) | |
print("After 60 second") | |
video.shutdown_flag.set() | |
print("shutdown flag") | |
video.join() | |
print("After join") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment