Open Menu
From a basic OpenCV script to a robust GStreamer-based solution

From a basic OpenCV script to a robust GStreamer-based solution

User Name

Written by

Rubén González

March 19, 2025

Artificial Intelligence (AI) is transforming the multimedia industry, driving innovation in real-time video analytics, augmented reality, and beyond. However, deploying these AI capabilities into production environments presents unique challenges and is far less glamorous. This article highlights how OpenCV and GStreamer can be utilized to build scalable, production-ready pipelines for AI-driven multimedia processing.

We’ll start with a basic OpenCV script and progress to a GStreamer-based solution capable of handling real-world production demands. The goal is to demonstrate how to scale from simple prototyping to production-grade pipelines using custom plugins and metadata integration.

Example 1: OpenCV and GStreamer for Input/Output

Overview

The first approach combines OpenCV and GStreamer. OpenCV handles frame manipulation and overlays, while GStreamer provides input/output (I/O) capabilities like reading from a camera or streaming to a remote endpoint.

Key Features:

  • OpenCV for AI logic: A dummy ML_magic function simulates an AI model that identifies regions of interest (ROIs).
  • GStreamer for Video I/O: GStreamer captures video and output processed streams.

Code:

import cv2
import random
import time


def ML_magic(frame):
    """
    Dummy function to emulate ML process that takes a low resolution frame as input
    and returns a region of interest center point.
    """
    # Always return a dummy point (e.g., center of the resized frame)
    x = random.randint(0, frame.shape[1])
    y = random.randint(0, frame.shape[0])
    return x, y


# GStreamer pipeline for capturing video from the camera
gstreamer_input = (
    "videotestsrc ! "  #    "v4l2src device=/dev/video0 ! "  # Linux camera device (adjust if using Windows or Mac)
    "video/x-raw, width=640, height=480, framerate=30/1 ! "
    "videoconvert ! video/x-raw, width=640, height=480, framerate=30/1 ! appsink sync=false"
)

# OpenCV VideoCapture using GStreamer
cap = cv2.VideoCapture(gstreamer_input, cv2.CAP_GSTREAMER)
if not cap.isOpened():
    print("Error: Cannot open video capture using GStreamer")
    exit()

# GStreamer pipeline for streaming to RTMP
gstreamer_output = """
    appsrc ! video/x-raw, width=640, height=480, framerate=30/1 !
    videoconvert !
    x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast !
    flvmux streamable=true ! udpsink sync=false
"""

# Set up VideoWriter with the GStreamer pipeline
out = cv2.VideoWriter(
    gstreamer_output,
    cv2.VideoWriter_fourcc(*"X264"),  # FourCC code for H.264 encoding
    30,  # FPS
    (640, 480),  # Frame size
    True,  # Is color
)

# Check if VideoWriter opened successfully
if not out.isOpened():
    print("Error: Cannot open VideoWriter with GStreamer pipeline")
    cap.release()
    exit()

start_time = time.time()

try:
    num_frame = 0
    while num_frame < 100:
        num_frame += 1
        ret, frame = cap.read()
        if not ret:
            print("Error: Failed to capture video frame")
            break

        # Resize the frame to 100x100
        frame_resized = cv2.resize(frame, (100, 100))

        # Call the ML_magic function
        x, y = ML_magic(frame_resized)
        x_resized = int(x / 100 * frame.shape[1])
        y_resized = int(y / 100 * frame.shape[0])

        # Draw a bounding box on the original frame
        cv2.rectangle(
            frame,
            (x_resized - 50, y_resized - 50),
            (x_resized + 50, y_resized + 50),
            (0, 255, 0),
            2,
        )

        # Optionally display the frame
        ## - cv2.imshow("Camera Feed", frame)

        out.write(frame)

        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
except KeyboardInterrupt:
    print("Streaming stopped by user.")
finally:
    cap.release()
    out.release()
    cv2.destroyAllWindows()

# Log total video processing time
total_time = time.time() - start_time
print(f"Total wall-clock processing time for the video: {total_time:.2f} seconds")

Limitations:

  • Scaling Bottlenecks: OpenCV handles frame resizing, which can become inefficient.
  • Static Pipelines: Limited flexibility for dynamic configurations.

Example 2: Creating a GStreamer Plugin

Overview

Encapsulating the logic in a GStreamer plugin makes the pipeline modular and reusable. By embedding Python AI code into a custom GStreamer element, we simplify integration with other GStreamer components.

Key Features:

  • Custom GStreamer Element: The plugin processes frames and adds overlays using OpenCV.
  • Pipeline Integration: Easily integrates into GStreamer pipelines for scalability.

Code:

import gi
import time
import cv2
import numpy
import random

gi.require_version("Gst", "1.0")
gi.require_version("GstBase", "1.0")

from gi.repository import Gst, GstBase, GLib, GObject  # noqa: E402


def ML_magic(frame):
    """
    Dummy function to emulate ML process that takes a low resolution frame as input
    and returns a region of interest center point.
    """
    # Always return a dummy point (e.g., center of the resized frame)
    x = random.randint(0, frame.shape[1])
    y = random.randint(0, frame.shape[0])
    return x, y


Gst.init(None)
Gst.init_python()


SRC_CAPS = Gst.Caps(
    Gst.Structure(
        "video/x-raw",
        format="RGB",
        width=Gst.IntRange(range(1, GLib.MAXINT)),
        height=Gst.IntRange(range(1, GLib.MAXINT)),
        framerate=Gst.FractionRange(Gst.Fraction(1, 1), Gst.Fraction(GLib.MAXINT, 1)),
    )
)

SINK_CAPS = Gst.Caps(
    Gst.Structure(
        "video/x-raw",
        format="RGB",
        width=Gst.IntRange(range(1, GLib.MAXINT)),
        height=Gst.IntRange(range(1, GLib.MAXINT)),
        framerate=Gst.FractionRange(Gst.Fraction(1, 1), Gst.Fraction(GLib.MAXINT, 1)),
    )
)

SRC_PAD_TEMPLATE = Gst.PadTemplate.new(
    "src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, SRC_CAPS
)

SINK_PAD_TEMPLATE = Gst.PadTemplate.new(
    "sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, SINK_CAPS
)

class SampleFilter(GstBase.BaseTransform):
    __gstmetadata__ = (
        "OpenCV Filter",
        "Filter",
        "OpenCV Sample Filter",
        "Ruben Gonzaelz <rgonzalez@fluendo.com>",
    )

    __gsttemplates__ = (SRC_PAD_TEMPLATE, SINK_PAD_TEMPLATE)

    def __init__(self):
        super().__init__()
        self.set_qos_enabled(True)

    def do_set_caps(self, incaps, outcaps):
        s = incaps.get_structure(0)
        self.width = s.get_int("width").value
        self.height = s.get_int("height").value

        return True

    def do_transform_ip(self, inbuf):
        try:
            inbuf_info = inbuf.map(Gst.MapFlags.READ | Gst.MapFlags.WRITE)
            with inbuf_info:
                frame = numpy.ndarray(
                    shape=(self.height, self.width, 3),
                    dtype=numpy.uint8,
                    buffer=inbuf_info.data,
                )

                # Resize the frame to 100x100
                frame_resized = cv2.resize(frame, (100, 100))

                # Call the ML_magic function
                x, y = ML_magic(frame_resized)
                x_resized = int(x / 100 * frame.shape[1])
                y_resized = int(y / 100 * frame.shape[0])

                # Draw a bounding box on the original frame
                cv2.rectangle(
                    frame,
                    (x_resized - 50, y_resized - 50),
                    (x_resized + 50, y_resized + 50),
                    (0, 255, 0),
                    2,
                )

                return Gst.FlowReturn.OK

        except Gst.MapError as e:
            Gst.error("mapping error %s" % e)
            return Gst.FlowReturn.ERROR
        except Exception as e:
            Gst.error("%s" % e)
            return Gst.FlowReturn.ERROR


GObject.type_register(SampleFilter)
Gst.Element.register(None, "sample_filter", Gst.Rank.NONE, SampleFilter)

gstreamer_pipeline = """
                videotestsrc num-buffers=100 ! video/x-raw, width=640, height=480, framerate=30/1 ! videoconvert ! video/x-raw, width=640, height=480, framerate=30/1 ! sample_filter ! video/x-raw, width=640, height=480, framerate=30/1 ! tee name=t !
                queue ! videoconvert !  x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast !  flvmux streamable=true ! udpsink sync=false
                t. ! queue ! videoconvert ! xvimagesink handle-events=false sync=false
"""


# GStreamer pipeline
pipeline = Gst.parse_launch(gstreamer_pipeline)

# Start processing
start_time = time.time()
pipeline.set_state(Gst.State.PLAYING)
bus = pipeline.get_bus()

# Wait for the pipeline to finish
msg = bus.timed_pop_filtered(
    Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR | Gst.MessageType.EOS
)

if msg:
    t = msg.type
    if t == Gst.MessageType.ERROR:
        err, debug = msg.parse_error()
        print(f"Error: {err}, {debug}")
    elif t == Gst.MessageType.EOS:
        print("Pipeline finished successfully.")

pipeline.set_state(Gst.State.NULL)

# Log total video processing time
total_time = time.time() - start_time
print(f"Total wall-clock processing time for the video: {total_time:.2f} seconds")

Also, if the python file is saved in a python directory in the GStreamer plugin path, we can test the python plugin with our powerful gst-launch

$ file lib/python/filter.py
lib/python/filter.py: Python script, ASCII text executable
$ export GST_PLUGIN_PATH=$(realpath lib)
$ PAGER=cat gst-inspect-1.0  sample_filter
Factory Details:
  Rank                 	none (0)
  Long-name            	OpenCV Filter
  Klass                	Filter
  Description          	OpenCV Sample Filter
  Author               	Ruben Gonzaelz <rgonzalez@fluendo.com>

Plugin Details:
  Name                 	python
  Description          	loader for plugins written in python
  Filename             	/usr/lib/gstreamer-1.0/libgstpython.so
  Version              	1.24.9
  License              	LGPL
  Source module        	gst-python
  Binary package       	GStreamer Python
  Origin URL           	http://gstreamer.freedesktop.org

GObject
 +----GInitiallyUnowned
   	+----GstObject
         	+----GstElement
               	+----GstBaseTransform
                     	+----filter+SampleFilter

Pad Templates:
  SINK template: 'sink'
	Availability: Always
	Capabilities:
  	video/x-raw
             	format: RGB
              	width: [ 1, 2147483647 ]
             	height: [ 1, 2147483647 ]
          	framerate: [ 1/1, 2147483647/1 ]

  SRC template: 'src'
	Availability: Always
	Capabilities:
  	video/x-raw
             	format: RGB
              	width: [ 1, 2147483647 ]
             	height: [ 1, 2147483647 ]
          	framerate: [ 1/1, 2147483647/1 ]

Element has no clocking capabilities.
Element has no URI handling capabilities.

Pads:
  SINK: 'sink'
	Pad Template: 'sink'
  SRC: 'src'
	Pad Template: 'src'

Element Properties:

  name            	: The name of the object
                    	flags: readable, writable
                    	String. Default: "filter+samplefilter0"

  parent          	: The parent of the object
                    	flags: readable, writable
                    	Object of type "GstObject"

  qos             	: Handle Quality-of-Service events
                    	flags: readable, writable
                    	Boolean. Default: true


$ gst-launch-1.0  v4l2src device=/dev/video0 ! video/x-raw, width=640, height=480, framerate=30/1 ! videoconvert ! video/x-raw, width=640, height=480, framerate=30/1 ! sample_filter ! video/x-raw, width=640, height=480, framerate=30/1 ! tee name=t ! \
            	queue ! videoconvert !  x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast !  flvmux streamable=true ! fakesink \
            	t. ! queue ! videoconvert ! xvimagesink handle-events=false

Example 3: Leveraging Metadata and GPU Acceleration

Overview

Performance and metadata integration are critical in production pipelines. We achieve better efficiency and flexibility by offloading scaling and overlays to GStreamer’s GPU-accelerated elements and adding metadata directly to the stream.

Key Features:

  • GStreamer video processing: Using GStreamer filters for scaling and overlaying the objects.

The last step is to use analyticsoverlay and originalbuffer to take advantage of GStreamer’s power.

Code

import gi
import time
import numpy
import random

gi.require_version("Gst", "1.0")
gi.require_version("GstBase", "1.0")
gi.require_version("GstAnalytics", "1.0")

from gi.repository import Gst, GstBase, GLib, GObject, GstAnalytics  # noqa: E402


def ML_magic(frame):
    """
    Dummy function to emulate ML process that takes a low resolution frame as input
    and returns a region of interest center point.
    """
    # Always return a dummy point (e.g., center of the resized frame)
    x = random.randint(0, frame.shape[1])
    y = random.randint(0, frame.shape[0])
    return x, y


Gst.init(None)
Gst.init_python()


SRC_CAPS = Gst.Caps(
    Gst.Structure(
        "video/x-raw",
        format="RGB",
        width=Gst.IntRange(range(1, GLib.MAXINT)),
        height=Gst.IntRange(range(1, GLib.MAXINT)),
        framerate=Gst.FractionRange(Gst.Fraction(1, 1), Gst.Fraction(GLib.MAXINT, 1)),
    )
)

SINK_CAPS = Gst.Caps(
    Gst.Structure(
        "video/x-raw",
        format="RGB",
        width=Gst.IntRange(range(1, GLib.MAXINT)),
        height=Gst.IntRange(range(1, GLib.MAXINT)),
        framerate=Gst.FractionRange(Gst.Fraction(1, 1), Gst.Fraction(GLib.MAXINT, 1)),
    )
)

SRC_PAD_TEMPLATE = Gst.PadTemplate.new(
    "src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, SRC_CAPS
)

SINK_PAD_TEMPLATE = Gst.PadTemplate.new(
    "sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, SINK_CAPS
)

class SampleFilter(GstBase.BaseTransform):
    __gstmetadata__ = (
        "OpenCV Filter",
        "Filter",
        "OpenCV Sample Filter",
        "Ruben Gonzaelz <rgonzalez@fluendo.com>",
    )

    __gsttemplates__ = (SRC_PAD_TEMPLATE, SINK_PAD_TEMPLATE)

    def __init__(self):
        super().__init__()
        self.set_qos_enabled(True)

    def do_set_caps(self, incaps, outcaps):
        s = incaps.get_structure(0)
        self.width = s.get_int("width").value
        self.height = s.get_int("height").value

        return True

    def do_transform_ip(self, inbuf):
        try:
            inbuf_info = inbuf.map(Gst.MapFlags.READ)
            with inbuf_info:
                frame = numpy.ndarray(
                    shape=(self.height, self.width, 3),
                    dtype=numpy.uint8,
                    buffer=inbuf_info.data,
                )

                # Call the ML_magic function
                x, y = ML_magic(frame)

            meta = GstAnalytics.buffer_add_analytics_relation_meta(inbuf)
            label = GLib.quark_from_string("label")
            meta.add_od_mtd(label, x, y, 20, 20, 0.55)

            return Gst.FlowReturn.OK

        except Gst.MapError as e:
            Gst.error("mapping error %s" % e)
            return Gst.FlowReturn.ERROR
        except Exception as e:
            Gst.error("%s" % e)
            return Gst.FlowReturn.ERROR


GObject.type_register(SampleFilter)
Gst.Element.register(None, "sample_filter", Gst.Rank.NONE, SampleFilter)

gstreamer_pipeline = """
                videotestsrc num-buffers=100 ! video/x-raw, width=640, height=480, framerate=30/1 ! videoconvert ! video/x-raw, width=640, height=480, framerate=30/1 !
                originalbuffersave ! videoconvertscale ! video/x-raw, width=100, height=100 ! sample_filter ! originalbufferrestore ! video/x-raw, width=640, height=480, framerate=30/1 ! videoconvert ! objectdetectionoverlay ! tee name=t !
                queue ! videoconvert !  x264enc tune=zerolatency bitrate=2000 speed-preset=ultrafast !  flvmux streamable=true ! udpsink sync=false
                t. ! queue ! videoconvert ! xvimagesink handle-events=false sync=true
"""


# GStreamer pipeline
pipeline = Gst.parse_launch(gstreamer_pipeline)

# Start processing
start_time = time.time()
pipeline.set_state(Gst.State.PLAYING)
bus = pipeline.get_bus()

# Wait for the pipeline to finish
msg = bus.timed_pop_filtered(
    Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR | Gst.MessageType.EOS
)

if msg:
    t = msg.type
    if t == Gst.MessageType.ERROR:
        err, debug = msg.parse_error()
        print(f"Error: {err}, {debug}")
    elif t == Gst.MessageType.EOS:
        print("Pipeline finished successfully.")

pipeline.set_state(Gst.State.NULL)

# Log total video processing time
total_time = time.time() - start_time
print(f"Total wall-clock processing time for the video: {total_time:.2f} seconds")

Benefits:

  • GPU video processing: This pipeline can be easily updated to use libva, v4l2mm, or another backend to decode and scale the video using the GPU.
  • GPU Zero-Copy: To achieve better performance, GPU memory could be used during the decoding, scaling, and ML processes.
  • Metadata Injection: Embed AI results into the output video stream.
  • GStreamer: all the GStreamer flexibility to allow features such as recording fragments of the video or bypassing the audio from the input to the output.

Part of the final pipeline

Gstreamer pipeline

Conclusion

This evolution from a basic OpenCV script to a fully optimized GStreamer pipeline demonstrates the power of combining these tools. GStreamer’s flexibility and efficiency make it ideal for deploying AI-driven multimedia applications. Whether you’re prototyping or building production pipelines, these examples provide a solid foundation for getting started.

Do you want to contact our team for a project like this? Send us a message here, and we will answer you right away!

Addendum

The environment to test these scripts can be easily set up using containers. The following is a Docker file for an Arch-based image that should include everything needed to test them:

FROM archlinux:latest

WORKDIR /app

RUN pacman -Suy --noconfirm
RUN pacman -S --noconfirm \
    rust \
    cargo \
    cairo \
    pango \
    base-devel \
    gobject-introspection \
    gst-plugin-originalbuffer \
    gst-plugins-bad \
    gst-plugins-good \
    gst-plugins-ugly \
    gst-python \
    python-opencv \
    python-numpy

COPY src/*.py /app/

CMD ["bash"]

Let’s Build Together!

Integrating AI into production-ready multimedia pipelines requires the right expertise in OpenCV, GStreamer, and scalable architectures. Whether you’re prototyping a new idea or optimizing an existing workflow, our team can help bring your AI-driven multimedia solutions to life. [Get in touch with us today] (https://fluendo.com/contact/?services=&jp=CS) to discuss your project and explore how we can collaborate!