Skip to content

Instantly share code, notes, and snippets.

@sam-37
Forked from All4Gis/QgsMultiplexorGs
Created October 11, 2024 10:48
Show Gist options
  • Save sam-37/5787f299ecf1dd0af5c0fef43ffdb945 to your computer and use it in GitHub Desktop.
Save sam-37/5787f299ecf1dd0af5c0fef43ffdb945 to your computer and use it in GitHub Desktop.
Python code for muxing klv and video (MISB) #STANAG #MISB #UAV

Python code for muxing klv and video (MISB)

Folder with data and results https://drive.google.com/file/d/10LA6zWLXn6VraOMvQ15MR7XZGAwLlTU9/view?usp=sharing

Reference project https://github.com/All4Gis/QGISFMV

Known problems ¯\_(ツ)_/¯

  • The video plays, but Linux players report a "meta/x-klv" codec-not-available warning. How can this be fixed?

  • QGISFMV recognises the telemetry but does not play the video. We can fix this with

    ffmpeg -i DJI_0047/0.0.mp4 -i MISB.mp4 -c copy -map 0:v:0 -map 1:d:0 out.mp4
    

But the telemetry is lost. So what do we do? And insert dummy values...

# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import os
import gi
# Author : Fran Raga , 2021
# proof of concept to ingest KLV telemetry into a video. Multiplexer concept to create a MISB Video.
# Related with : https://github.com/All4Gis/QGISFMV/blob/master/code/manager/QgsMultiplexor.py
# Get Video width/Height
# ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of csv=s=x:p=0 DJI_0047.MP4
# width 3840x height 2160
# Extract all frames in folder
# ffmpeg -i DJI_0047.mp4 -r 30/1 '%d.jpg'
# Get Bitrate
# ffprobe -v error -select_streams v:0 -show_entries stream=bit_rate -of default=noprint_wrappers=1:nokey=1 DJI_0047.mp4
# ffprobe -v error -select_streams v:0 -show_entries stream=bit_rate -of csv=p=0 DJI_0047.mp4
# Divide the result by 1000 to get kbit/s (the value stored in vid_bitrate below)
# Get Frame rate
# ffprobe -v error -select_streams v -of default=noprint_wrappers=1:nokey=1 -show_entries stream=r_frame_rate DJI_0047.MP4
# Merge all klv from QGISFMV to use in this code
# import glob
# Fix Video result
# ffmpeg -re -i MISB.mp4 -map 0:v -map 0:d -codec copy fixed.ts
# ffmpeg -i fixed.ts -map 0:v -muxpreload 0 -muxdelay 0 -map 0:d -codec copy newFile.ts
#
# files = []
# for file in glob.glob("/home/fragalop/SamplesFMV/multiplexor/klv/*.klv"):
# files.append(file)
#
# out_data = b''
# for fn in files:
# with open(fn, 'rb') as fp:
# out_data += fp.read()
# with open('/home/fragalop/SamplesFMV/multiplexor/all.klv', 'wb') as fp:
# fp.write(out_data)
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject
#vid_file = "/home/fragalop/SamplesFMV/multiplexor/DJI_0047/DJI_0047.mp4"
# --- Input video description (see the ffprobe commands above) ---------------
# Frames per second; used for the raw-video caps and per-frame timestamps.
vid_frame_rate = 30
# Frame size in pixels; must match the JPEG frames found in img_folder.
vid_width = 3840
vid_height = 2160
# Value handed to the x264enc "bitrate" property (ffprobe bit_rate / 1000).
vid_bitrate = 60012

# --- Input / output locations -----------------------------------------------
# Single file holding all KLV packets back to back.
klv_file = "/home/fragalop/SamplesFMV/multiplexor/all.klv"
# Folder holding the extracted frames, named 1.jpg, 2.jpg, ...
img_folder = "/home/fragalop/SamplesFMV/multiplexor/img/"
# Number of bytes read from klv_file for each KLV buffer.
klv_packet_size = 112  # Extracted from QGISFMV "Create MISB Tool"
# Resulting MPEG transport stream.
out_file = "/home/fragalop/SamplesFMV/multiplexor/MISB.ts"
class muxKlv:
    """Proof-of-concept multiplexer writing video and KLV into an MPEG-TS file.

    Video frames are read as numbered JPEG files (``1.jpg``, ``2.jpg``, ...)
    from ``img_folder``; KLV telemetry is read from ``klv_file`` in chunks of
    ``klv_packet_size`` bytes.  Both streams are fed through ``appsrc``
    elements into ``mpegtsmux`` and written to ``out_file``.

    Data is supplied on demand from the ``need-data`` callbacks, so nothing
    is read until :meth:`play` has started the pipeline.
    """

    def __init__(self):
        """Initialise GStreamer, open the KLV file and build the pipeline.

        Raises:
            OSError: if ``klv_file`` cannot be opened.
            RuntimeError: if a GStreamer element cannot be created or linked.
        """
        self.is_push_buffer_allowed = None
        GObject.threads_init()
        Gst.init(None)
        self.cap = None
        self.fh = open(klv_file, 'rb')
        # Frames on disk are numbered starting at 1 (ffmpeg '%d.jpg').
        self.vid_frame_counter = 1
        self.klv_frame_counter = 0
        self.inject_klv = 0
        # Duration of one video frame in nanoseconds (GStreamer clock units).
        self.duration = 1.0 / vid_frame_rate * Gst.SECOND
        # Running timestamp shared by the video and KLV buffers.
        self.dts = 0
        self.fps = int(vid_frame_rate)
        try:
            # Create GStreamer Pipeline
            self.createGstPipeline()
        except Exception:
            # Do not leak the KLV file handle when the pipeline cannot be built.
            self.fh.close()
            raise

    @staticmethod
    def _make_element(factory, name=None):
        """Create a GStreamer element, failing loudly if the plugin is missing."""
        element = Gst.ElementFactory.make(factory, name)
        if element is None:
            raise RuntimeError(
                "Unable to create GStreamer element '%s'" % factory)
        return element

    @staticmethod
    def _link(upstream, downstream):
        """Link two elements and raise if GStreamer refuses the link."""
        if not upstream.link(downstream):
            raise RuntimeError("Unable to link %s -> %s" % (
                upstream.get_name(), downstream.get_name()))

    def createGstPipeline(self):
        """Create, configure, add and link every element of the pipeline.

        Topology::

            vsrc -> vqueue -> vtee -> queue_record -> videoconvert
                 -> x264enc -> mpegtsmux -> filesink
            appsrc (KLV) -> queue_klv -> mpegtsmux

        Raises:
            RuntimeError: if an element cannot be created or linked.
        """
        # video source elements
        self.vsrc = self._make_element("appsrc", "vidsrc")
        self.vqueue = self._make_element("queue")
        self.vtee = self._make_element("tee")
        # klv source elements
        self.appsrc = self._make_element("appsrc")
        self.queue_klv = self._make_element("queue")
        # recording elements
        self.queue_record = self._make_element("queue")
        self.vcvt_encoder = self._make_element("videoconvert")
        # encode
        self.encoder = self._make_element("x264enc")
        self.muxer = self._make_element("mpegtsmux")
        self.filesink = self._make_element("filesink")

        # configure video element: raw RGB frames of the configured size/rate
        self.caps_str = "video/x-raw"
        self.caps_str += ",format=(string)RGB,width={},height={}".format(
            vid_width, vid_height)
        self.caps_str += ",framerate={}/1".format(int(vid_frame_rate))
        self.vcaps = Gst.Caps.from_string(self.caps_str)
        self.vsrc.set_property("caps", self.vcaps)
        self.vsrc.set_property("format", Gst.Format.TIME)
        self.vsrc.connect("need-data", self.video_need_data)
        self.vsrc.connect("enough-data", self.video_enough_data)

        # configure appsrc element: KLV metadata packets
        self.caps_str = "meta/x-klv"
        self.caps_str += ",parsed=True"
        self.caps = Gst.Caps.from_string(self.caps_str)
        self.appsrc.set_property("caps", self.caps)
        self.appsrc.connect("need-data", self.klv_need_data)
        self.appsrc.connect("enough-data", self.klv_enough_data)
        self.appsrc.set_property("format", Gst.Format.TIME)

        # configure encoder
        self.encoder.set_property("threads", 4)
        self.encoder.set_property("bitrate", vid_bitrate)
        self.encoder.set_property("byte-stream", True)

        # configure filesink
        self.filesink.set_property("location", out_file)
        self.filesink.set_property("async", False)

        self.pipeline = Gst.Pipeline()
        for element in (self.vsrc, self.vqueue, self.vtee,
                        self.appsrc, self.queue_klv,
                        self.queue_record, self.vcvt_encoder,
                        self.encoder, self.muxer, self.filesink):
            self.pipeline.add(element)

        # link video elements
        self._link(self.vsrc, self.vqueue)
        self._link(self.vqueue, self.vtee)
        # link recording elements
        self._link(self.vtee, self.queue_record)
        self._link(self.queue_record, self.vcvt_encoder)
        self._link(self.vcvt_encoder, self.encoder)
        self._link(self.encoder, self.muxer)
        self._link(self.muxer, self.filesink)
        # link klv elements
        self._link(self.appsrc, self.queue_klv)
        self._link(self.queue_klv, self.muxer)

    def klv_need_data(self, src, length):
        """``need-data`` callback of the KLV appsrc: push the next KLV packet.

        Reads up to ``klv_packet_size`` bytes from the KLV file and pushes
        them as one buffer stamped with the current shared timestamp.  When
        the file is exhausted, end-of-stream is signalled and nothing is
        pushed.

        Args:
            src: the appsrc element that emitted the signal.
            length: amount of data requested by appsrc (only logged).
        """
        print('======================> KLV need data length: %s' % length)
        # KLV Data
        klv_bytes = self.fh.read(klv_packet_size)
        if not klv_bytes:
            print("End klv stream")
            self.appsrc.emit("end-of-stream")
            # Stop here: pushing a buffer after EOS would inject a packet
            # with no real payload into the stream.
            return

        # Size the buffer to what was actually read so that a short final
        # read is not padded with unwritten memory.
        klvbuf = Gst.Buffer.new_allocate(None, len(klv_bytes), None)
        klvbuf.fill(0, klv_bytes)
        klvbuf.duration = Gst.SECOND
        # Buffer timestamps are integer nanoseconds; self.dts may be a float.
        klvbuf.pts = klvbuf.dts = int(self.dts)
        retval = self.appsrc.emit("push-buffer", klvbuf)
        if retval != Gst.FlowReturn.OK:
            print(retval)
        self.klv_frame_counter += 1
        self.inject_klv = 0
        print("klv Frame {}".format(self.klv_frame_counter))

    def klv_enough_data(self, src):
        """``enough-data`` callback of the KLV appsrc (only logs)."""
        print('======================> KLV enough data src: %s' % src)

    def video_need_data(self, src, length):
        """``need-data`` callback of the video appsrc: push the next frame.

        Loads ``<img_folder><n>.jpg`` for the current frame counter and
        pushes its raw bytes as one buffer, then advances the shared
        timestamp by one frame duration.  When the next numbered image does
        not exist, end-of-stream is signalled on both the video and the KLV
        sources.

        Args:
            src: the appsrc element that emitted the signal.
            length: amount of data requested by appsrc (unused).
        """
        # Get Image
        tmp_image = img_folder + str(self.vid_frame_counter) + ".jpg"
        if not os.path.isfile(tmp_image):
            print("End Video and KLV Stream")
            # Video Stream
            self.vsrc.emit("end-of-stream")
            # KLV Stream
            self.appsrc.emit("end-of-stream")
            return

        vid_frame = plt.imread(tmp_image)
        # ndarray.tostring() is deprecated and removed in NumPy 2.0;
        # tobytes() returns the same raw bytes.
        data = vid_frame.tobytes()
        vidbuf = Gst.Buffer.new_allocate(None, len(data), None)
        vidbuf.fill(0, data)
        # Buffer timestamps are integer nanoseconds.
        vidbuf.duration = int(self.duration)
        vidbuf.pts = vidbuf.dts = int(self.dts)
        retval = self.vsrc.emit("push-buffer", vidbuf)
        print("video frame {}".format(self.vid_frame_counter))
        if retval != Gst.FlowReturn.OK:
            print(retval)
        self.vid_frame_counter += 1
        self.inject_klv += 1
        # Up dts
        self.dts = self.dts + self.duration

    def video_enough_data(self, src):
        """``enough-data`` callback of the video appsrc (only logs)."""
        print('======================> Video enough data src: %s' % src)

    def play(self):
        """Run the pipeline until EOS or an error, then release resources.

        Blocks while polling the pipeline bus.  The pipeline is always set
        back to the NULL state and the KLV file is always closed, whether
        the run ended with EOS, an error message or an exception.

        Raises:
            Exception: if the pipeline cannot be set to the PLAYING state.
        """
        completed = False
        try:
            ret = self.pipeline.set_state(Gst.State.PLAYING)
            if ret == Gst.StateChangeReturn.FAILURE:
                raise Exception(
                    "Unable to set the pipeline to the playing state")
            self.bus = self.pipeline.get_bus()
            ignored = (Gst.MessageType.STATE_CHANGED,
                       Gst.MessageType.STREAM_STATUS)
            while True:
                msg = self.bus.poll(Gst.MessageType.ANY, Gst.CLOCK_TIME_NONE)
                if msg is None:
                    continue
                t = msg.type
                if t == Gst.MessageType.EOS:
                    print("EOS")
                    completed = True
                    break
                elif t == Gst.MessageType.ERROR:
                    err, debug = msg.parse_error()
                    print("Error: %s" % err, debug)
                    break
                elif t == Gst.MessageType.WARNING:
                    err, debug = msg.parse_warning()
                    print("Warning: %s" % err, debug)
                elif t in ignored:
                    pass
                elif t in (Gst.MessageType.LATENCY,
                           Gst.MessageType.NEW_CLOCK):
                    print("Warning: %s" % msg.src)
                else:
                    print("Unknown message: %s" % msg.src, msg.type)
        finally:
            # Stop the streaming threads before closing the file they read.
            self.pipeline.set_state(Gst.State.NULL)
            self.fh.close()
        if completed:
            # Only claim success when the stream really reached EOS.
            print("Wohooo!MISB Created")
# Test Inject klv data
# Script entry: build the multiplexer (this opens klv_file and creates the
# GStreamer pipeline), then run it; play() blocks until the pipeline reports
# EOS or an error.
sender = muxKlv()
sender.play()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment