I tried to stream a video over Kafka and displaying it in a simple HTTP server, built in Python. regardless the good streaming data, only the first timeframe (or first received timeframe) is served in the HTTP Server.
I have proved that the sent and received timeframes equal to the streamed video (by exporting out the sent and received time frames). my suspicion is that the HTTP Server doesn't automatically refresh or write the received timeframe every new messages are received.
KafkaProducer
import cv2
import io
from PIL import Image
from kafka import KafkaProducer
def recordedVid(video_file):
producer = KafkaProducer(
bootstrap_servers='localhost:9092'
)
cap = cv2.VideoCapture(video_file)
while(cap.isOpened()):
ret, frame = cap.read()
ret, buffer = cv2.imencode('.jpg', frame)
toStream = buffer.tobytes()
producer.send('Video', toStream)
print('sent')
time.sleep(0.1)
cap.release()
print('done')
recordedVid('sample.mp4')
KafkaConsumer
import cv2
import socket
from http.server import HTTPServer, BaseHTTPRequestHandler
from kafka import KafkaConsumer
from PIL import Image
import numpy as np
import io
from ensurepip import bootstrap
def extract_IP():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
HOST = s.getsockname()[0]
return HOST
def msg_process(msg) :
frame_bytes = msg.value
frame_arr = np.frombuffer(frame_bytes, np.uint8)
frame = cv2.imdecode(frame_arr, cv2.IMREAD_COLOR)
img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
toLoad = io.BytesIO()
img.save(toLoad, format='JPEG')
return toLoad.getvalue()
consumer = KafkaConsumer(
"Video",
bootstrap_servers = 'localhost:9092'
)
class Display(BaseHTTPRequestHandler):
def do_GET(self):
for msg in consumer:
frame = msg_process(msg)
self.send_response(200)
self.send_header("Content-type", "image/jpeg")
self.end_headers()
self.wfile.write(frame)
HOST = extract_IP()
PORT = 9999
server = HTTPServer((HOST, PORT), Display)
print("server is now running in ", HOST," and Port ", PORT)
server.serve_forever()
server.server_close()
Did I miss something or is there any workaround for my problem?