I utilized a Raspberry Pi and executed my Python script within the Thonny IDE. The script incorporates the Socket Module for data reception from UWB ESP32 devices, which are programmable with Arduino.
With the current OOP python code, I can observe that I resolved the error about "OSError: [Errno 98] Address already in use" with multiple consecutive trials by adding this line of code after "self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)".
ADDED LINE
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Here's the full code for that one. In addition, I am not quite sure if I correctly implement the closing of socket connection. But is working as of now.
OOP PYTHON CODE
import socket
import json
import cmath
import os
import time
class UWB:
def __init__(self):
self.connect_network()
self.line = []
def connect_network(self):
self.interface = 'wlan0' # Raspberry Pi Network Interface name
# Get the IP address for the specified interface
try:
self.UDP_IP = os.popen(f"ifconfig {self.interface} | grep 'inet ' | awk '{{print $2}}'").read().strip()
print("***Local IP:" + str(self.UDP_IP) + "*** \n")
except Exception as e:
print("Error at IP: {e}")
self.UDP_PORT = 8080 # Non-priveleged
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.UDP_IP, self.UDP_PORT))
self.sock.listen(1)
print("[---STATUS---] Waiting to secure connection... \n")
self.data, self.addr = self.sock.accept() #
print("[---STATUS---] Accepted Connection from Foreign IP: " + str(self.addr))
# Read UWB data from the network and return a list of UWB data
def read_data(self):
self.line = self.data.recv(1024).decode('UTF-8') # Read data from the network socket
self.uwb_list = []
try:
self.uwb_data = json.loads(self.line) # Parse the received JSON data
self.uwb_list = self.uwb_data["links"] # Extract the "links" field from the JSON data
except:
pass
return self.uwb_list
def start_uwbProcess(self):
while True:
self.line = self.read_data()
print("line", self.line)
time.sleep(0.05)
def get_uwbData(self):
return self.line
def uwbClose(self):
print("Sock Closed")
self.sock.close()
# Initialization
system = UWB()
try:
system.start_uwbProcess()
except OSError as e:
print("Error: {e}")
system.uwbClose()
except KeyboardInterrupt:
print("1) System stopped!..")
system.uwbClose()
finally:
print("2) System stopped!..")
system.uwbClose()
With that, I tried to call the process and data from class UWB to class ROBOTCONTROLLER. I used the module of multiprocessing with Process and Queue for this one.
However, now I am encountering that error again "OSError: [Errno 98] Address already in use". At first, it will connect and the data transferring is okay as what I can produce with the OOP Python Code however after stopping and running the program again, I cant connect to the same port address again.
Here's my current code with multiprocessing.
MULTIPROCESSING
import socket
import json
import cmath
import os
import time
class UWB:
def __init__(self, result_queue):
self.result_queue = result_queue
self.connect_network()
self.list = []
def connect_network(self):
self.interface = 'wlan0' # Raspberry Pi Network Interface name
# Get the IP address for the specified interface
try:
self.UDP_IP = os.popen(f"ifconfig {self.interface} | grep 'inet ' | awk '{{print $2}}'").read().strip()
print("***Local IP:" + str(self.UDP_IP) + "*** \n")
except Exception as e:
print("Error at IP: {e}")
self.UDP_PORT = 8080 # Non-priveleged
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.UDP_IP, self.UDP_PORT))
self.sock.listen(1) # No. of Connected Device
print("[---STATUS---] Waiting to secure connection... \n")
self.data, self.addr = self.sock.accept() #
print("[---STATUS---] Accepted Connection from Foreign IP: " + str(self.addr))
# Read UWB data from the network and return a list of UWB data
def read_data(self):
self.line = self.data.recv(1024).decode('UTF-8') # Read data from the network socket
self.uwb_list = []
try:
self.uwb_data = json.loads(self.line) # Parse the received JSON data
self.uwb_list = self.uwb_data["links"] # Extract the "links" field from the JSON data
except:
pass
return self.uwb_list
def start_uwbProcess(self):
while True:
self.list = self.read_data()
get_uwbData = self.list
self.result_queue.put(get_uwbData)
time.sleep(0.05)
def uwbClose(self):
print("Sock Closed")
self.sock.close()
import RPi.GPIO as GPIO
import math
import multiprocessing
import queue
import turtle
import sys
import time
class ROBOTCONTROLLER:
def __init__(self):
self.initialize_pushbuttonMode()
self.initialize_interfaces()
# Initializing the debouncing variables
# For Start
self.startLastState = True
self.startLastTime = 0
# For Resume
self.resumeLastState = True
self.resumeLastTime = 0
# For Debounce Delay in [BUTTONS]
self.debounceDelay = 50 # Can adjust the debounce delay as needed (in milliseconds)
self.processState = False
self.onGoing = False
self.waitCnt = 0
#self.isDeliveryComplete = False # Will decide if will removed, due to isTargetAchived in UWBdata
# Not used Yet
self.prev_lidar_cycle = 0
self.prev_uwb_cycle = 0
######
self.cmd = None
self.lastcmd = None
def initialize_pushbuttonMode(self):
# Initialize PushButton Mode
print("Setting Up Pushbutton Mode")
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
def initialize_interfaces(self):
self.uwb_result_queue = multiprocessing.Queue()
self.uwbMultiProcess = multiprocessing.Process(target=self.uwbProcessFunction, args=(self.uwb_result_queue,))
self.startMultiProcessing = False
def uwbProcessFunction(self, result_queue):
self.uwb = UWB(result_queue)
self.uwb.start_uwbProcess()
def pushbutton_init(self):
# Define Button pin setting:
GPIO.setup(5, GPIO.IN) # START BUTTON: EXTERNAL-PULLUP
GPIO.setup(6, GPIO.IN) # RESUME BUTTON: EXTERNAL-PULLUP
def run(self):
try:
# Activating System's Process
while True:
if not self.processState:
# Readings of Start Button
curTime = int(time.time() * 1000) # Convert current time to milliseconds
startCurState = GPIO.input(5) # Read the current state of the start button
print("CurStart: " + str(startCurState))
print("Process: " + str(self.processState)) # Already Initialized in function "def initialize_variables(self)"
if startCurState != self.startLastState:
self.startLastTime = curTime
if (curTime - self.startLastTime) > self.debounceDelay:
if startCurState == False: #ON
print("START BUTTON is PRESSED")
self.processState = True #ON
print("PROCESSING...")
else: #==True OFF
print("PRESS the START BUTTON")
self.startLastState = startCurState
else: #self.processState == True:
if self.onGoing:
self.lidarUWBMultiProcessingCallback()
#self.control_center() -> Moved inside lidarUWBMultiProcessingCallback()
else:
curTime = int(time.time()*1000)
resumeCurState = GPIO.input(6)
print("CurResume: ", resumeCurState)
if resumeCurState != self.resumeLastState:
self.resumeLastTime = curTime
if (curTime - self.resumeLastTime) > self.debounceDelay:
if resumeCurState == False: # ON
print("RESUME BUTTON is PRESSED")
self.onGoing = True
else:
print("PRESS the RESUME BUTTON")
self.resumeLastState = resumeCurState
time.sleep(0.05) #Added for incrementing of cycle counts atomically
except Exception as e:
print(f"An error occured: {e}")
def lidarUWBMultiProcessingCallback(self):
if not self.startMultiProcessing:
self.uwbMultiProcess.start()
self.startMultiProcessing = True
print("LidarUWBMultiProcessing started.")
else:
#print("LidarUWBThread is already running")
pass
self.control_center()
def control_center(self):
if not self.uwb_result_queue.empty():
uwbData = self.uwb_result_queue.get_nowait()
print(uwbData)
if __name__ == '__main__':
# Initialization
robot = ROBOTCONTROLLER()
robot.pushbutton_init()
# Activating System's Process
robot.run()
I tried to run again to run the OOP Python Code again but if I tried to run the code with multiprocessing code fisrt, OOP python code isn't connecting also but if vice versa, when the OOP python code is runs first, it connects smoothly before running multiprocessing code.
When the error shows, my only solution is to restart my raspberry pi which is very time-consuming. Here's the traceback below.
MULTIPROCESSING TRACEBACK
RESUME BUTTON is PRESSED
LidarUWBMultiProcessing started.
***Local IP:192.168._.___***
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/rome/Desktop/MP_UWB_INPROGRESS2.py", line 105, in uwbProcessFunction
self.uwb = UWB(result_queue)
^^^^^^^^^^^^^^^^^
File "/home/rome/Desktop/MP_UWB_INPROGRESS2.py", line 11, in __init__
self.connect_network()
File "/home/rome/Desktop/MP_UWB_INPROGRESS2.py", line 26, in connect_network
self.sock.bind((self.UDP_IP, self.UDP_PORT))
OSError: [Errno 98] Address already in use
Hoping for any response that can help me fix this one. Thanks.
Finally, got resolved the issue using this command in RPI terminal
and use subprocess module for this to be called in my python script with retry functionality.