Is there an alternative to the cv2.resize() function that is faster at upscaling an image?


Problem

I want to display an image fullscreen (1920 x 1080) at 60 FPS in my application, but what I'm actually getting is closer to 15 FPS.

What I've tried

This is the most simplified version of my code that I've been able to get, which still produces the problem I described above.

# my code

import os
import sys
import cv2
import time
import pygame
import random
import numpy as np
import importlib.util
from threading import Thread
from imutils.video import VideoStream # NOTE: Unused; shadowed by the local VideoStream class below

### CLASSES ###

# Define VideoStream class to handle streaming of video from webcam in separate processing thread
class VideoStream:

    """Camera object that controls video streaming from the Picamera"""
    def __init__(self, resolution = (640, 360), framerate = 30):
        # Initialize the PiCamera and the camera image stream
        self.stream = cv2.VideoCapture(0)

        # Resolution of the video stream (w, h)
        self.resolution = int(resolution[0]), int(resolution[1])

        # Configure camera resolution
        ret = self.stream.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        ret = self.stream.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
        ret = self.stream.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
            
        # Read first frame from the stream
        self.grabbed, self.frame = self.stream.read()

        # Variable to control when the camera is stopped
        self.stopped = False

        # Variables to monitor frame rate
        self.fps = framerate
        self.frequency = None
        self.t1 = 0
        self.t_delta = 0

    def start(self):
        # Start the thread that reads frames from the video stream
        Thread(target = self.update, args = ()).start()
        return self

    def update(self):
        # Initialize frame rate calculation
        self.fps = 60
        self.frequency = cv2.getTickFrequency()

        # Keep looping indefinitely until the thread is stopped
        while True:
            # If the camera is stopped, stop the thread
            if self.stopped:
                # Close camera resources
                self.stream.release()
                return
            # Otherwise, grab the next frame from the stream
            self.grabbed, self.frame = self.stream.read()

    def read(self):
        # Return the most recent frame
        return self.frame

    def stop(self):
        # Indicate that the camera and thread should be stopped
        self.stopped = True

class ObjectDetector:

    """Computer vision object that detects object(s) in images
    using artificial intelligence and a pre-trained model"""
    def __init__(self, model_path: str = None, graph_name: str = "detect.tflite", labelmap_name: str = "labelmap.txt", \
        use_TPU = False, minimum_confidence_threshold: float = 0.5, videostream: VideoStream = None):
        # Setup default properties
        self.model_path = model_path
        self.graph_name = graph_name
        self.labelmap_name = labelmap_name
        self.use_TPU = use_TPU
        self.minimum_confidence_threshold = minimum_confidence_threshold
        self.videostream = videostream

        self.frame = None
        self.score_string = ""

        self.stopped = False

        self.setup()

    def setup(self):
        # Setup the environment for the model
        print("[INFO] Preparing 'ShuffleboardPucks_TFLite' model...")

        # Import TensorFlow libraries
        pkg_exists = importlib.util.find_spec("tflite_runtime")
        if pkg_exists:
            from tflite_runtime.interpreter import Interpreter
            if self.use_TPU:
                from tflite_runtime.interpreter import load_delegate    
        else:
            from tensorflow.lite.python.interpreter import Interpreter
            if self.use_TPU:
                from tensorflow.lite.python.interpreter import load_delegate

        # If using Edge TPU, assign filename for Edge TPU model
        if self.use_TPU:
            # If user has specified the name of the *.tflite file, use that name, otherwise, use default 'edgetpu.tflite'
            if self.graph_name == "detect.tflite":
                self.graph_name = "edgetpu.tflite"     

        # Get path to current working directory
        cwd_path = os.getcwd()
        # Path to *.tflite file, which contains the model that is used for object detection
        ckpt_path = os.path.join(cwd_path, self.model_path, self.graph_name)
        # Path to labelmap file
        label_path = os.path.join(cwd_path, self.model_path, self.labelmap_name)
        # Load the labelmap
        with open(label_path, "r") as f:
            self.labels = [line.strip() for line in f.readlines()]
        # Have to do a weird fix for label map if using the COCO "starter model" from
        # https://www.tensorflow.org/lite/models/object_detection/overview
        # First label is '???', which has to be removed.
        if self.labels[0] == "???":
            del(self.labels[0])

        # Load the Tensorflow Lite model
        # If using Edge TPU, use special load_delegate argument
        if self.use_TPU:
            self.interpreter = Interpreter(model_path = ckpt_path, experimental_delegates = [load_delegate("libedgetpu.so.1.0")])
        else:
            self.interpreter = Interpreter(model_path = ckpt_path)

        self.interpreter.allocate_tensors()

        # Get model details
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        self.model_height = self.input_details[0]["shape"][1]
        self.model_width = self.input_details[0]["shape"][2]

        # Check if model is floating point version, which uses floats rather than
        # integers when predicting to decrease calculation time
        self.is_floating_model = (self.input_details[0]["dtype"] == np.float32)

        self.input_mean = 127.5
        self.input_std = 127.5

    def detect(self):
        # Start the thread that performs object detection on frames read from the video stream

        # Initialize the video stream and allow the camera sensor to warm up
        print("[INFO] Starting video stream...")

        self.videostream.start()
        # time.sleep(2) # TODO Determine whether this is necessary

        Thread(target = self.update, args = ()).start()
        return self

    def update(self):
        # Loop over the frames from the video stream until thread is stopped

        while True:

            # If the user has closed the Pygame window, stop the thread
            if self.stopped:
                # Do a bit of cleanup
                cv2.destroyAllWindows()
                self.videostream.stop()
                return
            # Otherwise, continue to perform detection
            
            # Grab frame from video stream
            self.frame = self.videostream.read()
            
            # Recolor and resize frame to expected shape [1xHxWx3]
            frame_recolored = cv2.cvtColor(self.frame, cv2.COLOR_BGR2RGB)
            frame_resized = cv2.resize(frame_recolored, (self.model_width, self.model_height)) # NOTE: Resizing during each iteration is very slow
            input_data = np.expand_dims(frame_resized, axis = 0)
            
            # Get the frame's height and width (h, w); not used in this simplified version
            h, w = self.videostream.stream.get(cv2.CAP_PROP_FRAME_HEIGHT), self.videostream.stream.get(cv2.CAP_PROP_FRAME_WIDTH)
            
            # Team score variables
            blue_score, red_score = 0, 0
            
            # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
            if self.is_floating_model:
                input_data = (np.float32(input_data) - self.input_mean) / self.input_std

            # Perform the actual detection by running the model with the image as input
            self.interpreter.set_tensor(self.input_details[0]["index"], input_data)
            self.interpreter.invoke()

            # Retrieve detection results
            classes = self.interpreter.get_tensor(self.output_details[1]["index"])[0] # Class index of detected objects
            scores = self.interpreter.get_tensor(self.output_details[2]["index"])[0] # Confidence of detected objects

            # Loop over all detections and draw detection box if confidence is above minimum threshold
            for i in range(len(scores)):
                
                if (scores[i] > self.minimum_confidence_threshold) and (scores[i] <= 1.0):

                    # Draw label
                    object_name = self.labels[int(classes[i])] # Look up object name from "labels" array using class index
                    
                    # Allocate appropriate scores
                    if (object_name == "Blue Shuffleboard Puck"):
                        blue_score += 1
                        
                    elif (object_name == "Red Shuffleboard Puck"):
                        red_score += 1

            
            # This captures the latest score based on object detection results
            self.score_string = str(blue_score) + "-" + str(red_score)

            # NOTE: In the full version, detection annotations are drawn onto
            # self.frame here; this simplified version leaves the frame as-is

    def stop(self):
        # Indicate that the loop and thread should be stopped
        self.stopped = True

    def read(self):
        # Return the most recent frame and score
        return self.frame, self.score_string


### HELPER FUNCTIONS ###

# Calculate the absolute distance between two points
def calculateDistance(ptA, ptB):
    # Result is in millimeters; divide by 25.4 to convert to inches
    return abs(ptA - ptB)

# Convert CV Image (frame) to Pygame Image
def convertToPygameImage(frame):
    return pygame.image.frombuffer(frame.tobytes(), frame.shape[1::-1], "RGB")


### MAIN THREAD ###

# Initialize the video stream using the PiCamera
resolution = (640, 360)

videostream = VideoStream(resolution = resolution, framerate = 60) # NOTE: Might have to use 30 instead of 60

# Initialize the object detector with the video stream's images
model_path = "/home/pi/tflite1/ShuffleboardPucks_TFLite_model_(ssd_mobilenet_v2_quantized_300x300_coco)"
graph_name = "detect.tflite"
labelmap_name = "labelmap.txt"

shuffleboard_detector = ObjectDetector(model_path, graph_name, labelmap_name, use_TPU = True, minimum_confidence_threshold = 0.5, videostream = videostream)
shuffleboard_detector.detect()


# Initialize Pygame
print("[INFO] Initializing Pygame...")

pygame.init()
clock = pygame.time.Clock()

# Setup Window
win = pygame.display.set_mode((0, 0), pygame.RESIZABLE)
win_W, win_H = pygame.display.get_surface().get_size()
pygame.display.set_caption("Prototype")

# Loop indefinitely over object detection frames to display the live shuffleboard score
playing_shuffleboard = True
while playing_shuffleboard:
    
    # Handle the Pygame events
    for event in pygame.event.get():
        # If user closed the window, stop object detection thread, videostream thread, and loop
        if event.type == pygame.QUIT:
            playing_shuffleboard = False
            shuffleboard_detector.stop()
            continue
        # If the user pressed the 'q' key, do the same
        elif event.type == pygame.KEYDOWN and event.key == pygame.K_q:
            playing_shuffleboard = False
            shuffleboard_detector.stop()
            continue
    
    clock.tick(60)

    # Get the latest detection results/annotations (image with detection annotations)
    # and score for both red and blue teams
    detection_frame, score_string = shuffleboard_detector.read()

    # Add object-detection-annotated camera frame to screen
    frame_resized = cv2.resize(detection_frame, (1920, 1080))
    frame_recolored = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB)
    pygame_image = convertToPygameImage(frame_recolored)
    win.blit(pygame_image, (0, 0))

    pygame.display.update()

# Do a bit of cleanup
pygame.quit()
sys.exit()

Basically, the code is (1) creating one thread that captures the video stream from my Raspberry Pi 4's PiCamera via cv2.VideoCapture() at 640 x 360 resolution, (2) creating a second thread that performs object detection on those frames and outputs an annotated image, and (3) finally, in the main thread, resizing that image to fullscreen (1920 x 1080) with cv2.resize() and displaying it via Pygame's blit() function.

Changing the final output resolution from 1920 x 1080 to 640 x 360 improves the FPS to around 40; however, I want the resulting image displayed fullscreen, not at 640 x 360.

My Research

The question "Resize image faster in OpenCV Python" describes a similar problem, but I believe mine is different because my resize happens inside a while loop that is intended to run indefinitely until closed.

Question

So my basic question is, is there an alternative method I can use to take an initially small image (640 x 360), perform object detection on it, and resize it larger (1920 x 1080) to display fullscreen while achieving an FPS close to 60?

1 Answer
Maybe cv2.pyrUp() is faster than the general cv2.resize().
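
For instance, a minimal sketch comparing the two (note that cv2.pyrUp() doubles each dimension per call and applies Gaussian smoothing, so 640 x 360 becomes 1280 x 720 and then 2560 x 1440, which you would have to crop or letterbox down to 1920 x 1080; a cheaper cv2.resize() interpolation flag is also worth trying):

import cv2
import numpy as np

frame = np.zeros((360, 640, 3), dtype = np.uint8) # Stand-in for a 640 x 360 camera frame

# General-purpose resize (what the question uses); INTER_NEAREST is the
# cheapest interpolation if the quality is acceptable
resized = cv2.resize(frame, (1920, 1080), interpolation = cv2.INTER_NEAREST)

# pyrUp doubles width and height on each call
doubled = cv2.pyrUp(frame) # 1280 x 720
quadrupled = cv2.pyrUp(doubled) # 2560 x 1440
cropped = quadrupled[180:1260, 320:2240] # Center crop down to 1920 x 1080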

Though, your code is likely slow because you aren't just upscaling... you are running a neural network!

You should have expected all the other stuff you do to take some time as well.

You should also expect the rest of the code to slow down in proportion to the size of the input; blowing the input up by a factor of 9 (640 x 360 = 230,400 pixels vs. 1920 x 1080 = 2,073,600 pixels) means every operation that touches those pixels does roughly 9x the work.

You should look for ways to "profile" your code. Profiling means measuring how much time each part of your program takes. A lighter-weight approach is to call time.perf_counter() before and after various blocks of code and look at the differences.
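
For example, a minimal sketch of the manual-timing approach, dropped into the question's main loop around the two per-frame conversion calls (the millisecond formatting is just one way to report it):

import time

# Inside the main while loop, around the two expensive per-frame calls
t0 = time.perf_counter()
frame_resized = cv2.resize(detection_frame, (1920, 1080))
t1 = time.perf_counter()
frame_recolored = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB)
t2 = time.perf_counter()

print(f"resize: {(t1 - t0) * 1000:.1f} ms, cvtColor: {(t2 - t1) * 1000:.1f} ms")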