Quality problems in aiortc communication

116 Views Asked by At

With this code I am trying to implement an aiortc IP-call voice channel between a browser user (the client) and a Python (PyQt5) user (the server).

index.html

<!doctype html>
<html lang="el">
    <head>
        <!-- Explicit charset: the page contains Greek labels. -->
        <meta charset="utf-8">
        <title>Ip calls</title>

        <!-- jquery library -->
        <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.7.1/jquery.min.js"></script>

        <!-- own javascript -->
        <script src="telephone_calls.js"></script>

    </head>
    <body>
        <h1>Ip calls</h1>

        <table>
        <tr>
            <td><label for="name">Όνομα:</label></td>
            <td><input type="text" id="name" /></td>
        </tr>
        <tr>
            <td><label for="surname">Επώνυμο:</label></td>
            <td><input type="text" id="surname" /></td>
        </tr>
        </table>
        <!-- was invalid "</br>": br is a void element with no closing tag -->
        <br>
        <input id="control_call_button" type="button" value="Έναρξη κλήσης">

        <audio style="width:100%;margin-top:5mm;background:rgb(241,243,244)" id="audio" autoplay="true" controls="true"></audio>
    </body>
</html>

telephone_calls.js

// Active RTCPeerConnection (created when a call is started).
var pc = null;

// Build a peer connection with a STUN-only ICE configuration and wire
// any incoming audio track straight into the page's <audio> element.
function createPeerConnection() {
    var configuration = {
        sdpSemantics: 'unified-plan',
        iceServers: [{urls: ['stun:stun.l.google.com:19302']}]
    };

    pc = new RTCPeerConnection(configuration);

    // Play remote audio as soon as a track arrives.
    pc.addEventListener('track', function(evt) {
        if (evt.track.kind === 'audio') {
            document.getElementById('audio').srcObject = evt.streams[0];
        }
    });

    return pc;
}

// Create an SDP offer, wait for ICE gathering to finish, POST the offer
// (plus the caller's name/surname) to the server and apply the answer.
// Relies on the globals `pc`, `name` and `surname`.
function negotiate() {
    return pc.createOffer({offerToReceiveAudio:true}).then(function(offer) {
        return pc.setLocalDescription(offer);
    }).then(function() {
        // Wait for ICE gathering to complete so the offer already
        // contains every candidate (no trickle ICE on the server side).
        return new Promise(function(resolve) {
            if (pc.iceGatheringState === 'complete') {
                resolve();
            } else {
                function checkState() {
                    if (pc.iceGatheringState === 'complete') {
                        pc.removeEventListener('icegatheringstatechange', checkState);
                        resolve();
                    }
                }
                pc.addEventListener('icegatheringstatechange', checkState);
            }
        });
    }).then(function() {
        var offer = pc.localDescription;

        return fetch('/offer', {
            body: JSON.stringify({
                sdp: offer.sdp,
                type: offer.type,
                "name": name,          // globals set by the click handler
                "surname": surname
            }),
            headers: {
                'Content-Type': 'application/json'
            },
            method: 'POST'
        });
    }).then(function(response) {
        // Surface HTTP errors explicitly instead of failing later with a
        // confusing JSON parse error.
        if (!response.ok) {
            throw new Error('Offer request failed with status ' + response.status);
        }
        return response.json();
    }).then(function(answer) {
        return pc.setRemoteDescription(answer);
    }).catch(function(e) {
        alert(e);
        console.log(e);
    });
}


function start(name,surname) {
    pc = createPeerConnection();
    
    dc = pc.createDataChannel('radio_metadata', {"ordered": true});
    dc.onclose = function() {
        
    };
    dc.onopen = function() {
        
    };
    dc.onmessage = function(evt) {
        console.log(evt.data);
    };
    
    //negotiate();
    
    constraints = {audio:true,video:false};
    
    navigator.mediaDevices.getUserMedia(constraints).then(function(stream) {
        stream.getTracks().forEach(function(track) {
            pc.addTrack(track, stream);
        });
        return negotiate();
        }, function(err) {
            alert('Could not acquire media: ' + err);
    });
    
    
}

// Wire the call button once the DOM is ready.
$(document).ready(function() {
    $("#control_call_button").on("click", function() {
        // Deliberately assigned to globals: negotiate() reads `name` and
        // `surname` when building the /offer payload.
        name = $("#name").val();
        surname = $("#surname").val();
        start(name, surname);
    });
});

calls.py (pyqt5 interface - not implemented yet)

# -*- coding: utf-8 -*-

# Form implementation generated from reading ui file 'calls.ui'
#
# Created by: PyQt5 UI code generator 5.15.7
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again.  Do not edit this file unless you know what you are doing.


from PyQt5 import QtCore, QtGui, QtWidgets


class Ui_Dialog(object):
    """UI builder for the incoming-call dialog (generated from calls.ui).

    Lays out a caller-info label over three call-control buttons
    (answer / reject / hang up) on a grid.
    """

    def setupUi(self, Dialog):
        Dialog.setObjectName("Dialog")
        Dialog.resize(492, 60)

        self.gridLayout = QtWidgets.QGridLayout(Dialog)
        self.gridLayout.setObjectName("gridLayout")

        # Caller-info label spans the first two columns of row 0.
        self.label = QtWidgets.QLabel(Dialog)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 0, 0, 1, 2)

        # Three buttons side by side on row 1, kept as the attribute names
        # the rest of the application expects.
        for column, attr in enumerate(("pushButton", "pushButton_2", "pushButton_3")):
            button = QtWidgets.QPushButton(Dialog)
            button.setObjectName(attr)
            setattr(self, attr, button)
            self.gridLayout.addWidget(button, 1, column, 1, 1)

        self.retranslateUi(Dialog)
        QtCore.QMetaObject.connectSlotsByName(Dialog)

    def retranslateUi(self, Dialog):
        """Apply the (Greek) display strings through Qt's translate hook."""
        tr = QtCore.QCoreApplication.translate
        Dialog.setWindowTitle(tr("Dialog", "Τηλεφωνικές κλήσεις"))
        self.label.setText(tr("Dialog", "Τηλεφωνική κλήση από: ... ..."))
        self.pushButton.setText(tr("Dialog", "Απάντηση κλήσης"))
        self.pushButton_2.setText(tr("Dialog", "Απόρριψη κλήσης"))
        self.pushButton_3.setText(tr("Dialog", "Τερματισμός κλήσης"))


if __name__ == "__main__":
    # Standalone preview of the generated dialog.
    import sys

    app = QtWidgets.QApplication(sys.argv)
    dialog = QtWidgets.QDialog()
    builder = Ui_Dialog()
    builder.setupUi(dialog)
    dialog.show()
    sys.exit(app.exec_())

server.py

from PyQt5 import QtCore, QtGui, QtWidgets
from calls import Ui_Dialog
from aiohttp import web
from aiortc.mediastreams import MediaStreamTrack
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder, MediaRelay
from pydub import AudioSegment
import av
import pyaudio
import asyncio
import json
import os
from multiprocessing import Process, Queue, Pipe, freeze_support
from queue import Queue as Av_Queue
import sys
import threading
from time import sleep
import fractions
import time


from PyQt5.QtCore import pyqtSignal, QThread, Qt
from datetime import datetime, timedelta
from pydub import AudioSegment,effects,utils,generators
from pydub.utils import which
AudioSegment.converter = which("ffmpeg")

from pydub.playback import play
from io import BytesIO

class Run_me:
    """Builds the Qt dialog, hides the (not yet implemented) call-control
    widgets and starts the aiohttp/aiortc server in a child process."""

    def __init__(self):
        self.app = QtWidgets.QApplication(sys.argv)
        self.Dialog = QtWidgets.QDialog()
        self.ui = Ui_Dialog()
        self.ui.setupUi(self.Dialog)
        
        # Run the web/RTC server in its own process so the Qt event loop
        # stays responsive.
        self.server_child_process = Server()
        self.server_child_process.start()
        
        self.Dialog.show()
        
        # Call-control UI is hidden until the call flow is implemented.
        self.ui.label.hide()
        self.ui.pushButton.hide()
        self.ui.pushButton_2.hide()
        self.ui.pushButton_3.hide()
        
        self.Dialog.closeEvent = lambda event:self.closeEvent(event)
        
        # Blocks here until the Qt application exits.
        sys.exit(self.app.exec_())
        
    def microphone_slice_ready(self,slice):
        # NOTE(review): this looks like dead/leftover code — `q` is not
        # defined at module scope and self.codec / self.audio_samples are
        # never initialised on this class, so calling this method would
        # raise NameError/AttributeError. Confirm and remove or repair.
        packet = av.Packet(slice.raw_data)
        frame = self.codec.decode(packet)[0]
        frame.pts = self.audio_samples
        frame.time_base = fractions.Fraction(1, self.codec.sample_rate)
        self.audio_samples += frame.samples
        q.put(frame)
        
    def closeEvent(self,event): 
        # Accept the close event so the dialog shuts down normally.
        event.accept()
    
class CustomRadioStream(MediaStreamTrack):
    """Outgoing audio MediaStreamTrack fed by an external producer.

    A capture thread (Server.fill_the_queues) pushes ready av.AudioFrame
    objects (pts/time_base already set) into ``self.q``; aiortc pulls them
    out through ``recv()``.
    """

    kind = "audio"

    def __init__(self):
        super().__init__()  # don't forget this!

        self.q = Av_Queue()   # thread-safe frame hand-off from the producer
        self._start = None

    async def recv(self):
        # BUG FIX: the original called self.q.get() directly, a blocking
        # call that stalls the asyncio event loop whenever the queue is
        # empty. That starves aiortc's ICE/DTLS keepalives and RTP pacing,
        # which matches the reported symptoms (no audio, connection drops
        # after ~30 s). Block in a worker thread instead, keeping the
        # event loop free.
        loop = asyncio.get_event_loop()
        frame = await loop.run_in_executor(None, self.q.get)
        return frame

class Server(Process):
    """aiohttp + aiortc signalling/media server run as a child process.

    Serves index.html and telephone_calls.js, answers WebRTC offers
    POSTed to /offer, streams locally captured microphone audio to the
    browser, and plays the browser's microphone track back locally.
    """

    def __init__(self):
        super().__init__()
        # Directory holding index.html and telephone_calls.js.
        self.ROOT = os.path.dirname(__file__)
        self.pcs = []        # open RTCPeerConnection objects
        self.channels = []   # data channels opened by clients
        # Outgoing audio track of the most recent client; the capture
        # thread pushes decoded frames into stream_offer.q.
        self.stream_offer = None
    
    def run(self):
        # Child-process entry point: build the aiohttp app and serve it.
        self.app = web.Application()
        self.app.on_shutdown.append(self.on_shutdown)
        self.app.router.add_get("/", self.index)
        self.app.router.add_get("/telephone_calls.js", self.javascript)
        self.app.router.add_post("/offer", self.offer)
        # Microphone capture loop runs for the lifetime of the process.
        threading.Thread(target=self.fill_the_queues).start()
        # NOTE(review): host is hard-coded; it must match this machine's
        # LAN IP address (see the run instructions).
        web.run_app(self.app, access_log=None, host="192.168.1.188", port=8080, ssl_context=None)

    
    def fill_the_queues(self):
        # Continuously read 20 ms chunks from the microphone, wrap them in
        # PyAV audio frames with monotonically increasing pts, and hand
        # them to the current client's outgoing track.
        self.sample_rate = 44800
        self.AUDIO_PTIME = 0.744
        self.samples = int(self.AUDIO_PTIME * self.sample_rate)
        self.packet_time = 20

        # NOTE(review): FORMAT/CHANNELS/RATE/CHUNK are set but the capture
        # below hard-codes 8000 Hz / 2 ch — confirm which is intended.
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 2
        self.RATE = self.sample_rate
        self.CHUNK = int(44100*0.744)

        #self.file_segment = AudioSegment.from_file(r"ΑΓΙΑ ΚΥΡΙΑΚΗ.mp3").set_frame_rate(self.sample_rate)
        #self.duration_milliseconds = len(self.file_segment)
        #self.chunk_number = 0

        self.silence = AudioSegment.silent(duration=self.packet_time)

        # PCM "decoder" used to turn raw 16-bit samples into av.AudioFrame.
        self.codec = av.CodecContext.create('pcm_s16le', 'r')
        self.codec.sample_rate = 8000
        self.codec.channels = 2

        # Running sample counter used as the pts for outgoing frames.
        self.audio_samples = 0

        self.p = pyaudio.PyAudio()
        self.input_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=8000,input=True,frames_per_buffer=int(8000*0.020))
        self.input_stream.start_stream()

        while(True):
            in_data = self.input_stream.read(int(8000*0.020),exception_on_overflow = False)            
            slice = AudioSegment(in_data, sample_width=2, frame_rate=8000, channels=2)
            #slice = AudioSegment.from_mono_audiosegments(slice, slice)
            #slice = slice.set_frame_rate(44800)

            packet = av.Packet(slice.raw_data)
            frame = self.codec.decode(packet)[0]
            frame.pts = self.audio_samples
            frame.time_base = fractions.Fraction(1, self.codec.sample_rate)
            self.audio_samples += frame.samples
            # Frames are dropped until a client has connected.
            if self.stream_offer is not None:
                self.stream_offer.q.put(frame)
        
    async def index(self,request):
        # Serve the static HTML page.
        content = open(os.path.join(self.ROOT, "index.html"), encoding="utf8").read()
        return web.Response(content_type="text/html", text=content)


    async def javascript(self,request):
        # Serve the client-side script referenced by index.html.
        content = open(os.path.join(self.ROOT, "telephone_calls.js"), encoding="utf8").read()
        return web.Response(content_type="application/javascript", text=content)

    async def offer(self,request):  
        # Handle a WebRTC offer POSTed by the browser; reply with the answer.
        params = await request.json()
        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
        name = params["name"]
        surname = params["surname"]
        #print(name+" "+surname)
        pc = RTCPeerConnection()
        self.pcs.append(pc)

        # Outgoing microphone audio for this client.
        # NOTE(review): each new offer replaces self.stream_offer, so only
        # the most recently connected client receives server audio —
        # confirm this is intended.
        self.stream_offer = CustomRadioStream()
        pc.addTrack(self.stream_offer)


        @pc.on("datachannel")
        def on_datachannel(channel):
            self.channels.append(channel)
            # Broadcast the current number of peer connections.
            self.send_channel_message(str(len(self.pcs)))


        @pc.on("iceconnectionstatechange")
        async def on_iceconnectionstatechange():
            # Drop the connection from the bookkeeping list once ICE fails.
            if pc.iceConnectionState == "failed":
                self.pcs.remove(pc)
                print("Current peer connections:"+str(len(self.pcs)))
            

        @pc.on("track")
        async def on_track(track):           
            # Play the browser's microphone locally; MediaBlackhole keeps
            # pulling frames so ClientTrack.recv() is actually driven.
            micTrack = ClientTrack(track)
            blackHole = MediaBlackhole()
            blackHole.addTrack(micTrack)
            await blackHole.start()
            
        # handle offer
        await pc.setRemoteDescription(offer)

        # send answer
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)
        
            
        return web.Response(content_type="application/json",text=json.dumps({"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}))

    async def on_shutdown(self,app):
        # close peer connections
        if self.pcs:
            coros = [pc.close() for pc in self.pcs]
            await asyncio.gather(*coros)
            self.pcs = []
            self.channels = []
            # NOTE(review): 'stream_offers' (plural) is not used anywhere
            # else — likely a typo for self.stream_offer.
            self.stream_offers = []
            
    def send_channel_message(self,message):
        # Best-effort broadcast to every open data channel.
        for channel in self.channels:
            channel.send(message)

class ClientTrack(MediaStreamTrack):
    """Wraps the browser's incoming audio track and plays it via PyAudio.

    Consumed by a MediaBlackhole, whose pulling drives ``recv()``.
    """

    kind = "audio"

    def __init__(self, track):
        super().__init__()
        self.track = track
        self.p = pyaudio.PyAudio()
        # NOTE(review): output opened at 44800 Hz while WebRTC audio is
        # typically decoded at 48000 Hz — a mismatch would cause pitch/
        # speed distortion; confirm against the incoming frames.
        self.output_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=44800,output=True,frames_per_buffer=int(16384/4))
        self.output_stream.start_stream()

    async def recv(self):
        # Get a new PyAV frame from the remote track.
        frame = await self.track.recv()

        packet_bytes = frame.to_ndarray().tobytes()
        # BUG FIX: write in a worker thread — PyAudio's blocking write on
        # the event loop stalls all aiortc I/O (keepalives, other tracks).
        await asyncio.get_event_loop().run_in_executor(
            None, self.output_stream.write, packet_bytes
        )
        # BUG FIX: the original returned None; MediaStreamTrack consumers
        # expect recv() to yield the frame.
        return frame

if __name__ == "__main__":
    # freeze_support() must be the first statement under the main guard
    # (multiprocessing docs); it is a no-op when the program is not a
    # frozen Windows executable, so call it unconditionally instead of
    # gating it on sys.frozen / _MEIPASS.
    freeze_support()
    # Remove any stale recording left over from a previous run.
    if os.path.exists("ip_call_1.mp3"):
        os.remove("ip_call_1.mp3")
    program = Run_me()
How to run this code:

  • Open ngrok and run `ngrok http 192.168.1.188:8080`, where 192.168.1.188 is your LAN IP address.
  • Change the lan ip address in line 98 of file server.py (web.run_app(self.app, access_log=None, host="192.168.1.188", port=8080, ssl_context=None))
  • Run python server.py
  • Navigate with your browser to ngrok url and press the button. Then allow access to microphone and then speak.

With this approach i have two big problems:

  1. In some cases i have no sound from server (pyaudio) to client (html5 browser).
  2. The connection is lost after ~30 seconds in many cases.

How can I modify this code to fix these issues?

0

There are 0 best solutions below