
From a basic OpenCV script to a robust GStreamer-based solution

Written by Rubén González
March 19, 2025
Artificial Intelligence (AI) is transforming the multimedia industry, driving innovation in real-time video analytics, augmented reality, and beyond. However, deploying these AI capabilities in production environments presents unique challenges and is far less glamorous. This article shows how OpenCV and GStreamer can be used together to build scalable, production-ready pipelines for AI-driven multimedia processing.
We’ll start with a basic OpenCV script and progress to a GStreamer-based solution capable of handling real-world production demands. The goal is to demonstrate how to scale from simple prototyping to production-grade pipelines using custom plugins and metadata integration.
Example 1: OpenCV and GStreamer for Input/Output
Overview
The first approach combines OpenCV and GStreamer. OpenCV handles frame manipulation and overlays, while GStreamer provides input/output (I/O) capabilities like reading from a camera or streaming to a remote endpoint.
Key Features:
- OpenCV for AI logic: A dummy ML_magic function simulates an AI model that identifies regions of interest (ROIs).
- GStreamer for video I/O: GStreamer captures the video and outputs the processed stream.
Code:
import cv2
import random
import time
def ML_magic(frame):
"""
Dummy function to emulate ML process that takes a low resolution frame as input
and returns a region of interest center point.
"""
    # Return a random dummy point inside the frame bounds
    x = random.randint(0, frame.shape[1] - 1)
    y = random.randint(0, frame.shape[0] - 1)
    return x, y
# GStreamer pipeline for capturing video from the camera
gstreamer_input = (
"videotestsrc ! " # "v4l2src device=/dev/video0 ! " # Linux camera device (adjust if using Windows or Mac)
"video/x-raw, width=640, height=480, framerate=30/1 ! "
"videoconvert ! video/x-raw, width=640, height=480, framerate=30/1 ! appsink sync=false"
)
# OpenCV VideoCapture using GStreamer
cap = cv2.VideoCapture(gstreamer_input, cv2.CAP_GSTREAMER)
if not cap.isOpened():
print("Error: Cannot open video capture using GStreamer")
exit()
# GStreamer pipeline for streaming the FLV-muxed H.264 output over UDP
gstreamer_output = """
appsrc ! video/x-raw, width=640, height=480, framerate=30/1 !
videoconvert !
x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast !
flvmux streamable=true ! udpsink sync=false
"""
# Set up VideoWriter with the GStreamer pipeline
out = cv2.VideoWriter(
    gstreamer_output,
    cv2.CAP_GSTREAMER,  # Explicitly select the GStreamer backend
    0,  # FourCC is unused here; the encoder is defined in the pipeline (x264enc)
    30,  # FPS
    (640, 480),  # Frame size
    True,  # Is color
)
# Check if VideoWriter opened successfully
if not out.isOpened():
print("Error: Cannot open VideoWriter with GStreamer pipeline")
cap.release()
exit()
start_time = time.time()
try:
num_frame = 0
while num_frame < 100:
num_frame += 1
ret, frame = cap.read()
if not ret:
print("Error: Failed to capture video frame")
break
# Resize the frame to 100x100
frame_resized = cv2.resize(frame, (100, 100))
# Call the ML_magic function
x, y = ML_magic(frame_resized)
x_resized = int(x / 100 * frame.shape[1])
y_resized = int(y / 100 * frame.shape[0])
# Draw a bounding box on the original frame
cv2.rectangle(
frame,
(x_resized - 50, y_resized - 50),
(x_resized + 50, y_resized + 50),
(0, 255, 0),
2,
)
        # Optionally display the frame locally:
        # cv2.imshow("Camera Feed", frame)
out.write(frame)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
except KeyboardInterrupt:
print("Streaming stopped by user.")
finally:
cap.release()
out.release()
cv2.destroyAllWindows()
# Log total video processing time
total_time = time.time() - start_time
print(f"Total wall-clock processing time for the video: {total_time:.2f} seconds")
Limitations:
- Scaling bottlenecks: OpenCV resizes every frame on the CPU, which becomes inefficient as resolution or frame rate grows (see the short benchmark sketch below).
- Static pipelines: the hard-coded pipeline strings leave little flexibility for dynamic configurations.
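To make the first limitation concrete, here is a minimal, hypothetical micro-benchmark of the per-frame CPU cost of the cv2.resize call used above (a sketch; absolute numbers will vary by machine):
import time
import cv2
import numpy
# Dummy 640x480 BGR frame, matching the capture caps above
frame = numpy.zeros((480, 640, 3), dtype=numpy.uint8)
iterations = 1000
t0 = time.perf_counter()
for _ in range(iterations):
    cv2.resize(frame, (100, 100))
elapsed = time.perf_counter() - t0
# At 30 fps there are only ~33 ms per frame for everything, including the ML step
print(f"cv2.resize: {elapsed / iterations * 1000:.3f} ms per frame")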
Example 2: Creating a GStreamer Plugin
Overview
Encapsulating the logic in a GStreamer plugin makes the pipeline modular and reusable. By embedding Python AI code into a custom GStreamer element, we simplify integration with other GStreamer components.
Key Features:
- Custom GStreamer Element: The plugin processes frames and adds overlays using OpenCV.
- Pipeline Integration: Easily integrates into GStreamer pipelines for scalability.
Code:
import gi
import time
import cv2
import numpy
import random
gi.require_version("Gst", "1.0")
gi.require_version("GstBase", "1.0")
from gi.repository import Gst, GstBase, GLib, GObject # noqa: E402
def ML_magic(frame):
"""
Dummy function to emulate ML process that takes a low resolution frame as input
and returns a region of interest center point.
"""
    # Return a random dummy point inside the frame bounds
    x = random.randint(0, frame.shape[1] - 1)
    y = random.randint(0, frame.shape[0] - 1)
    return x, y
Gst.init(None)
Gst.init_python()
SRC_CAPS = Gst.Caps(
Gst.Structure(
"video/x-raw",
format="RGB",
width=Gst.IntRange(range(1, GLib.MAXINT)),
height=Gst.IntRange(range(1, GLib.MAXINT)),
framerate=Gst.FractionRange(Gst.Fraction(1, 1), Gst.Fraction(GLib.MAXINT, 1)),
)
)
SINK_CAPS = Gst.Caps(
Gst.Structure(
"video/x-raw",
format="RGB",
width=Gst.IntRange(range(1, GLib.MAXINT)),
height=Gst.IntRange(range(1, GLib.MAXINT)),
framerate=Gst.FractionRange(Gst.Fraction(1, 1), Gst.Fraction(GLib.MAXINT, 1)),
)
)
SRC_PAD_TEMPLATE = Gst.PadTemplate.new(
"src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, SRC_CAPS
)
SINK_PAD_TEMPLATE = Gst.PadTemplate.new(
"sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, SINK_CAPS
)
class SampleFilter(GstBase.BaseTransform):
__gstmetadata__ = (
"OpenCV Filter",
"Filter",
"OpenCV Sample Filter",
"Ruben Gonzaelz <rgonzalez@fluendo.com>",
)
__gsttemplates__ = (SRC_PAD_TEMPLATE, SINK_PAD_TEMPLATE)
def __init__(self):
super().__init__()
self.set_qos_enabled(True)
def do_set_caps(self, incaps, outcaps):
s = incaps.get_structure(0)
self.width = s.get_int("width").value
self.height = s.get_int("height").value
return True
def do_transform_ip(self, inbuf):
try:
inbuf_info = inbuf.map(Gst.MapFlags.READ | Gst.MapFlags.WRITE)
with inbuf_info:
frame = numpy.ndarray(
shape=(self.height, self.width, 3),
dtype=numpy.uint8,
buffer=inbuf_info.data,
)
# Resize the frame to 100x100
frame_resized = cv2.resize(frame, (100, 100))
# Call the ML_magic function
x, y = ML_magic(frame_resized)
x_resized = int(x / 100 * frame.shape[1])
y_resized = int(y / 100 * frame.shape[0])
# Draw a bounding box on the original frame
cv2.rectangle(
frame,
(x_resized - 50, y_resized - 50),
(x_resized + 50, y_resized + 50),
(0, 255, 0),
2,
)
return Gst.FlowReturn.OK
except Gst.MapError as e:
Gst.error("mapping error %s" % e)
return Gst.FlowReturn.ERROR
except Exception as e:
Gst.error("%s" % e)
return Gst.FlowReturn.ERROR
GObject.type_register(SampleFilter)
Gst.Element.register(None, "sample_filter", Gst.Rank.NONE, SampleFilter)
gstreamer_pipeline = """
videotestsrc num-buffers=100 ! video/x-raw, width=640, height=480, framerate=30/1 ! videoconvert ! video/x-raw, width=640, height=480, framerate=30/1 ! sample_filter ! video/x-raw, width=640, height=480, framerate=30/1 ! tee name=t !
queue ! videoconvert ! x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast ! flvmux streamable=true ! udpsink sync=false
t. ! queue ! videoconvert ! xvimagesink handle-events=false sync=false
"""
# GStreamer pipeline
pipeline = Gst.parse_launch(gstreamer_pipeline)
# Start processing
start_time = time.time()
pipeline.set_state(Gst.State.PLAYING)
bus = pipeline.get_bus()
# Wait for the pipeline to finish
msg = bus.timed_pop_filtered(
Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR | Gst.MessageType.EOS
)
if msg:
t = msg.type
if t == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print(f"Error: {err}, {debug}")
elif t == Gst.MessageType.EOS:
print("Pipeline finished successfully.")
pipeline.set_state(Gst.State.NULL)
# Log total video processing time
total_time = time.time() - start_time
print(f"Total wall-clock processing time for the video: {total_time:.2f} seconds")
Also, if the Python file is saved in a python directory within the GStreamer plugin path, we can inspect and test the Python plugin with the standard gst-inspect-1.0 and gst-launch-1.0 tools:
$ file lib/python/filter.py
lib/python/filter.py: Python script, ASCII text executable
$ export GST_PLUGIN_PATH=$(realpath lib)
$ PAGER=cat gst-inspect-1.0 sample_filter
Factory Details:
Rank none (0)
Long-name OpenCV Filter
Klass Filter
Description OpenCV Sample Filter
Author Ruben Gonzalez <rgonzalez@fluendo.com>
Plugin Details:
Name python
Description loader for plugins written in python
Filename /usr/lib/gstreamer-1.0/libgstpython.so
Version 1.24.9
License LGPL
Source module gst-python
Binary package GStreamer Python
Origin URL http://gstreamer.freedesktop.org
GObject
+----GInitiallyUnowned
+----GstObject
+----GstElement
+----GstBaseTransform
+----filter+SampleFilter
Pad Templates:
SINK template: 'sink'
Availability: Always
Capabilities:
video/x-raw
format: RGB
width: [ 1, 2147483647 ]
height: [ 1, 2147483647 ]
framerate: [ 1/1, 2147483647/1 ]
SRC template: 'src'
Availability: Always
Capabilities:
video/x-raw
format: RGB
width: [ 1, 2147483647 ]
height: [ 1, 2147483647 ]
framerate: [ 1/1, 2147483647/1 ]
Element has no clocking capabilities.
Element has no URI handling capabilities.
Pads:
SINK: 'sink'
Pad Template: 'sink'
SRC: 'src'
Pad Template: 'src'
Element Properties:
name : The name of the object
flags: readable, writable
String. Default: "filter+samplefilter0"
parent : The parent of the object
flags: readable, writable
Object of type "GstObject"
qos : Handle Quality-of-Service events
flags: readable, writable
Boolean. Default: true
$ gst-launch-1.0 v4l2src device=/dev/video0 ! video/x-raw, width=640, height=480, framerate=30/1 ! videoconvert ! video/x-raw, width=640, height=480, framerate=30/1 ! sample_filter ! video/x-raw, width=640, height=480, framerate=30/1 ! tee name=t ! \
queue ! videoconvert ! x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast ! flvmux streamable=true ! fakesink \
t. ! queue ! videoconvert ! xvimagesink handle-events=false
Example 3: Leveraging Metadata and GPU Acceleration
Overview
Performance and metadata integration are critical in production pipelines. We achieve better efficiency and flexibility by offloading scaling and overlays to GStreamer elements (which can be GPU-accelerated) and by adding metadata directly to the stream.
Key Features:
- GStreamer video processing: GStreamer filters handle the scaling and the drawing of overlays, so the Python element only runs the ML logic and attaches metadata.
The last step is to use the objectdetectionoverlay and originalbuffersave / originalbufferrestore elements to take full advantage of GStreamer's power.
Code:
import gi
import time
import numpy
import random
gi.require_version("Gst", "1.0")
gi.require_version("GstBase", "1.0")
gi.require_version("GstAnalytics", "1.0")
from gi.repository import Gst, GstBase, GLib, GObject, GstAnalytics # noqa: E402
def ML_magic(frame):
"""
Dummy function to emulate ML process that takes a low resolution frame as input
and returns a region of interest center point.
"""
    # Return a random dummy point inside the frame bounds
    x = random.randint(0, frame.shape[1] - 1)
    y = random.randint(0, frame.shape[0] - 1)
    return x, y
Gst.init(None)
Gst.init_python()
SRC_CAPS = Gst.Caps(
Gst.Structure(
"video/x-raw",
format="RGB",
width=Gst.IntRange(range(1, GLib.MAXINT)),
height=Gst.IntRange(range(1, GLib.MAXINT)),
framerate=Gst.FractionRange(Gst.Fraction(1, 1), Gst.Fraction(GLib.MAXINT, 1)),
)
)
SINK_CAPS = Gst.Caps(
Gst.Structure(
"video/x-raw",
format="RGB",
width=Gst.IntRange(range(1, GLib.MAXINT)),
height=Gst.IntRange(range(1, GLib.MAXINT)),
framerate=Gst.FractionRange(Gst.Fraction(1, 1), Gst.Fraction(GLib.MAXINT, 1)),
)
)
SRC_PAD_TEMPLATE = Gst.PadTemplate.new(
"src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, SRC_CAPS
)
SINK_PAD_TEMPLATE = Gst.PadTemplate.new(
"sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, SINK_CAPS
)
class SampleFilter(GstBase.BaseTransform):
__gstmetadata__ = (
"OpenCV Filter",
"Filter",
"OpenCV Sample Filter",
"Ruben Gonzaelz <rgonzalez@fluendo.com>",
)
__gsttemplates__ = (SRC_PAD_TEMPLATE, SINK_PAD_TEMPLATE)
def __init__(self):
super().__init__()
self.set_qos_enabled(True)
def do_set_caps(self, incaps, outcaps):
s = incaps.get_structure(0)
self.width = s.get_int("width").value
self.height = s.get_int("height").value
return True
def do_transform_ip(self, inbuf):
try:
inbuf_info = inbuf.map(Gst.MapFlags.READ)
with inbuf_info:
frame = numpy.ndarray(
shape=(self.height, self.width, 3),
dtype=numpy.uint8,
buffer=inbuf_info.data,
)
# Call the ML_magic function
x, y = ML_magic(frame)
                # Attach the detection to the buffer as GstAnalytics object-detection metadata
                meta = GstAnalytics.buffer_add_analytics_relation_meta(inbuf)
                label = GLib.quark_from_string("label")
                meta.add_od_mtd(label, x, y, 20, 20, 0.55)  # x, y, width, height, confidence
return Gst.FlowReturn.OK
except Gst.MapError as e:
Gst.error("mapping error %s" % e)
return Gst.FlowReturn.ERROR
except Exception as e:
Gst.error("%s" % e)
return Gst.FlowReturn.ERROR
GObject.type_register(SampleFilter)
Gst.Element.register(None, "sample_filter", Gst.Rank.NONE, SampleFilter)
gstreamer_pipeline = """
videotestsrc num-buffers=100 ! video/x-raw, width=640, height=480, framerate=30/1 ! videoconvert ! video/x-raw, width=640, height=480, framerate=30/1 !
originalbuffersave ! videoconvertscale ! video/x-raw, width=100, height=100 ! sample_filter ! originalbufferrestore ! video/x-raw, width=640, height=480, framerate=30/1 ! videoconvert ! objectdetectionoverlay ! tee name=t !
queue ! videoconvert ! x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast ! flvmux streamable=true ! udpsink sync=false
t. ! queue ! videoconvert ! xvimagesink handle-events=false sync=true
"""
# GStreamer pipeline
pipeline = Gst.parse_launch(gstreamer_pipeline)
# Start processing
start_time = time.time()
pipeline.set_state(Gst.State.PLAYING)
bus = pipeline.get_bus()
# Wait for the pipeline to finish
msg = bus.timed_pop_filtered(
Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR | Gst.MessageType.EOS
)
if msg:
t = msg.type
if t == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print(f"Error: {err}, {debug}")
elif t == Gst.MessageType.EOS:
print("Pipeline finished successfully.")
pipeline.set_state(Gst.State.NULL)
# Log total video processing time
total_time = time.time() - start_time
print(f"Total wall-clock processing time for the video: {total_time:.2f} seconds")
Benefits:
- GPU video processing: the pipeline can easily be updated to use libva, V4L2 M2M, or another backend to decode and scale the video on the GPU (see the sketch below).
- GPU zero-copy: for better performance, GPU memory could be shared across the decoding, scaling, and ML stages.
- Metadata injection: AI results are embedded directly into the output video stream as metadata.
- GStreamer flexibility: all of GStreamer's machinery remains available, enabling features such as recording fragments of the video or passing the audio through from input to output.
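As a hedged illustration of the first point, the CPU scaler in the Example 3 pipeline could be swapped for a VA-API post-processor where the hardware supports it. This variant is an untested sketch: it assumes the vapostproc element from GStreamer's VA plugin (gst-plugins-bad) is available, and the right element depends on your hardware and drivers:
# Hypothetical GPU-scaling variant of the Example 3 pipeline (untested sketch)
gstreamer_pipeline_va = """
videotestsrc num-buffers=100 ! video/x-raw, width=640, height=480, framerate=30/1 !
originalbuffersave ! vapostproc ! video/x-raw, width=100, height=100 ! videoconvert !
sample_filter ! originalbufferrestore ! videoconvert ! objectdetectionoverlay !
videoconvert ! xvimagesink sync=false
"""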
[Figure: part of the final pipeline graph]
Conclusion
This evolution from a basic OpenCV script to a fully optimized GStreamer pipeline demonstrates the power of combining these tools. GStreamer’s flexibility and efficiency make it ideal for deploying AI-driven multimedia applications. Whether you’re prototyping or building production pipelines, these examples provide a solid foundation for getting started.
Do you want to contact our team for a project like this? Send us a message here, and we will answer you right away!
Addendum
The environment to test these scripts can be easily set up using containers. The following is a Dockerfile for an Arch Linux-based image that includes everything needed to test them:
FROM archlinux:latest
WORKDIR /app
RUN pacman -Suy --noconfirm
RUN pacman -S --noconfirm \
rust \
cargo \
cairo \
pango \
base-devel \
gobject-introspection \
gst-plugin-originalbuffer \
gst-plugins-bad \
gst-plugins-good \
gst-plugins-ugly \
gst-python \
python-opencv \
python-numpy
COPY src/*.py /app/
CMD ["bash"]
Let’s Build Together!
Integrating AI into production-ready multimedia pipelines requires the right expertise in OpenCV, GStreamer, and scalable architectures. Whether you're prototyping a new idea or optimizing an existing workflow, our team can help bring your AI-driven multimedia solutions to life. [Get in touch with us today](https://fluendo.com/contact/?services=&jp=CS) to discuss your project and explore how we can collaborate!