UDP Streaming in Python: Unexplained Delays Beyond Sleep Time


I'm working on a school project involving UDP streaming in Python, where a client (udp_stream.py) sends messages to a server (udp_stream_server.py). The client script is designed to send a specified number of messages (800) at a fixed rate (40 messages/second). However, I'm experiencing unexpected delays in the streaming process, and the actual execution time is longer than expected.

I've checked my code for potential bottlenecks and adjusted the sleep time, but the issue persists. The server script simply prints incoming messages and their corresponding client addresses.

Any insights on why the UDP streaming might be slower than anticipated would be greatly appreciated. I'm specifically seeking guidance on optimizing the Python UDP client for consistent performance.

Thank you!

udp_stream.py:

```python
from socket import *
import time

def returnTime():
    t = time.localtime()
    current_time = time.strftime("%H:%M:%S", t)
    milliseconds = int((time.time() % 1) * 1000)  # Extract milliseconds
    return f"{current_time}.{milliseconds:03d}"

def log():
    return f"[CLIENT:] {returnTime()}:"

def udp_stream(target_ip, target_port, message_total, message_rate):
    client_socket = socket(AF_INET, SOCK_DGRAM)
    message_number = 10001
    message_size = 1470
    sent_messages = 0

    first_sent = log()

    for x in range(message_total):
        sent_messages += 1

        if sent_messages < message_total:
            message = f"{message_number};{'A' * (message_size - len(str(message_number)) - 1)}"
            client_socket.sendto(message.encode(), (target_ip, target_port))
            time.sleep(1 / message_rate)
            message_number += 1
        elif sent_messages == message_total:
            # Final message is marked with a "####" terminator
            message = f"{message_number};{'A' * (message_size - len(str(message_number)) - 5)}####"
            client_socket.sendto(message.encode(), (target_ip, target_port))
            message_number += 1
            time.sleep(1 / message_rate)
            last_sent = log()

    print(first_sent, last_sent)

udp_stream('localhost', 8080, 800, 40)
```
udp_stream_server.py:

```python
from socket import *
import time

def returnTime():
    t = time.localtime()
    current_time = time.strftime("%H:%M:%S", t)
    milliseconds = int((time.time() % 1) * 1000)  # Extract milliseconds
    return f"{current_time}.{milliseconds:03d}"

serverPort = 8080

serverSocket = socket(AF_INET, SOCK_DGRAM)
serverSocket.bind(('', serverPort))

print(f'[SERVER: {returnTime()}]: UDP Server has started listening on port: {serverPort}')

while True:
    # Read the client's message and remember the client's address (IP and port)
    message, clientAddress = serverSocket.recvfrom(1024)
    # Print the message and the client address
    print(f"[SERVER: {returnTime()}]: Message from client: ", message.decode())
    print(f"[SERVER: {returnTime()}]: Client-IP: ", clientAddress)
```

Checked for potential bottlenecks in the code. Adjusted the sleep time to ensure the desired message rate. Examined the server script (udp_stream_server.py), which simply prints incoming messages and client addresses.
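The drift described above can be reproduced with a minimal sketch (the `naive_loop` helper is a hypothetical name, not from the scripts above): `time.sleep(1 / rate)` only guarantees a *minimum* delay, and the time spent building and sending each message is added on top of it, so the error accumulates over all 800 iterations.

```python
import time

def naive_loop(iterations, rate):
    """Sleep-only pacing: per-iteration overhead is never compensated."""
    interval = 1 / rate
    start = time.perf_counter()
    for _ in range(iterations):
        # ... message construction and sendto() would happen here ...
        time.sleep(interval)  # sleeps *at least* interval, often slightly more
    return time.perf_counter() - start

elapsed = naive_loop(40, 40)  # ideal duration: 40 / 40 = 1.0 s
print(f"ideal 1.000 s, actual {elapsed:.3f} s, drift {elapsed - 1.0:+.3f} s")
```

Because every iteration's overhead lands on top of the sleep, the actual duration is always at least the ideal one, and the gap grows with the message count.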

1 Answer

Answered by Mark Tolonen:

As mentioned in the comments, compute the absolute time at which each message should be sent and sleep until that time. That way, variations in OS thread scheduling and in processing each message are averaged out rather than accumulating.
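The idea can be shown in isolation (a minimal sketch; `paced_send` and its no-op `send` callback are hypothetical names, not part of the full client below): each deadline is computed from the start time, so any overhead in one iteration is absorbed by a shorter sleep in the next.

```python
import time

def paced_send(n_messages, rate, send):
    """Absolute-deadline pacing: message k is due at start + k / rate."""
    start = time.perf_counter()
    for k in range(1, n_messages + 1):
        remaining = start + k / rate - time.perf_counter()
        if remaining > 0:
            time.sleep(remaining)  # sleep only until the absolute deadline
        send(k)
    return time.perf_counter() - start

# Example: 50 "sends" at 50/s should take very close to 1 second.
elapsed = paced_send(50, 50, lambda k: None)
print(f"{elapsed:.3f} s")
```

Unlike a fixed `sleep(1 / rate)` per iteration, errors here do not accumulate: a late message only shortens the next sleep, not the whole schedule.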

Below is an implementation of a server that first receives a message containing the expected test parameters, then receives and timestamps each message for later computation of statistics. Because a client may abort early, and because UDP is unreliable and may drop or reorder packets, a sync packet is used to make sure the server is ready for the next test and doesn't have to be restarted.

```python
import socket
import time

serverPort = 8080
serverSocket = socket.socket(type=socket.SOCK_DGRAM)
serverSocket.bind(('', serverPort))
print(f'UDP Server has started listening on port: {serverPort}')

def receive_header(serverSocket):
    while True:
        try:
            message_total, message_rate = [int(n) for n in serverSocket.recvfrom(1024)[0].split()]
            break
        except ValueError:  # received on a sync
            pass
    print(f'Receiving {message_total} messages @ {message_rate} messages/second...')
    print(f'Expected time {message_total/message_rate:.6f} seconds at intervals of {1/message_rate:.6f} seconds.')
    return message_total, message_rate

while True:
    message_total, message_rate = receive_header(serverSocket)
    times = [time.perf_counter()]  # Start time.
    for _ in range(message_total):
        message, clientAddress = serverSocket.recvfrom(1024)
        if message == b'sync':
            break  # ends "for" and skips "else:"
        times.append(time.perf_counter())
    else:
        total = times[-1] - times[0]
        intervals = [times[n+1] - times[n] for n in range(len(times) - 1)]
        ave = sum(intervals) / len(intervals)
        max_interval = max(intervals)
        min_interval = min(intervals)
        print(f'  Total Time       {total:9.6f}')
        print(f'  Minimum Interval {min_interval:9.6f}')
        print(f'  Average Interval {ave:9.6f}')
        print(f'  Maximum Interval {max_interval:9.6f}')
```

Below is a client for that server that sends the test requirements and sends messages at specific absolute times based on the message rate:

```python
import socket
import time
import sys

def send_header(client_socket, target_ip, target_port, message_total, message_rate):
    client_socket.sendto(b'sync', (target_ip, target_port))  # make ready to receive header
    client_socket.sendto(f'{message_total} {message_rate}'.encode(), (target_ip, target_port))

def udp_stream(target_ip, target_port, message_total, message_rate):
    client_socket = socket.socket(type=socket.SOCK_DGRAM)
    send_header(client_socket, target_ip, target_port, message_total, message_rate)
    message = b"size shouldn't matter"
    start = time.perf_counter()
    for n in range(message_total):
        next_send = start + (n + 1) / message_rate  # time to send next message
        sleep_time = next_send - time.perf_counter()
        if sleep_time < 0:  # if time already passed, log and skip sleep
            print('!', end='', flush=True)
        else:
            time.sleep(sleep_time)
        client_socket.sendto(message, (target_ip, target_port))
    print(f'\ntotal time {time.perf_counter() - start}')

rate = int(sys.argv[1])
udp_stream('localhost', 8080, 10 * rate, rate)
```

Example run of rates 1, 10, 40, 1000, 3000:

```
UDP Server has started listening on port: 8080
Receiving 10 messages @ 1 messages/second...
Expected time 10.000000 seconds at intervals of 1.000000 seconds.
  Total Time       10.000234
  Minimum Interval  0.999843
  Average Interval  1.000023
  Maximum Interval  1.000573
Receiving 100 messages @ 10 messages/second...
Expected time 10.000000 seconds at intervals of 0.100000 seconds.
  Total Time       10.000396
  Minimum Interval  0.099620
  Average Interval  0.100004
  Maximum Interval  0.100455
Receiving 400 messages @ 40 messages/second...
Expected time 10.000000 seconds at intervals of 0.025000 seconds.
  Total Time       10.000506
  Minimum Interval  0.024607
  Average Interval  0.025001
  Maximum Interval  0.025614
Receiving 10000 messages @ 1000 messages/second...
Expected time 10.000000 seconds at intervals of 0.001000 seconds.
  Total Time       10.000564
  Minimum Interval  0.000390
  Average Interval  0.001000
  Maximum Interval  0.001461
Receiving 30000 messages @ 3000 messages/second...
Expected time 10.000000 seconds at intervals of 0.000333 seconds.
  Total Time       10.698635
  Minimum Interval  0.000308
  Average Interval  0.000357
  Maximum Interval  0.003552
```

On my system only 1 packet at rate 1000 was sent after its expected absolute time (sleep_time < 0), though the min/max intervals showed a larger spread. At rate 3000 the system couldn't maintain the rate, and it took ~10.7 seconds to send the 30000 messages instead of the expected 10 seconds.
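The rate-3000 result is consistent with `time.sleep()` resolution: the requested interval (~333 microseconds) is near or below the timer granularity on many systems, so each sleep overshoots and the schedule falls behind. A quick measurement sketch for checking this on a given machine (the `average_sleep` helper is a hypothetical name, not part of the answer's code):

```python
import time

def average_sleep(requested, trials=50):
    """Measure how long time.sleep(requested) actually takes on average."""
    total = 0.0
    for _ in range(trials):
        t0 = time.perf_counter()
        time.sleep(requested)
        total += time.perf_counter() - t0
    return total / trials

for requested in (0.01, 0.001, 0.000333):
    actual = average_sleep(requested)
    print(f"requested {requested * 1e6:8.0f} us -> actual {actual * 1e6:8.0f} us")
```

If the measured overshoot at 333 microseconds is comparable to the interval itself, the deadline loop can only compensate by skipping sleeps, which is exactly the `!` markers and the ~10.7 second total observed above.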