How to access multiprocessing.Manager().Namespace element remotely via SyncManager?

426 Views Asked by At

No matter how carefully I read the docs, I fail to understand how to use a SyncManager to remotely access an item of a multiprocessing Namespace.

Please note the code below is the shortest possible to demonstrate what works and what fails. The crux of my problem lies in the calls to register, which I fail to properly understand how to use.

The example below consists of two parts (both in the same file mp-example.py): a server and a client. For the client part to do its work, the server part must be running (in another terminal/screen).

First, the code that works:

#!/usr/bin/env python3

import os
import sys
import multiprocessing as mp
import multiprocessing.managers as mpm
import queue
import socket
import contextlib
from contextlib import contextmanager

def get_ip(probe_address=('192.168.17.1', 80)):
    """Return the local IP address the OS would use to reach ``probe_address``.

    A UDP socket is connected to the probe address only so that the operating
    system picks an outgoing interface; this function never sends any data on
    it.  The local end of that socket is then read back with
    ``getsockname()``.

    Args:
        probe_address: ``(host, port)`` pair used to select the route.  The
            default is the address hard-coded in the original example and is
            only meaningful on that network; pass a host reachable from your
            machine, or ``('127.0.0.1', 80)`` for a loopback-only test.

    Returns:
        The local IPv4 address as a string.

    Raises:
        OSError: if the socket cannot be connected to ``probe_address``.
    """
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
        s.connect(probe_address)
        # getsockname() returns (host, port) for AF_INET; only the host is needed.
        return s.getsockname()[0]

def get_free_port():
    """Return the number of a TCP port that the OS reports as free.

    A temporary socket is bound to port 0, which lets the operating system
    choose the port.  The socket is closed before returning, so the port is
    not reserved for the caller.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with contextlib.closing(probe):
        probe.bind(('', 0))
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port = probe.getsockname()
        return port

def startserver(name):
    """Run the demo server: publish a Namespace and a Value via a SyncManager.

    The connection details (address and authkey) are written to
    ``mpSERVERDATA.txt`` so that a client process can read them back.  After
    a local child process has modified both shared items, the function loops
    over standard input and prints the current values for every line read,
    until end-of-file.

    Args:
        name: label appended to the ``name`` attributes set on the two
            managers and on the namespace.
    """
    # Find a free port for the SyncManager
    newportnumber = get_free_port()
    Server = {'address': (get_ip(), newportnumber), 'authkey': b'mptest'}

    # Save data to file
    serverdata_filename = f'mpSERVERDATA.txt'
    with open(serverdata_filename, 'wt', encoding="utf-8") as serverdata_file:
        print(Server, file=serverdata_file)
        print(Server)

    # On the server, we do not need our own IP address
    Server['address'] = ('', newportnumber)

    # Set up server:
    #   define a multiprocessing manager
    #   define a multiprocessing namespace
    #   define a Value item in the namespace
    #   define a Value item outside the namespace
    #   define a multiprocessing syncmanager
    #   register both the namespace and the value outside the namespace
    #   start the syncmanager
    #   set values
    #   initialise a different process
    #   print the values
    #   run the other process & wait for completion
    #   print the values
    #   read standard input & print the values (waiting for a remote process connection)

    m = mp.Manager()
    m.name = 'mp.Manager: ' + name

    with m:
        ns = m.Namespace()
        ns.name = 'm.Namespace: ' + name
        ns.yv = m.Value(int, 0)
        yv = m.Value(int, 0)

        bm = mpm.SyncManager(**Server)
        bm.name = 'mpm.SyncManager: ' + name

        #bm.register('ns', callable=lambda: ns, proxytype=mpm.NamespaceProxy,
        #            exposed=('__getattribute__', '__setattr__', '__delattr__', 'yv')
        #           )
        # The above has not made any difference
        # NOTE(review): `ns` and `yv` were created by manager `m`, so the
        # callables below presumably hand `bm` objects that are themselves
        # proxies - verify whether this is what breaks `ns.yv` on the client.
        bm.register('ns', callable=lambda: ns, proxytype=mpm.NamespaceProxy)
        bm.register('yv', callable=lambda: yv, proxytype=mpm.ValueProxy)

        with bm:
            yv.value = 99
            # Rebinds ns.yv: the Value assigned to it above is replaced by a plain int.
            ns.yv = 999

            # Worker run in the child process; modifies both shared items.
            # NOTE(review): a nested function as Process target presumably
            # relies on the 'fork' start method - confirm before porting.
            def f(n, y):
                n.yv += 222
                y.value += 33

            proc = mp.Process(target=f, args=(ns, yv))
            print(f'yv={yv}, ns.yv={ns.yv}')
            proc.start()
            proc.join()
            # Called after join() has already waited for the child.
            proc.terminate()
            print(f'yv={yv}, ns.yv={ns.yv}, ns={ns}')
            print('Waiting for input (waiting for client), press CTRL-D to end')
            # Every line typed re-prints the current values, which makes
            # changes made by a remote client visible.
            for line in sys.stdin:
                print(f'yv={yv}, ns.yv={ns.yv}, ns={ns}')
                print('Waiting for input (waiting for client), press CTRL-D to end')

if __name__ == '__main__':
    # Usage: mp-example.py {server|client} <name>
    # The argument count is checked before sys.argv[1] is read, so running
    # the script without arguments cannot raise IndexError.  The __main__
    # guard keeps child processes that re-import this module from
    # re-running the dispatch.
    if len(sys.argv) == 3 and sys.argv[1] == 'server':
        startserver(sys.argv[2])
    elif len(sys.argv) == 3 and sys.argv[1] == 'client':
        startclient(sys.argv[2])

The code above does what I expect when run on its own:

> python3 mp-example.py server test
{'address': ('192.168.17.10', 50793), 'authkey': b'mptest'}
yv=Value(<class 'int'>, 99), ns.yv=999
yv=Value(<class 'int'>, 132), ns.yv=1221, ns=Namespace(name='m.Namespace: test', yv=1221)
Waiting for input (waiting for client), press CTRL-D to end

Next, the bit that does not work:

The code below works up to a point, but fails when trying to access the Value item yv within the Namespace.

Code:

def startclient(name):
    """Connect to the running server and exercise both registered typeids.

    The address and authkey are read from ``mpSERVERDATA.txt``.  The function
    then fetches the ``yv`` and ``ns`` proxies from the manager, prints their
    values and adds to them.

    Args:
        name: accepted for symmetry with the command line; not used here.
    """
    # retrieve server identification from file
    # NOTE(review): eval() executes whatever the file contains; only safe
    # while the file is written by a trusted server process.
    with open('mpSERVERDATA.txt', 'rt', encoding="utf-8") as serverdata_file:
        Server = eval(serverdata_file.read())
        print(Server)

    manager = mpm.SyncManager(**Server)
    # Client side: only the typeid and proxy type are registered, no callable.
    for typeid, proxy_cls in (('yv', mpm.ValueProxy), ('ns', mpm.NamespaceProxy)):
        manager.register(typeid, proxytype=proxy_cls)
    manager.connect()

    # Stand-alone Value registered under 'yv'.
    yv = manager.yv()
    print(f'yv={yv.value}')
    yv.value += 55
    print(f'yv={yv.value}')

    # Namespace registered under 'ns'; reading ns.yv is the step that fails.
    ns = manager.ns()
    print(f'ns={ns}')
    print(f'ns.yv={ns.yv}')
    ns.yv += 666
    print(f'ns.yv={ns.yv}')

Output:

> python3 mp-example.py client test
{'address': ('192.168.17.10', 50793), 'authkey': b'mptest'}
yv=627
yv=682
['_Client', '__class__', '__deepcopy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_address_to_local', '_after_fork', '_authkey', '_callmethod', '_close', '_connect', '_decref', '_exposed_', '_getvalue', '_id', '_idset', '_incref', '_manager', '_mutex', '_owned_by_manager', '_serializer', '_tls', '_token']
Help on method _getvalue in module multiprocessing.managers:

_getvalue() method of multiprocessing.managers.NamespaceProxy instance
    Get a copy of the value of the referent

None
ns=<NamespaceProxy object, typeid 'Namespace' at 0x7f5d05689a20>
Traceback (most recent call last):
  File "mp-example.py", line 333, in <module>
    startclient(sys.argv[2])
  File "mp-example.py", line 282, in startclient
    print(f'ns.yv={ns.yv}')
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 1060, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
    raise convert_to_error(kind, result)
AttributeError: 'NamespaceProxy' object has no attribute 'yv'

Question:

What do I need to do to access ns.yv in the client?

2

There are 2 best solutions below

0
asoundmove On BEST ANSWER

Edit - comprehensive solution:

I resolved most of my frustrations by writing a new RemoteSyncManager class to encapsulate the multiprocessing.managers.SyncManager() instance.

This library of mine really simplifies the use of the SyncManager both for local and remote processes.

Head over to my GitHub repository PythonLibraries-Multiprocessing-RemoteSyncManager for details.

My first solution:

I finally managed to set up a managed multiprocessing Namespace that synchronises with a remote connection.

But this set-up refuses to work with a managed dict; it works in a similar way to Booboo's suggestion, above.

Is there any way to define and keep synchronised a whole dict in a remotely shared namespace without having to re-assign the dict at every operation?

Code:

if __name__ == '__main__':
    # Usage: run "<script> Server" in one terminal, then "<script> Client"
    # in another.  The argument count is tested before sys.argv[1] is read
    # so that running without arguments does not raise IndexError.
    if len(sys.argv) == 2 and sys.argv[1] == 'Server':
        #localmanager = mp.Manager()
        #with localmanager:
        # Two single-slot queues form a handshake: the client signals on q2
        # and the server answers on q1.
        q1 = mp.Queue(1)
        q2 = mp.Queue(1)
        localnamespace = mpm.Namespace()

        class syncmanager(mpm.SyncManager):
            pass
        # Server side: each typeid is registered with the callable that
        # supplies the object to share.
        syncmanager.register('get_q1', callable=lambda: q1)
        syncmanager.register('get_q2', callable=lambda: q2)
        syncmanager.register('get_ns', callable=lambda: localnamespace, proxytype=mpm.NamespaceProxy)

        remotemanager = syncmanager(address=('', 56789), authkey=b'mypassword')
        remotemanager.start()

        with remotemanager:
            ns = remotemanager.get_ns()
            #ns.sampledict = remotemanager.dict()
            ns.sampledict = {}
            #ns.sampledict = localmanager.dict()
            # NOTE(review): per the multiprocessing docs, changes made to a
            # plain dict obtained through a proxy are not propagated back, so
            # these two item assignments presumably do not reach the shared
            # namespace.
            ns.sampledict['testabc'] = 1234567
            ns.sampledict['TESTABC'] = 9876
            # Building the dict locally and assigning it as a whole instead.
            d = {}
            d['testabc'] = 1234567
            d['TESTABC'] = 9876
            ns.sampledict2 = d

            print(f'Server ns = {ns}')

            # First handshake: wait for the client, then send it our view.
            print(q2.get())
            q1.put(f'Server ns = {ns}')

            ns.sampledict['testabc'] = 1
            ns.sampleserverint = 98789
            print(f'Server ns = {ns}')

            # Second handshake, after the client has made its own changes.
            print(q2.get())
            q1.put(f'Server ns = {ns}')

    elif len(sys.argv) == 2 and sys.argv[1] == 'Client':
        class syncmanager(mpm.SyncManager):
            pass
        # Client side: the same typeids are registered without callables.
        syncmanager.register('get_q1')
        syncmanager.register('get_q2')
        syncmanager.register('get_ns', proxytype=mpm.NamespaceProxy)

        remotemanager = syncmanager(address=('', 56789), authkey=b'mypassword')
        remotemanager.connect()

        q1 = remotemanager.get_q1()
        q2 = remotemanager.get_q2()
        ns = remotemanager.get_ns()

        # Tell the server we are connected and print the state it reports.
        q2.put('ready')
        msg = q1.get()
        print(msg)
        print(f'Client ns = {ns}')

        ns.sampledict['TESTABC'] = 9
        ns.sampleclientint = 7887

        # Ask the server for its view again after the changes above.
        q2.put('next.1')
        msg = q1.get()
        print(msg)
        print(f'Client ns = {ns}')
8
Booboo On

I struggled mightily with it, too -- it's difficult to come up with any solution along the lines you are following that isn't convoluted and inefficient. As an aside, you are attempting to return a proxy to a shared memory value, which is already shareable without requiring a proxy; and if your goal is just to serve up a singleton value, there is no need for it to be in shared memory.

A simpler approach, which does not require you to create two manager processes just so you can create a proxy object that can then be registered with another manager before you start it, is to just define your own custom classes. You typically create your own subclass of multiprocessing.managers.BaseManager with which you will register your new managed classes (of course, if your client also needs the classes already registered with the SyncManager, e.g. a dict, then you would subclass that).

The following program shows how you create a singleton Namespace equivalent type. I commented out your IP address and used 127.0.0.1 for testing. I also replaced your call to eval with a safer substitute.

#!/usr/bin/env python3

import os
import sys
import multiprocessing as mp
from multiprocessing.managers import BaseManager, NamespaceProxy
import socket
import contextlib
from ast import literal_eval
from threading import Thread


def get_ip():
    """Return the local address of a UDP socket connected to 127.0.0.1:80.

    The socket is used only to read its local address back via
    ``getsockname()``; nothing is sent on it.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(probe):
        # The question's code probed ('192.168.17.1', 80); loopback is used
        # here for testing.
        probe.connect(('127.0.0.1', 80))
        local_host, _local_port = probe.getsockname()
        return local_host

def get_free_port():
    """Return a TCP port number chosen by the operating system.

    Binding a temporary socket to port 0 makes the OS pick the port; the
    socket is closed again before the number is handed back.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with contextlib.closing(listener):
        listener.bind(('', 0))
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        bound_address = listener.getsockname()
        return bound_address[1]

class MyNamespaceManager(BaseManager):
    """Manager subclass on which the 'MyNamespace' typeid is registered."""

class MyNamespace:
    """Empty class whose instances simply carry arbitrary attributes."""


# Singleton:
# the one module-level instance returned by every get_singleton() call.
singleton = MyNamespace()


def get_singleton():
    """Return the module-level ``singleton`` instance (never a new object)."""
    return singleton

def run_server(server):
    """Thread target: call ``server.serve_forever()`` on the given server."""
    serve = server.serve_forever
    serve()

def startserver():
    """Serve the singleton namespace until the user presses Enter.

    The connection details (address and authkey) are written to
    ``mpSERVERDATA.txt`` for the client to read.  The manager's server runs
    in a thread of this process.  When the prompt returns or is interrupted,
    the data file is removed and the server is told to stop.
    """
    # Find a free port for the SyncManager
    newportnumber = get_free_port()
    Server = {'address': (get_ip(), newportnumber), 'authkey': b'mptest'}

    # Save data to file
    serverdata_filename = 'mpSERVERDATA.txt'
    with open(serverdata_filename, 'wt', encoding="utf-8") as serverdata_file:
        print(Server, file=serverdata_file)
        print(Server)

    # On the server, we do not need our own IP address
    Server['address'] = ('', newportnumber)

    # Singleton: every client request for 'MyNamespace' gets the same object.
    MyNamespaceManager.register('MyNamespace', callable=get_singleton, proxytype=NamespaceProxy)

    manager = MyNamespaceManager(**Server)
    # Alternative kept from the original for reference - let the manager
    # start and stop the server itself:
    #     manager.start()
    #     input('Hit enter to end: ')
    #     manager.shutdown()
    server = manager.get_server()
    Thread(target=run_server, args=(server,)).start()
    try:
        input('Hit enter to end: ')
    except KeyboardInterrupt:
        pass
    finally:
        # Run the cleanup on every exit path (e.g. EOFError from input()), so
        # the data file is not left behind and the server thread does not
        # keep the process alive.
        os.unlink(serverdata_filename)
        server.stop_event.set() # The server stops and calls sys.exit()

def startclient():
    """Connect to the server and show that two proxies share one namespace.

    The address and authkey are read from ``mpSERVERDATA.txt``.  Attributes
    are set through the first proxy and read back through a second,
    separately requested one.
    """
    # retrieve server identification from file
    with open('mpSERVERDATA.txt', 'rt', encoding="utf-8") as data_file:
        connection_info = literal_eval(data_file.read())
        print(connection_info)

    # Client side: the typeid is registered without a callable.
    MyNamespaceManager.register('MyNamespace', proxytype=NamespaceProxy)
    manager = MyNamespaceManager(**connection_info)
    manager.connect()

    # Write through the first proxy ...
    yv1 = manager.MyNamespace()
    yv1.value = 99
    yv1.value += 55
    yv1.x = 9

    # ... and read the same attributes through a second one.
    yv2 = manager.MyNamespace()
    print(f'yv2.value={yv2.value}')
    print(f'yv2.x={yv2.x}')


if __name__ == '__main__':
    # 'server' as the sole argument starts the server; any other command
    # line runs the client.
    run_as_server = sys.argv[1:] == ['server']
    entry_point = startserver if run_as_server else startclient
    entry_point()

The client prints:

{'address': ('127.0.0.1', 51211), 'authkey': b'mptest'}
yv2.value=154
yv2.x=9