Gstreamer pipeline for demuxing and decoding the rtsp streaming input

93 Views Asked by At

I want to create two different gstreamer pipeline in python3, where one pipeline will read the rtsp source (a camera url) and parse the h264 packets from it and put it into a queue, whereas second pipeline will read packet from the queue and decode the frame out of it and put the frame in another queue.

I am getting following error in decoding pipeline while running it:

Message:: Type:<flags GST_MESSAGE_ERROR of type Gst.MessageType>, Src:packet_src
Error: gst-stream-error-quark: Internal data stream error. (1) gstbasesrc.c(3072): gst_base_src_loop (): /GstPipeline:pipeline0/GstAppSrc:packet_src:
streaming stopped, reason not-negotiated (-4)

I have enabled debug and got following warnings which could be the cause:

0:00:00.041888515 18865 0x7f05d800eea0 WARN               structure gststructure.c:1861:priv_gst_structure_append_to_gstring: No value transform to serialize field 'allocator' of type 'GArray'
0:00:00.041910071 18865 0x7f05d800eea0 WARN               structure gststructure.c:1861:priv_gst_structure_append_to_gstring: No value transform to serialize field 'pool' of type 'GArray'

Through debug, I could parse following error:

0:00:00.196154481 29817 0x7fed7c00eea0 WARN            videodecoder gstvideodecoder.c:2425:gst_video_decoder_chain:<decoder> Received buffer without a new-segment. Assuming timestamps start from 0

0:00:00.762375830 29817 0x7fed54001f80 ERROR                  libav :0:: No start code is found.
0:00:00.762426081 29817 0x7fed54001f80 ERROR                  libav :0:: Error splitting the input into NAL units.
0:00:00.762481602 29817 0x7fed7c00eea0 WARN                   libav gstavviddec.c:1874:gst_ffmpegviddec_handle_frame:<decoder> Failed to send data for decoding

Here, source pipeline is generating the packets and in decoding pipeline, I could push the packets to appsrc also. But, after that it could not pass data to next elements.

Pipeline:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
from queue import Queue
import threading

# Initialize GStreamer (if not already initialized)
Gst.init(None)


# Create the RTSP source pipeline
source_pipeline_str = (
    "rtspsrc location=rtsp://admin:[email protected]:554/ latency=200 protocols=4 name=source ! "
    "rtph264depay name=depay ! "
    "h264parse name=parse ! "
    "appsink emit-signals=True sync=False max-buffers=0 drop=False name=packet_sink" # 
)

# Create the decoding pipeline
decode_pipeline_str = (
    "appsrc caps=video/x-h264 name=packet_src ! " # 
    "avdec_h264 name=decoder ! "
    "videoconvert name=converter ! video/x-raw,format=BGR ! " 
    "appsink emit-signals=True sync=False max-buffers=0 drop=False name=decode_sink"
)


def run_source_pipeline(pipeline_str, packet_queue):
    source_pipeline = Gst.parse_launch(pipeline_str)
    source_pipeline.set_state(Gst.State.PLAYING)
    
    packet_sink = source_pipeline.get_by_name('packet_sink')
    
    while True:
        sample = packet_sink.emit('pull-sample')
        buffer = sample.get_buffer()
        data = buffer.extract_dup(0, buffer.get_size())
        packet_queue.put(data)


def run_decode_pipeline(pipeline_str, packet_queue):
    
    def on_new_frame(appsink):
        sample = appsink.emit('pull-sample')
        buf = sample.get_buffer()
        data = buffer.extract_dup(0, buffer.get_size())
        print("########## Received decoded data.")
        # Extract frame out of data

    decode_pipeline = Gst.parse_launch(pipeline_str)

    packet_src = decode_pipeline.get_by_name('packet_src')
    
    decode_sink = decode_pipeline.get_by_name('decode_sink')
    decode_sink.connect('new-sample', on_new_frame)
    sink_caps_str = 'video/x-raw, format=(string){BGR, GRAY8}; video/x-bayer,format=(string){rggb,bggr,grbg,gbrg}'
    decode_sink.set_property('caps', Gst.caps_from_string(sink_caps_str))

    decode_pipeline.set_state(Gst.State.PLAYING)

    while True:
        data = packet_queue.get()
        buf = Gst.Buffer.new_wrapped(data)
        packet_src.emit('push-buffer', buf)
        print("new packet pushed")

packet_queue = Queue()


# Create and start threads for each pipeline
source_thread = threading.Thread(target=run_source_pipeline, args=(source_pipeline_str, packet_queue))
decode_thread = threading.Thread(target=run_decode_pipeline, args=(decode_pipeline_str, packet_queue))
source_thread.start()
decode_thread.start()

# Create a GStreamer main loop to keep the application running
mainloop = GLib.MainLoop()
mainloop.run()

I was expecting the frames from decoding pipeline but it's not giving any output data.

0

There are 0 best solutions below