why only the first timeframe of streamed video over Kafka served in a simple HTTP Server?

65 Views Asked by At

I tried to stream a video over Kafka and displaying it in a simple HTTP server, built in Python. regardless the good streaming data, only the first timeframe (or first received timeframe) is served in the HTTP Server.

I have proved that the sent and received timeframes equal to the streamed video (by exporting out the sent and received time frames). my suspicion is that the HTTP Server doesn't automatically refresh or write the received timeframe every new messages are received.

KafkaProducer

import cv2
import io 
from PIL import Image
from kafka import KafkaProducer

def recordedVid(video_file):

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092'
        )

    cap = cv2.VideoCapture(video_file)
    while(cap.isOpened()):
        ret, frame = cap.read()
        ret, buffer = cv2.imencode('.jpg', frame)
        toStream = buffer.tobytes()
        producer.send('Video', toStream)
        print('sent')
        time.sleep(0.1)
    cap.release()
    print('done')

recordedVid('sample.mp4')

KafkaConsumer

import cv2
import socket
from http.server import HTTPServer, BaseHTTPRequestHandler
from kafka import KafkaConsumer
from PIL import Image
import numpy as np
import io
from ensurepip import bootstrap

def extract_IP():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))

    HOST = s.getsockname()[0]

    return HOST

def msg_process(msg) :
    frame_bytes = msg.value
    frame_arr = np.frombuffer(frame_bytes, np.uint8)
    frame = cv2.imdecode(frame_arr, cv2.IMREAD_COLOR)
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    toLoad = io.BytesIO()
    img.save(toLoad, format='JPEG')
    return toLoad.getvalue() 


consumer = KafkaConsumer(
    "Video",
    bootstrap_servers = 'localhost:9092'
)

class Display(BaseHTTPRequestHandler):
    def do_GET(self):
        for msg in consumer:
            frame = msg_process(msg)
            self.send_response(200)
            self.send_header("Content-type", "image/jpeg")
            self.end_headers()
            self.wfile.write(frame)


HOST = extract_IP()
PORT = 9999
server = HTTPServer((HOST, PORT), Display)
print("server is now running in ", HOST," and Port ", PORT)
server.serve_forever()
server.server_close()

Did I miss something or is there any workaround for my problem?

0

There are 0 best solutions below