Receive, process and send multiple streams in a Python server


I have a React application which uses a Janus WebRTC server to share video streams between clients (using the videoroom plugin). There can be a variable number of rooms with a variable number of clients in each room.

I also have a Python server that uses GStreamer and OpenCV to receive a video stream (currently just one stream, RTP-forwarded from the Janus server) and process its frames with a YOLO neural network (this works fine).

Edit: The rtp_forward request I currently send to Janus looks like this:

rtpForward (room: number, publisher_id: number): void {
    try {
      const message = {
        request: 'rtp_forward',
        room,
        publisher_id,
        host: '192.my.local.ip',
        host_family: 'ipv4', 
        video_port: 6001, 
        videopt: 100,
      }
      this.videoRoomPlugin &&
        this.videoRoomPlugin.send({
          message,
          success: result => {
            console.log('::: videoPlugin - RTP forwarding', result)
          },
          error: error => {
            console.error('::: videoPlugin - RTP forwarding ERROR:', error)
          }
        })
    } catch (error) {
      console.log('Error rtp forwarding: ', error)
    }
  }

The code I use in the Python server to receive the forwarded RTP stream looks like this:

import numpy
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import Gst, GstApp, GLib
from threading import Thread
import cv2
import torch
from av import VideoFrame

Gst.init(None)  # no command-line arguments to pass to GStreamer

# Predictor that performs the object detection on the frames
predictor = torch.hub.load('ultralytics/yolov5', 'yolov5s')  # yolov5s or yolov5m, yolov5x, custom
frame_count = 0
frame_threshold = 5
# Global variable holding the image array
image_arr = None

def gst_to_opencv(sample):
    buf = sample.get_buffer()
    caps = sample.get_caps()

    arr = numpy.ndarray(
        (caps.get_structure(0).get_value('height'),
         caps.get_structure(0).get_value('width'),
         3),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=numpy.uint8)
    return arr


main_loop = GLib.MainLoop()
main_loop_thread = Thread(target=main_loop.run)
main_loop_thread.start()

pipeline = Gst.parse_launch('udpsrc port=6001 caps="application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)VP8, payload=(int)96" ! queue ! rtpvp8depay ! decodebin ! videoconvert ! video/x-raw, format=BGR ! appsink name=sink sync=true max-buffers=1 drop=true')
appsink = pipeline.get_by_name("sink")
pipeline.set_state(Gst.State.PLAYING)

try:
    while True:
        sample = appsink.try_pull_sample(Gst.SECOND)

        if sample is None:
            continue

        # Convert the sample to a numpy array to manage with cv2
        frame = gst_to_opencv(sample)

        # Determine whether it's a frame where a detection should be performed
        if frame_count < frame_threshold:
            # No detection -> pass the frame through unchanged
            frame_count += 1
            # Wrap the raw frame so new_frame has the same type in both branches
            new_frame = VideoFrame.from_ndarray(frame, format="bgr24")
        else:
            # Detection -> apply predictor to frame and render the results on it
            image = frame
            outputs = predictor(image) # Apply predictor
            outputs.render() # Render predictor results on the frame

            # Prepare output in the new_frame variable
            new_frame = VideoFrame.from_ndarray(outputs.imgs[0], format="bgr24")
            frame_count = 0
except KeyboardInterrupt:
    pass

pipeline.set_state(Gst.State.NULL)
main_loop.quit()
main_loop_thread.join()

Now I want to take the next step and handle multiple client streams, and also send the processed streams back to the corresponding clients.
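
To make the receiving side of this concrete, here is a rough sketch of how the pipeline description above could be generated once per stream. It assumes each forwarded publisher would arrive on its own UDP port, which is not the case yet (the request above always uses 6001). `build_pipeline_description` and the extra ports 6002 and 6003 are hypothetical names and values used only for illustration.

```python
def build_pipeline_description(port: int, sink_name: str = "sink") -> str:
    """Return a gst-launch style description for one RTP/VP8 stream on `port`.

    Same elements as the single-stream pipeline; only the UDP port
    (and optionally the appsink name) varies.
    """
    caps = (
        "application/x-rtp, media=(string)video, clock-rate=(int)90000, "
        "encoding-name=(string)VP8, payload=(int)96"
    )
    return (
        f'udpsrc port={port} caps="{caps}" ! queue ! rtpvp8depay ! decodebin '
        f"! videoconvert ! video/x-raw, format=BGR "
        f"! appsink name={sink_name} sync=true max-buffers=1 drop=true"
    )


# Hypothetical ports: one description per forwarded publisher.
descriptions = {port: build_pipeline_description(port) for port in (6001, 6002, 6003)}
```

Each string would then go through `Gst.parse_launch` as in the code above, giving one pipeline and one appsink per stream. Whether one pipeline per stream scales to many rooms is part of what I am unsure about.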

Solutions I have considered:

First, I thought about having the Python server join the Janus rooms and subscribe to their feeds, but this would require handling multiple Janus sessions and making every participant both a publisher and a subscriber.

Second, I thought about keeping Janus for sharing feeds between clients and additionally sending the streams from the clients directly to the Python server (and sending the processed streams back from the server directly).

Since this looks like a fairly complicated task, I would like to ask: do these options make sense, and which one do you think is better?

I feel like the current setup with the Janus RTP forwarding is incorrect, because I see no way of identifying the streams once they reach the Python server if they all arrive from Janus on the same port. Would it be better to share the feeds between the clients and the Python server directly? Could this be achieved with WebSockets?
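
For the identification problem, one idea I could imagine (a sketch only, not something I have tested against Janus) is to forward every publisher to a different `video_port` and keep a table on the Python side that maps each port back to its room and publisher. `StreamRegistry`, its methods and the room/publisher ids below are hypothetical, and the base port 6001 is just the one from my current request.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple

StreamKey = Tuple[int, int]  # (room, publisher_id)


@dataclass
class StreamRegistry:
    """Hands out one UDP port per (room, publisher_id) and maps ports back."""

    base_port: int = 6001
    _by_key: Dict[StreamKey, int] = field(default_factory=dict)
    _by_port: Dict[int, StreamKey] = field(default_factory=dict)

    def allocate(self, room: int, publisher_id: int) -> int:
        """Return the port for this publisher, reserving a free one if needed."""
        key = (room, publisher_id)
        if key in self._by_key:
            return self._by_key[key]
        port = self.base_port
        while port in self._by_port:  # take the lowest port not in use
            port += 1
        self._by_key[key] = port
        self._by_port[port] = key
        return port

    def release(self, room: int, publisher_id: int) -> None:
        """Free the port when the publisher leaves; unknown keys are ignored."""
        port = self._by_key.pop((room, publisher_id), None)
        if port is not None:
            del self._by_port[port]

    def lookup(self, port: int) -> Optional[StreamKey]:
        """Identify which (room, publisher_id) a port belongs to, if any."""
        return self._by_port.get(port)


registry = StreamRegistry()
port_a = registry.allocate(1234, 1)  # hypothetical room and publisher ids
port_b = registry.allocate(1234, 2)
```

The allocated port would be sent as `video_port` in the rtp_forward request, and `registry.lookup(port)` would tell the server whose frames it is processing. This still does not cover sending the processed stream back, which is the part I am least sure about.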

Thank you for your help in advance!
