Can we monitor a GStreamer pipeline opened by OpenCV from GStreamer code?

I am opening a GStreamer pipeline with OpenCV. As long as data is coming in, everything works fine. But when the data stops for any reason, the pipeline gets stuck, and OpenCV gets stuck with it. Below is the code I am using:

#include <cstdio>
#include <opencv2/opencv.hpp>

#define UDP_URL "udpsrc port=15004 buffer-size=5000000 ! watchdog timeout=1000 ! tsdemux latency=0 ! h264parse ! v4l2h264dec ! imxvideoconvert_g2d ! video/x-raw,format=BGRA,width=1280,height=960 ! appsink max-buffers=2"
int main()
{

    cv::VideoCapture video;
    cv::Mat frame;
    // Open the pipeline description through OpenCV's GStreamer backend.
    video.open(UDP_URL, cv::CAP_GSTREAMER);
    if (!video.isOpened()) {
        printf("Error in opening.\n");
        return -1;
    }

    while(1) {

        if(video.read(frame))
        {
            // some operation on frame.
        }
        else
            break;
    }

    video.release();
    return 0;
}

In the code above, when there is no data on port 15004, the video.read(frame) call gets stuck. I think it is the v4l2h264dec decoder in particular that gets stuck. When data starts coming again, the program is still stuck in the same call. During GStreamer debugging I got "gstreamer pipeline is in halt state", even though I can see with tcpdump that data is arriving on port 15004. I am thinking of using GStreamer code to monitor the pipeline, but I don't know how to do that. I am using an IMX8QM board, the GStreamer version is 1.0, and the OpenCV version is 4.6.0.

I have not tried any monitoring code, because I do not know how to access the OpenCV backend from my source file.

SeB

AFAIK, mostly no. The OpenCV GStreamer backend doesn't expose the internals of the underlying pipeline through cv::VideoCapture, so any call to VideoCapture's read (or grab) simply blocks while the pipeline is stalled.

You can, however, get a cv::Mat from a GStreamer pipeline without cv::VideoCapture.

The following example uses a src pad probe callback on a queue to get each frame into a cv::Mat. It does nothing with the frame (it just prints a dot to stdout); in a real application you would push the Mat into a buffer for further processing by another thread.

#include <unistd.h>
#include <iostream>

#include <gst/gst.h>
#include <opencv2/opencv.hpp>


const char* gstStr = "udpsrc port=15004 buffer-size=1000000 ! queue ! watchdog timeout=1000 ! queue ! tsdemux ! h264parse ! avdec_h264 ! videorate ! videoconvert ! video/x-raw,format=BGRA,width=1280,height=960,framerate=30/1 ! queue name=my_sink ! fakesink";

static GstPadProbeReturn
cb_have_data (GstPad          *pad,
              GstPadProbeInfo *info,
              gpointer         user_data)
{
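  GstMapInfo map;
  GstBuffer *buffer;
  buffer = GST_PAD_PROBE_INFO_BUFFER (info);
  if (gst_buffer_map (buffer, &map, GST_MAP_READ)) {
    /* Wrap the mapped data without copying; the size must match the caps
       (1280x960 BGRA). The Mat is only valid until gst_buffer_unmap(),
       so clone() it if it has to outlive this callback. */
    cv::Mat frame(960,1280, CV_8UC4, map.data);
    std::cout << "." << std::flush;
    gst_buffer_unmap (buffer, &map);
  }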

  return GST_PAD_PROBE_OK;
}


int main(int argc, char **argv)
{
    /* init GStreamer */
    gst_init (&argc, &argv);

    while(1) {
        GstElement *pipeline = gst_parse_launch (gstStr, NULL);
        GstBus *bus = gst_element_get_bus (pipeline);
        GstElement *my_sink = gst_bin_get_by_name(GST_BIN(pipeline), "my_sink");
        GstPad *pad = gst_element_get_static_pad (my_sink, "src");
        gulong probe_id = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,(GstPadProbeCallback) cb_have_data, NULL, NULL);
        gst_object_unref (my_sink);
        
        /* run */
        gst_element_set_state (pipeline, GST_STATE_PLAYING);

        /* wait until it's up and running or failed */
        if (gst_element_get_state (pipeline, NULL, NULL, -1) == GST_STATE_CHANGE_FAILURE) {
          g_print ("Failed to go into PLAYING state... Source may be down\n");
          gst_pad_remove_probe (pad, probe_id);
          gst_object_unref (pad);
          gst_element_set_state (pipeline, GST_STATE_NULL);
          gst_object_unref (bus);
          gst_object_unref (pipeline);
          sleep(3);
          std::cout << "Restarting" << std::endl;
          continue;
        }
        
        g_print ("Running ...\n");
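        /* Block until the pipeline posts an error (e.g. the watchdog firing) or EOS */
        GstMessage *msg = gst_bus_timed_pop_filtered (bus, GST_CLOCK_TIME_NONE, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
        if (msg != NULL) {
           if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) {
              g_print ("An error occurred! Maybe the source is down and the watchdog detected that\n");
           }
           /* The popped message is owned by the caller; release it before restarting */
           gst_message_unref (msg);
        }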
        gst_pad_remove_probe (pad, probe_id);
        gst_object_unref (pad);
        gst_element_set_state (pipeline, GST_STATE_NULL);
        gst_object_unref (bus);
        gst_object_unref (pipeline);
        sleep(3);
        std::cout << "Restarting" << std::endl;
    }

    return 0;
}
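
The probe callback above only prints a dot. For handing frames to another thread, one option is a small bounded queue that drops the oldest frame when full, so the streaming thread never waits on a slow consumer. This is a minimal sketch using only the C++ standard library; FrameQueue and its method names are hypothetical, not part of GStreamer or OpenCV.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical helper: bounded queue that discards the oldest item when
// full, so the producer (the pad probe) never blocks on the consumer.
template <typename T>
class FrameQueue {
public:
    // A capacity of 0 is treated as 1 so that push() always has room.
    explicit FrameQueue(std::size_t capacity)
        : capacity_(capacity == 0 ? 1 : capacity) {}

    // Producer side: store the item, dropping the oldest one if needed.
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (queue_.size() >= capacity_) {
                queue_.pop_front();
            }
            queue_.push_back(std::move(item));
        }
        ready_.notify_one();
    }

    // Consumer side: wait up to `timeout` for an item. Returns false if
    // nothing arrived in time, which the consumer can treat as a stall.
    bool pop(T &out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!ready_.wait_for(lock, timeout, [this] { return !queue_.empty(); })) {
            return false;
        }
        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

private:
    const std::size_t capacity_;
    mutable std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<T> queue_;
};
```

With OpenCV, this would be a FrameQueue<cv::Mat> whose address is passed as the user_data argument of gst_pad_add_probe; the callback would then push frame.clone() before calling gst_buffer_unmap, since the Mat built from map.data does not own its pixels. The timed pop also gives the processing thread its own way to notice that frames have stopped arriving.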