Created
December 24, 2023 17:17
-
-
Save robobe/5e61b0fdaeeea5aa60b865abdb1e72a4 to your computer and use it in GitHub Desktop.
python gst compositor dynamic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from dataclasses import dataclass
from threading import Event, Thread

import gi
from fastapi import FastAPI

# The GStreamer API version must be pinned before anything is imported
# from gi.repository.
gi.require_version("Gst", "1.0")
from gi.repository import GObject, Gst  # noqa: E402

log = logging.getLogger(__name__)

# Initialise GStreamer once for the whole process (no command-line arguments).
Gst.init(None)
@dataclass
class Settings:
    """Describe one switchable compositor input.

    ``name`` is the name of the source element inside the pipeline;
    ``pos_x`` / ``pos_y`` are the values written to the ``xpos`` / ``ypos``
    properties of the compositor sink pad the source is attached to.
    """

    # Element name used to look the source up in the pipeline.
    name: str
    # Value applied to the mixer pad's "xpos" property.
    pos_x: int
    # Value applied to the mixer pad's "ypos" property.
    pos_y: int
class Axis:
    """Compositor pipeline whose inputs can be closed, re-added and moved.

    The pipeline mixes three ``videotestsrc`` streams through a compositor
    named ``m``.  The sources named ``left`` (pad ``sink_1``) and ``right``
    (pad ``sink_2``) are controlled at runtime by application messages that
    :meth:`custom_message` posts on the pipeline bus and :meth:`run` consumes.
    """

    # Amount added to a mixer pad's "xpos" for every "do_move" message.
    MOVE_STEP = 20

    def __init__(self):
        """Parse the pipeline and cache its bus, mixer and source settings."""
        # Joined with single spaces so that adjacent fragments can never fuse
        # into one token, whatever the surrounding indentation is.
        pipe = " ".join(
            (
                "compositor name=m",
                "sink_1::xpos=50 sink_1::ypos=50 sink_1::width=150",
                "sink_2::xpos=400 sink_2::ypos=50",
                "! videoconvert",
                "! autovideosink",
                "videotestsrc pattern=1",
                "! video/x-raw, format=I420, framerate=5/1, width=740, height=480",
                "! m.",
                "videotestsrc name=right pattern=red",
                "! video/x-raw, format=I420, framerate=5/1, width=300, height=200",
                "! queue",
                "! m.sink_2",
                "videotestsrc name=left pattern=green",
                "! video/x-raw, format=I420, framerate=5/1, width=300, height=200",
                "! queue",
                "! m.sink_1",
            )
        )
        self.pipeline = Gst.parse_launch(pipe)
        self.bus = self.pipeline.get_bus()
        self.mixer = self.pipeline.get_by_name("m")
        # Keyed by the numeric suffix of the compositor pad ("sink_<index>").
        self.settings = {
            1: Settings("left", pos_x=50, pos_y=50),
            2: Settings("right", pos_x=400, pos_y=50),
        }

    def custom_message(self, name, action, index):
        """Post an application message on the pipeline bus.

        Args:
            name: Name of the message structure; :meth:`run` dispatches on it.
            action: Stored in the structure's ``action`` field.
            index: Key into ``self.settings``; stored in the ``index`` field.

        Unknown indexes are logged and ignored; nothing is posted for them.
        """
        if index not in self.settings:
            log.warning(f"camera / source index not found: {index}")
            return
        structure = Gst.Structure.new_empty(name)
        structure.set_value("index", index)
        structure.set_value("action", action)
        self.bus.post(Gst.Message.new_application(None, structure))

    def get_mapping(self, index: int) -> str:
        """Return the source name for *index* (1 -> "left", 2 -> "right").

        Raises:
            KeyError: If *index* is neither 1 nor 2.
        """
        data = {
            1: "left",
            2: "right",
        }
        return data[index]

    def run(self):
        """Play the pipeline and process bus messages until it stops.

        The loop ends on EOS, on an ERROR message or on KeyboardInterrupt.
        The pipeline is always set back to NULL on the way out, including
        when a message handler raises.
        """
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            while True:
                # Wake up once a second even when the bus is idle.
                message = self.bus.timed_pop(Gst.SECOND)
                if message is None:
                    continue
                if message.type == Gst.MessageType.APPLICATION:
                    self._handle_application(message.get_structure())
                elif message.type == Gst.MessageType.EOS:
                    log.warning("EOS")
                    break
                elif message.type == Gst.MessageType.ERROR:
                    err, debug = message.parse_error()
                    log.error(err)
                    log.error(debug)
                    break
        except KeyboardInterrupt:
            pass
        finally:
            self.pipeline.set_state(Gst.State.NULL)

    def _handle_application(self, data):
        """Dispatch one application-message structure by its name."""
        if data is None:
            return
        name = data.get_name()
        if name.startswith("start"):
            self._schedule_action(data)
            return
        handlers = {
            "close": self._close_source,
            "do_move": self._move_source,
            "add": self._add_source,
        }
        handler = handlers.get(name)
        # Application messages with any other name are ignored.
        if handler is not None:
            handler(data.get_value("index"))

    def _schedule_action(self, data):
        """Re-post a "start_*" request under its action name from a pad probe."""
        index = data.get_value("index")
        action = data.get_string("action")

        def action_message(pad, info):
            # NOTE(review): the probe removes itself as soon as it fires, so
            # the pad is presumably no longer blocked when the re-posted
            # message is handled in run() - confirm this is intended.
            self.custom_message(action, action, index)
            pad.remove_probe(info.id)
            return Gst.PadProbeReturn.OK

        self.mixer.get_static_pad("src").add_probe(
            Gst.PadProbeType.BLOCK, action_message
        )

    def _close_source(self, index):
        """Stop the source for *index*, detach it and release its mixer pad."""
        setting = self.settings[index]
        src = self.pipeline.get_by_name(setting.name)
        if src is None:
            log.warning(f"source not found: {setting.name}")
            return
        src.set_state(Gst.State.NULL)
        mixer_pad = self.mixer.get_static_pad(f"sink_{index}")
        if mixer_pad is not None:
            upstream_pad = mixer_pad.get_peer()
            if upstream_pad is not None:
                upstream_pad.unlink(mixer_pad)
        self.pipeline.remove(src)
        if mixer_pad is not None:
            # Request pads are handed back with release_request_pad();
            # remove_pad() is reserved for element implementations.
            self.mixer.release_request_pad(mixer_pad)

    def _move_source(self, index):
        """Shift the mixer pad for *index* to the right by ``MOVE_STEP``."""
        mixer_pad = self.mixer.get_static_pad(f"sink_{index}")
        if mixer_pad is None:
            # The pad is gone once the source has been closed.
            log.warning(f"mixer pad not found: sink_{index}")
            return
        current_x = mixer_pad.get_property("xpos")
        mixer_pad.set_property("xpos", current_x + self.MOVE_STEP)

    def _add_source(self, index):
        """Create a videotestsrc for *index* and link it to a fresh mixer pad."""
        setting = self.settings[index]
        if self.pipeline.get_by_name(setting.name) is not None:
            log.warning(f"source allready exists: {setting.name}")
            return
        src = Gst.ElementFactory.make("videotestsrc", setting.name)
        if src is None:
            log.error(f"failed to create source: {setting.name}")
            return
        src.set_property("pattern", 0)
        self.pipeline.add(src)
        mixer_pad = self.mixer.request_pad_simple(f"sink_{index}")
        if mixer_pad is None:
            log.error(f"failed to request mixer pad: sink_{index}")
            self.pipeline.remove(src)
            return
        mixer_pad.set_property("xpos", setting.pos_x)
        mixer_pad.set_property("ypos", setting.pos_y)
        link_result = src.get_static_pad("src").link(mixer_pad)
        if link_result != Gst.PadLinkReturn.OK:
            # Undo the partial setup so a later "add" can start clean.
            log.error(f"failed to link source {setting.name}: {link_result}")
            self.mixer.release_request_pad(mixer_pad)
            self.pipeline.remove(src)
            return
        src.set_state(Gst.State.PLAYING)
# Single pipeline instance shared by every HTTP handler below.
axis = Axis()

# The bus loop runs in a daemon thread, so it does not keep the process
# alive once the main thread exits.
g_thread = Thread(target=axis.run, daemon=True, name="g_thread")
g_thread.start()

app = FastAPI()
@app.get("/")
async def root():
    """Return a fixed greeting payload."""
    return {"message": "Hello World"}
@app.get("/close")
async def close(index: int):
    """Post a "start_close" request with action "close" for source *index*."""
    axis.custom_message("start_close", "close", index)
@app.get("/add")
async def add(index: int):
    """Post a "start_add" request with action "add" for source *index*."""
    axis.custom_message("start_add", "add", index)
@app.get("/move")
async def move(index: int):
    """Post a "do_move" message with action "move" for source *index*.

    Named ``move`` so that it no longer redefines the module-level name of
    the ``/add`` handler.
    """
    axis.custom_message("do_move", "move", index)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment