Python socketio with multiprocessing


So I have been struggling with this one pickle error, which is driving me crazy. I have the following masterEngine class:

import eventlet
import socketio
import multiprocessing
from multiprocessing import Queue
from multi import SIOSerever

class masterEngine:

    if __name__ == '__main__': 
            
        serverObj = SIOSerever()

        try:
            receiveData = multiprocessing.Process(target=serverObj.run)
            receiveData.start()

            receiveProcess = multiprocessing.Process(target=serverObj.fetchFromQueue)
            receiveProcess.start()

            receiveData.join()
            receiveProcess.join()
            
        except Exception as error:
            print(error)

and I have another file called multi, which looks like the following:

import multiprocessing
from multiprocessing import Queue
import eventlet
import socketio

class SIOSerever:

  def __init__(self):
    self.cycletimeQueue = Queue()
    self.sio = socketio.Server(cors_allowed_origins='*',logger=False)
    self.app = socketio.WSGIApp(self.sio, static_files={'/': 'index.html',})
    self.ws_server = eventlet.listen(('0.0.0.0', 5000))

    @self.sio.on('production')
    def p_message(sid, message):
      self.cycletimeQueue.put(message)
      print("I logged : "+str(message))

  def run(self):
    eventlet.wsgi.server(self.ws_server, self.app)

  def fetchFromQueue(self):
    while True:
      cycle = self.cycletimeQueue.get()
      print(cycle)

As you can see, I am trying to create two processes from run and fetchFromQueue, which I want to run independently.

My run function starts the python-socketio server, to which I'm sending some data from an HTML web page (this runs perfectly without multiprocessing). I am then trying to push the received data onto a Queue so that my other function can retrieve it and work with it.

I have a set of time-consuming operations to carry out on the data received from the socket, which is why I'm pushing it all into a Queue.

On running the masterEngine class I receive the following:

Can't pickle <class 'threading.Thread'>: it's not the same object as threading.Thread
I ended!
[Finished in 0.5s]

Can you please help with what I am doing wrong?


From the multiprocessing programming guidelines:

Explicitly pass resources to child processes

On Unix using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

Apart from making the code (potentially) compatible with Windows and the other start methods this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.
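This is exactly what bites you here: under the spawn start method, multiprocessing pickles the Process target, and pickling the bound method serverObj.run drags the whole SIOSerever instance along, including the socketio.Server and the eventlet listener socket, which hold non-picklable thread/socket state. A minimal sketch of that failure, using a plain threading.Thread as a stand-in for the objects eventlet keeps internally (the exact message differs from yours because eventlet's monkey-patching produces the "not the same object as threading.Thread" variant, but the root cause is the same):

```python
import pickle
import threading

class Holder:
  def __init__(self):
    # stands in for the threads, sockets and greenlets that
    # socketio.Server and eventlet's listener keep internally
    self.worker = threading.Thread(target=print)

  def run(self):
    pass

holder = Holder()
try:
  # multiprocessing does the equivalent of this for the Process target;
  # pickling the bound method pickles the whole instance
  pickle.dumps(holder.run)
except Exception as error:
  # fails because the instance holds non-picklable thread state
  print(type(error).__name__, error)
```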

Therefore, I slightly modified your example: I removed everything unnecessary and show an approach where the shared queue is explicitly passed to every process that uses it:

import multiprocessing

MAX = 5

class SIOSerever:

  def __init__(self, queue):
    self.cycletimeQueue = queue

  def run(self):
    for i in range(MAX):
      self.cycletimeQueue.put(i)

  @staticmethod
  def fetchFromQueue(cycletimeQueue):
    while True:
      cycle = cycletimeQueue.get()
      print(cycle)
      if cycle >= MAX - 1:
        break


def start_server(queue):
    server = SIOSerever(queue)
    server.run()


if __name__ == '__main__':

    try:
        queue = multiprocessing.Queue()
        receiveData = multiprocessing.Process(target=start_server, args=(queue,))
        receiveData.start()

        receiveProcess = multiprocessing.Process(target=SIOSerever.fetchFromQueue, args=(queue,))
        receiveProcess.start()

        receiveData.join()
        receiveProcess.join()

    except Exception as error:
        print(error)
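Applying the same pattern back to your real server means keeping only the queue on the instance and constructing the non-picklable objects (Server, WSGIApp, listener) inside run, i.e. inside the child process, so they are never sent through pickle. A sketch of that, assuming the same python-socketio/eventlet API as in your question and not tested against a live client:

```python
import multiprocessing

class SIOSerever:

  def __init__(self, queue):
    # only the queue lives on the instance; it is safe to pass
    # to child processes as a Process argument
    self.cycletimeQueue = queue

  def run(self):
    # build the non-picklable pieces here, inside the child process
    import eventlet
    import eventlet.wsgi
    import socketio

    sio = socketio.Server(cors_allowed_origins='*', logger=False)
    app = socketio.WSGIApp(sio, static_files={'/': 'index.html'})

    @sio.on('production')
    def p_message(sid, message):
      self.cycletimeQueue.put(message)

    eventlet.wsgi.server(eventlet.listen(('0.0.0.0', 5000)), app)
```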