Last active
December 9, 2016 16:42
-
-
Save SamCarlberg/91db7fc8a15377ca2a69553aae69e010 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
from vision_api import * | |
class StubPipeline(VisionPipeline): | |
def __init__(self): | |
self.output = None | |
def process(self, image: np.ndarray) -> None: | |
self.output = image # Sets the 'output' | |
class MyListener(PipelineListener): | |
def on_pipeline_ran(pipeline: StubPipeline) -> None: | |
print(pipeline.output) # Do something with the 'output' | |
if __name__ == '__main__' | |
# Set up OpenCV capture and the runner | |
cap = cv2.VideoCapture(0) | |
pipeline = StubPipeline() | |
listener = MyListener() | |
runner = VisionRunner(cap, pipeline, listener) | |
thread = VisionThread(runner) | |
thread.start() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from threading import Thread | |
class VisionPipeline: | |
""" | |
A vision pipeline runs a series of OpenCV algorithms to extract data from an image. | |
This is intended to be subclassed; calling process() directly will throw an error | |
""" | |
def process(self, image: np.ndarray) -> None: | |
""" | |
Processes an image and extracts data from it | |
:param image: the image to process | |
:return: None | |
""" | |
raise NotImplementedError | |
class PipelineListener: | |
""" | |
A callback type that runs after a pipeline runs. This can be used to do additional | |
image processing if the pipeline doesn't do everything (e.g. GRIP-generated code). | |
This is intended to be subclassed; calling on_pipeline_ran directly will throw an error. | |
""" | |
def on_pipeline_ran(self, pipeline: VisionPipeline) -> None: | |
""" | |
Called when a pipeline finishes running | |
:param pipeline: the pipeline that ran | |
:return: None | |
""" | |
raise NotImplementedError | |
class VisionRunner: | |
""" | |
A vision runner is a convenient wrapper object to make it easy to run an OpenCV pipeline | |
from user code. The easiest way to use it is to run it in a VisionThread and use the listener | |
to take snapshots of the pipelines outputs. | |
""" | |
def __init__(self, cap, pipeline: VisionPipeline, listener: PipelineListener): | |
""" | |
:param cap: the OpenCV video capture to use to grab images from e.g. cv2.VideoCapture(...) | |
:param pipeline: the pipeline to run | |
:param listener: the listener to call after the pipeline runs | |
""" | |
self.__cap = cap | |
self.__pipeline = pipeline | |
self.__listener = listener | |
def run_once(self) -> None: | |
""" | |
Runs the pipeline once, or not at all if the OpenCV video capture didn't get an image. | |
:return: None | |
""" | |
got_frame, frame = self.__cap.read() | |
if got_frame: | |
self.__pipeline.process(frame) | |
self.__listener.on_pipeline_ran(self.__pipeline) | |
def run_forever(self) -> None: | |
""" | |
Continuously runs the pipeline by processing the most recent images from the OpenCV video capture. | |
:return: None (does not return unless there was an error) | |
""" | |
while True: | |
self.run_once() | |
class VisionThread(Thread): | |
""" | |
A vision thread is a special daemon thread that runs a pipeline. | |
""" | |
def __init__(self, runner: VisionRunner): | |
Thread.__init__(self, name="Vision Thread", target=runner.run_forever) | |
self.setDaemon(True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment