python django asyncua subscription synchronous

44 Views Asked by At

In the following example, I call my function check_data_acknowledge from a synchronous function to subscribe to a node, determine the moment of its modification, and act accordingly.

I just found a starting solution that I will share and update when I finally finish

import asyncio

from asyncua import Node
from asyncua.common.subscription import SubscriptionHandler, SubHandler
from asyncua.sync import Client
from opcua.common.subscription import DataChangeNotif

from prod_declarator.config_line import line_id
from prod_declarator.opcua import opcua_instance


class MyHandler(SubHandler):
    def __init__(self):
        self._queue = asyncio.Queue()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        self._queue.put_nowait([node, value, data])
        print(f'Data change notification was received and queued.')

    async def process(self) -> None:
        try:
            while True:
                # Get data in a queue.
                [node, value, data] = self._queue.get_nowait()
                path = node.get_path(as_string=True)

                # *** Write your processing code ***

                print(f'New value {value} of "{path}" was processed.')

        except asyncio.QueueEmpty:
            pass


async def check_data_acknowledge(part_declaration_check_packaging_number) -> None:
    end_point = line_id[part_declaration_check_packaging_number['line_id']]["end_point"]
    time_out = 600000
    print(f"End point : {end_point}")
    name_space = line_id[part_declaration_check_packaging_number['line_id']]["associatedNameSpaceIndex"]
    node_data_acknowledge_line_id = opcua_instance.instanceExigences["data_acknowledge"]
    line_name = line_id[part_declaration_check_packaging_number["line_id"]]["lineName"]
    node_data_acknowledge = node_data_acknowledge_line_id.replace("LineName", line_name)

    client = Client(end_point, timeout=time_out)
    try:
        print(f"Before connect: {client}")
        client.connect()
        print(f"Connected: {client}")

        # Get a variable node.
        node = client.get_node("ns={};s={}".format(name_space, node_data_acknowledge))

        # Subscribe data change.
        handler = MyHandler()
        subscription = client.create_subscription(period=0, handler=handler)
        subscription.subscribe_data_change(node)

        # Process data change every 100ms
        while True:
            await handler.process()
            await asyncio.sleep(0.1)

    except Exception as e:
        import traceback
        traceback.print_exc()
        print(f"An error occurred during connection: {e}")

    finally:
        print(f"Before disconnect: {client}")
        if client is not None:
            client.disconnect()
            print(f"After disconnect: {client}")

    return part_declaration_check_packaging_number

Thank you in advance for your future advice.

0

There are 0 best solutions below