FreeOpcUa/opcua-asyncio

Unable to get subscribe example working

Opened this issue · 2 comments

Describe the bug

Unable to get client subscribe to work.

To Reproduce

See code.… [1]

Expected behavior

If Data is changed on the server the client should be notified via the subscription.

Version

Python-Version: 3.11.9

opcua-asyncio Version (e.g. master branch, 0.9): 1.1.5

[1]

# server.py
import asyncio
import logging
from asyncua import Server, Client, Node
from datetime import datetime


class OPCUAServer:
    def __init__(self, endpoint: str = "opc.tcp://0.0.0.0:4840/freeopcua/server/"):
        self.endpoint = endpoint
        self.server = Server()
        self.trigger_var = None
        self.part_id_var = None
        self.namespace_idx = None

    async def init(self):
        await self.server.init()
        self.server.set_endpoint(self.endpoint)

        # Set server name
        await self.server.set_application_uri("urn:example:opcua:server")

        # Get Objects node
        objects = self.server.get_objects_node()

        # Create custom namespace
        uri = "http://examples.freeopcua.github.io"
        self.namespace_idx = await self.server.register_namespace(uri)

        # Load XML nodeset
        await self.server.import_xml("UA_NodeSet.xml")

        # Create a new object
        myobj = await objects.add_object(self.namespace_idx, "Process")

        # Create variables
        self.trigger_var = await myobj.add_variable(
            self.namespace_idx, "Trigger", False
        )
        self.part_id_var = await myobj.add_variable(self.namespace_idx, "PartID", "")

        # Set variables writable
        await self.trigger_var.set_writable()
        await self.part_id_var.set_writable()

        print(f"Server namespace index: {self.namespace_idx}")
        print(f"Trigger node id: {self.trigger_var.nodeid}")
        print(f"PartID node id: {self.part_id_var.nodeid}")

    async def start(self):
        async with self.server:
            while True:
                # Simulate trigger and part ID updates
                current_time = datetime.now().strftime("%H:%M:%S")

                await self.trigger_var.write_value(True)
                name = (await self.trigger_var.read_browse_name()).Name
                value = (await self.trigger_var.read_value())
                print(f"{name} = {value}")

                await self.part_id_var.write_value(f"PART_{current_time}")
                name = (await self.part_id_var.read_browse_name()).Name
                value = (await self.part_id_var.read_value())
                print(f"{name} = {value}")

                # Wait for 5 seconds before next update
                await asyncio.sleep(5)
                await self.trigger_var.write_value(False)
                name = (await self.trigger_var.read_browse_name()).Name
                value = (await self.trigger_var.read_value())
                print(f"{name} = {value}")
                await asyncio.sleep(5)


# client.py

class SubscriptionHandler:

    def datachange_notification(self, node: Node, val, data):
        try:
            node_name = node
            print(f"New value for {node_name}: {val} {data=}")
        except Exception as e:
            print(f"Error in notification handler: {e}")


class OPCUAClient:
    def __init__(self, url: str = "opc.tcp://localhost:4840/freeopcua/server/"):
        self.url = url
        self.client = Client(url=self.url)

    async def subscribe_to_variables(self):
        async with self.client:
            try:
                # Find the namespace index
                uri = "http://examples.freeopcua.github.io"
                nsidx = await self.client.get_namespace_index(uri)
                print(f"Client namespace index: {nsidx}")

                # Get the Process node first
                objects = self.client.get_objects_node()
                process_node = await objects.get_child(f"{nsidx}:Process")

                # Get variables using their browse paths
                trigger_node = await process_node.get_child(f"{nsidx}:Trigger")
                part_id_node = await process_node.get_child(f"{nsidx}:PartID")

                print(f"Found trigger node: {trigger_node.nodeid}")
                print(f"Found part_id node: {part_id_node.nodeid}")

                # Create subscription
                handler = SubscriptionHandler()
                subscription = await self.client.create_subscription(100, handler=handler)
                await subscription.subscribe_data_change([trigger_node, part_id_node], sampling_interval=0)

                # Keep the client running
                while True:
                    print("ZZzzzZZzzZ!")
                    await asyncio.sleep(1)

            except Exception as e:
                print(f"Error in client: {e}")
                raise


# Example XML configuration (UA_NodeSet.xml)
XML_CONTENT = """<?xml version="1.0" encoding="utf-8"?>
<UANodeSet xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:uax="http://opcfoundation.org/UA/2008/02/Types.xsd" xmlns="http://opcfoundation.org/UA/2011/03/UANodeSet.xsd">
    <NamespaceUris>
        <Uri>http://examples.freeopcua.github.io</Uri>
    </NamespaceUris>
    <UAObject NodeId="ns=1;i=1" BrowseName="1:Process">
        <DisplayName>Process</DisplayName>
        <References>
            <Reference ReferenceType="HasComponent" IsForward="false">i=85</Reference>
        </References>
    </UAObject>
    <UAVariable NodeId="ns=1;i=2" BrowseName="1:Trigger" DataType="Boolean">
        <DisplayName>Trigger</DisplayName>
        <References>
            <Reference ReferenceType="HasComponent" IsForward="false">ns=1;i=1</Reference>
        </References>
    </UAVariable>
    <UAVariable NodeId="ns=1;i=3" BrowseName="1:PartID" DataType="String">
        <DisplayName>PartID</DisplayName>
        <References>
            <Reference ReferenceType="HasComponent" IsForward="false">ns=1;i=1</Reference>
        </References>
    </UAVariable>
</UANodeSet>
"""

# main.py
async def main():
    # Save XML configuration
    with open("UA_NodeSet.xml", "w") as f:
        f.write(XML_CONTENT)
     # Create and start server
    server = OPCUAServer()
    await server.init()

    # Create and start client
    client = OPCUAClient()

    # Run both server and client concurrently
    await asyncio.gather(server.start(), client.subscribe_to_variables())


if __name__ == "__main__":
    # logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

The server is just once sending a publish event:

INFO:asyncua.server.internal_server:No user manager specified. Using default permissive manager instead.
INFO:asyncua.server.internal_session:Created internal session Internal
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=11715, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=15958, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=15959, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=15960, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=15961, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=15962, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=15963, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=15964, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=16134, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=16135, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.server.address_space:add_node: while adding node NumericNodeId(Identifier=16136, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO:asyncua.common.xmlimporter:Importing XML file UA_NodeSet.xml
INFO:asyncua.common.xmlimporter:namespace map: {1: 2}
INFO:asyncua.common.xmlimporter:Importing xml node (QualifiedName(NamespaceIndex=2, Name='Process'), NodeId(Identifier=1, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>)) as (QualifiedName(NamespaceIndex=2, Name='Process') NodeId(Identifier=1, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>))
INFO:asyncua.common.xmlimporter:Importing xml node (QualifiedName(NamespaceIndex=2, Name='Trigger'), NodeId(Identifier=2, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>)) as (QualifiedName(NamespaceIndex=2, Name='Trigger') NodeId(Identifier=2, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>))
INFO:asyncua.common.xmlimporter:Importing xml node (QualifiedName(NamespaceIndex=2, Name='PartID'), NodeId(Identifier=3, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>)) as (QualifiedName(NamespaceIndex=2, Name='PartID') NodeId(Identifier=3, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>))
Server namespace index: 2
Trigger node id: NodeId(Identifier=5, NamespaceIndex=2, NodeIdType=<NodeIdType.FourByte: 1>)
PartID node id: NodeId(Identifier=6, NamespaceIndex=2, NodeIdType=<NodeIdType.FourByte: 1>)
WARNING:asyncua.server.server:Endpoints other than open requested but private key and certificate are not set.
INFO:asyncua.server.internal_server:starting internal server
INFO:asyncua.client.client:connect
INFO:asyncua.client.ua_client.UaClient:opening connection
INFO:asyncua.server.binary_server_asyncio:Listening on 0.0.0.0:4840
Trigger = True
PartID = PART_17:50:10
INFO:asyncua.server.binary_server_asyncio:New connection from ('127.0.0.1', 38948)
INFO:asyncua.uaprotocol:updating server limits to: TransportLimits(max_recv_buffer=65535, max_send_buffer=65535, max_chunk_count=1601, max_message_size=104857600)
INFO:asyncua.uaprotocol:updating client limits to: TransportLimits(max_recv_buffer=65535, max_send_buffer=65535, max_chunk_count=1601, max_message_size=104857600)
INFO:asyncua.client.ua_client.UASocketProtocol:open_secure_channel
INFO:asyncua.client.ua_client.UaClient:create_session
INFO:asyncua.server.uaprocessor:Create session request (None)
INFO:asyncua.server.internal_session:Created internal session ('127.0.0.1', 38948)
INFO:asyncua.server.internal_session:Create session request
INFO:asyncua.server.internal_server:get endpoint
INFO:asyncua.client.client:find_endpoint [EndpointDescription(EndpointUrl='opc.tcp://localhost:4840/freeopcua/server/', Server=ApplicationDescription(ApplicationUri='urn:example:opcua:server', ProductUri='urn:freeopcua.github.io:python:server', ApplicationName=LocalizedText(Locale=None, Text='FreeOpcUa Python Server'), ApplicationType_=<ApplicationType.ClientAndServer: 2>, GatewayServerUri=None, DiscoveryProfileUri=None, DiscoveryUrls=['opc.tcp://localhost:4840/freeopcua/server/']), ServerCertificate=None, SecurityMode=<MessageSecurityMode.None_: 1>, SecurityPolicyUri='http://opcfoundation.org/UA/SecurityPolicy#None', UserIdentityTokens=[UserTokenPolicy(PolicyId='anonymous', TokenType=<UserTokenType.Anonymous: 0>, IssuedTokenType=None, IssuerEndpointUrl=None, SecurityPolicyUri=None), UserTokenPolicy(PolicyId='certificate_basic256sha256', TokenType=<UserTokenType.Certificate: 2>, IssuedTokenType=None, IssuerEndpointUrl=None, SecurityPolicyUri=None), UserTokenPolicy(PolicyId='username', TokenType=<UserTokenType.UserName: 1>, IssuedTokenType=None, IssuerEndpointUrl=None, SecurityPolicyUri=None)], TransportProfileUri='http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary', SecurityLevel=0)] <MessageSecurityMode.None_: 1> 'http://opcfoundation.org/UA/SecurityPolicy#None'
INFO:asyncua.client.ua_client.UaClient:activate_session
INFO:asyncua.server.uaprocessor:Activate session request (None)
INFO:asyncua.server.internal_session:activate session
INFO:asyncua.server.internal_session:Activated internal session ('127.0.0.1', 38948) for user User(role=<UserRole.User: 3>, name=None)
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
INFO:asyncua.client.client:get_namespace_index <class 'list'> ['http://opcfoundation.org/UA/', 'urn:example:opcua:server', 'http://examples.freeopcua.github.io']
Client namespace index: 2
INFO:asyncua.client.client:get_objects_node
INFO:asyncua.server.uaprocessor:translate browsepaths to nodeids request (User(role=<UserRole.User: 3>, name=None))
INFO:asyncua.server.uaprocessor:translate browsepaths to nodeids request (User(role=<UserRole.User: 3>, name=None))
INFO:asyncua.server.uaprocessor:translate browsepaths to nodeids request (User(role=<UserRole.User: 3>, name=None))
Found trigger node: NodeId(Identifier=2, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>)
Found part_id node: NodeId(Identifier=3, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>)
INFO:asyncua.server.uaprocessor:create subscription request (User(role=<UserRole.User: 3>, name=None))
INFO:asyncua.server.subscription_service:create subscription
INFO:asyncua.server.uaprocessor:Server wants to send publish answer but no publish request is available,enqueuing publish results callback, length of queue is 1
INFO:asyncua.client.ua_client.UaClient:create_subscription success SubscriptionId 78
INFO:asyncua.common.subscription:Subscription created 78
INFO:asyncua.client.ua_client.UaClient:create_monitored_items
INFO:asyncua.server.uaprocessor:create monitored items request (User(role=<UserRole.User: 3>, name=None))
INFO:asyncua.server.subscription_service:create monitored items
INFO:asyncua.server.monitored_item_service.78:request to subscribe to datachange for node NodeId(Identifier=2, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>) and attribute 13
INFO:asyncua.server.monitored_item_service.78:request to subscribe to datachange for node NodeId(Identifier=3, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>) and attribute 13
INFO:asyncua.server.subscription_service:publish request with acks []
INFO:asyncua.common.subscription:Publish callback called with result: PublishResult(SubscriptionId=78, AvailableSequenceNumbers=[1], MoreNotifications=False, NotificationMessage_=NotificationMessage(SequenceNumber=1, PublishTime=datetime.datetime(2024, 11, 11, 16, 50, 10, 76325, tzinfo=datetime.timezone.utc), NotificationData=[DataChangeNotification(MonitoredItems=[MonitoredItemNotification(ClientHandle=201, Value=DataValue(Value=Variant(Value=None, VariantType=<VariantType.Null: 0>, Dimensions=None, is_array=False), StatusCode_=StatusCode(value=0), SourceTimestamp=datetime.datetime(2024, 11, 11, 16, 50, 10, 55411, tzinfo=datetime.timezone.utc), ServerTimestamp=datetime.datetime(2024, 11, 11, 16, 50, 10, 55413, tzinfo=datetime.timezone.utc), SourcePicoseconds=None, ServerPicoseconds=None)), MonitoredItemNotification(ClientHandle=202, Value=DataValue(Value=Variant(Value=None, VariantType=<VariantType.Null: 0>, Dimensions=None, is_array=False), StatusCode_=StatusCode(value=0), SourceTimestamp=datetime.datetime(2024, 11, 11, 16, 50, 10, 55622, tzinfo=datetime.timezone.utc), ServerTimestamp=datetime.datetime(2024, 11, 11, 16, 50, 10, 55623, tzinfo=datetime.timezone.utc), SourcePicoseconds=None, ServerPicoseconds=None))], DiagnosticInfos=[])]), Results=[], DiagnosticInfos=[])
New value for ns=2;i=2: None data=DataChangeNotification(<asyncua.common.subscription.SubscriptionItemData object at 0x7f2f5dcba2d0>, MonitoredItemNotification(ClientHandle=201, Value=DataValue(Value=Variant(Value=None, VariantType=<VariantType.Null: 0>, Dimensions=None, is_array=False), StatusCode_=StatusCode(value=0), SourceTimestamp=datetime.datetime(2024, 11, 11, 16, 50, 10, 55411, tzinfo=datetime.timezone.utc), ServerTimestamp=datetime.datetime(2024, 11, 11, 16, 50, 10, 55413, tzinfo=datetime.timezone.utc), SourcePicoseconds=None, ServerPicoseconds=None)))
New value for ns=2;i=3: None data=DataChangeNotification(<asyncua.common.subscription.SubscriptionItemData object at 0x7f2f5d322790>, MonitoredItemNotification(ClientHandle=202, Value=DataValue(Value=Variant(Value=None, VariantType=<VariantType.Null: 0>, Dimensions=None, is_array=False), StatusCode_=StatusCode(value=0), SourceTimestamp=datetime.datetime(2024, 11, 11, 16, 50, 10, 55622, tzinfo=datetime.timezone.utc), ServerTimestamp=datetime.datetime(2024, 11, 11, 16, 50, 10, 55623, tzinfo=datetime.timezone.utc), SourcePicoseconds=None, ServerPicoseconds=None)))
ZZzzzZZzzZ!
INFO:asyncua.server.subscription_service:publish request with acks [SubscriptionAcknowledgement(SubscriptionId=78, SequenceNumber=1)]
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
ZZzzzZZzzZ!
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
ZZzzzZZzzZ!
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
ZZzzzZZzzZ!
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
ZZzzzZZzzZ!
Trigger = False
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
ZZzzzZZzzZ!
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
ZZzzzZZzzZ!
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
ZZzzzZZzzZ!
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
ZZzzzZZzzZ!
ZZzzzZZzzZ!
INFO:asyncua.server.uaprocessor:Read request (User(role=<UserRole.User: 3>, name=None))
Trigger = True
PartID = PART_17:50:20
ZZzzzZZzzZ!

Ok got it working. Two things

  1. import_xml and add_variable are both adding variables to the server. The server does not know on what variable to subscribe and does not work. (SHOULD that be warning!?)
  2. as we are not creating the variables but importing them we also need to reference them process.get_child([f"{self.namespace_idx}:Trigger"])

Here the complete example:

# server.py
import asyncio
import logging
from asyncua import Server, Client, Node
from datetime import datetime


class OPCUAServer:
    def __init__(self, endpoint: str = "opc.tcp://0.0.0.0:4840/freeopcua/server/"):
        self.endpoint = endpoint
        self.server = Server()
        self.trigger_var = None
        self.part_id_var = None
        self.namespace_idx = None

    async def init(self):
        await self.server.init()
        self.server.set_endpoint(self.endpoint)

        # Set server name
        await self.server.set_application_uri("urn:example:opcua:server")

        # # Create custom namespace
        uri = "http://examples.freeopcua.github.io"
        self.namespace_idx = await self.server.register_namespace(uri)

        # # Load XML nodeset
        await self.server.import_xml("UA_NodeSet.xml")
        # await print_all_nodes(self.server)
        process = await self.server.nodes.objects.get_child(f"{self.namespace_idx}:Process")

        self.trigger_var = (await process.get_child([f"{self.namespace_idx}:Trigger"]))
        self.part_id_var = (await process.get_child([f"{self.namespace_idx}:PartID"]))

        # Set variables writable
        await self.trigger_var.set_writable()
        await self.part_id_var.set_writable()

        print(f"Server namespace index: {self.namespace_idx}")
        print(f"Trigger node id: {self.trigger_var.nodeid}")
        print(f"PartID node id: {self.part_id_var.nodeid}")

    async def start(self):
        async with self.server:
            while True:
                # Simulate trigger and part ID updates
                current_time = datetime.now().strftime("%H:%M:%S")

                await self.trigger_var.write_value(True)
                name = (await self.trigger_var.read_browse_name()).Name
                value = (await self.trigger_var.read_value())
                print(f"SERVER: {name} = {value}")

                await self.part_id_var.write_value(f"PART_{current_time}")
                name = (await self.part_id_var.read_browse_name()).Name
                value = (await self.part_id_var.read_value())
                print(f"SERVER: {name} = {value}")

                # Wait for 5 seconds before next update
                await asyncio.sleep(5)
                await self.trigger_var.write_value(False)
                name = (await self.trigger_var.read_browse_name()).Name
                value = (await self.trigger_var.read_value())
                print(f"SERVER: {name} = {value}")
                await asyncio.sleep(5)


# client.py
class SubscriptionHandler:

    async def datachange_notification(self, node: Node, val, data):
        try:
            print(f"New value for {node}: {val}")
        except Exception as e:
            print(f"Error in notification handler: {e}")


class OPCUAClient:
    def __init__(self, url: str = "opc.tcp://localhost:4840/freeopcua/server/"):
        self.url = url
        self.client = Client(url=self.url)

    async def subscribe_to_variables(self):
        async with self.client:
            try:
                # Find the namespace index
                uri = "http://examples.freeopcua.github.io"
                nsidx = await self.client.get_namespace_index(uri)
                print(f"Client namespace index: {nsidx}")

                # Get the Process node first
                objects = self.client.get_objects_node()
                process_node = await objects.get_child(f"{nsidx}:Process")

                # Get variables using their browse paths
                trigger_node = await process_node.get_child(f"{nsidx}:Trigger")
                part_id_node = await process_node.get_child(f"{nsidx}:PartID")

                print(f"Found trigger node: {trigger_node.nodeid}")
                print(f"Found part_id node: {part_id_node.nodeid}")

                # Create subscription
                handler = SubscriptionHandler()
                subscription = await self.client.create_subscription(100, handler=handler)
                await subscription.subscribe_data_change([trigger_node, part_id_node])

                # Keep the client running
                while True:
                    current_time = datetime.now().strftime("%H:%M:%S")
                    await part_id_node.write_value(f"PART_CLIENT_{current_time}")
                    name = (await part_id_node.read_browse_name()).Name
                    value = (await part_id_node.read_value())
                    print(f"CLIENT: {name} = {value}")
                    await asyncio.sleep(1)

            except Exception as e:
                print(f"Error in client: {e}")
                raise


# Example XML configuration (UA_NodeSet.xml)
XML_CONTENT = """<?xml version="1.0" encoding="utf-8"?>
<UANodeSet xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:uax="http://opcfoundation.org/UA/2008/02/Types.xsd" xmlns="http://opcfoundation.org/UA/2011/03/UANodeSet.xsd">
    <NamespaceUris>
        <Uri>http://examples.freeopcua.github.io</Uri>
    </NamespaceUris>
    <UAObject NodeId="ns=1;i=1" BrowseName="1:Process">
        <DisplayName>Process</DisplayName>
        <References>
            <Reference ReferenceType="HasComponent" IsForward="false">i=85</Reference>
        </References>
    </UAObject>
    <UAVariable NodeId="ns=1;i=2" BrowseName="1:Trigger" DataType="Boolean">
        <DisplayName>Trigger</DisplayName>
        <References>
            <Reference ReferenceType="HasComponent" IsForward="false">ns=1;i=1</Reference>
        </References>
    </UAVariable>
    <UAVariable NodeId="ns=1;i=3" BrowseName="1:PartID" DataType="String">
        <DisplayName>PartID</DisplayName>
        <References>
            <Reference ReferenceType="HasComponent" IsForward="false">ns=1;i=1</Reference>
        </References>
    </UAVariable>
</UANodeSet>
"""

# main.py
async def main():
    # Save XML configuration
    with open("UA_NodeSet.xml", "w") as f:
        f.write(XML_CONTENT)
     # Create and start server
    server = OPCUAServer()
    await server.init()

    # Create and start client
    client = OPCUAClient()

    # Run both server and client concurrently
    await asyncio.gather(server.start(), client.subscribe_to_variables())


if __name__ == "__main__":
    # logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

Then the output looks like expected:

Server namespace index: 2
Trigger node id: NodeId(Identifier=2, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>)
PartID node id: NodeId(Identifier=3, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>)
Endpoints other than open requested but private key and certificate are not set.
SERVER: Trigger = True
SERVER: PartID = PART_14:47:13
updating server limits to: TransportLimits(max_recv_buffer=65535, max_send_buffer=65535, max_chunk_count=1601, max_message_size=104857600)
updating client limits to: TransportLimits(max_recv_buffer=65535, max_send_buffer=65535, max_chunk_count=1601, max_message_size=104857600)
Client namespace index: 2
Found trigger node: NodeId(Identifier=2, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>)
Found part_id node: NodeId(Identifier=3, NamespaceIndex=2, NodeIdType=<NodeIdType.Numeric: 2>)
CLIENT: PartID = PART_CLIENT_14:47:13
New value for ns=2;i=2: True
New value for ns=2;i=3: PART_14:47:13
New value for ns=2;i=3: PART_CLIENT_14:47:13
CLIENT: PartID = PART_CLIENT_14:47:14
New value for ns=2;i=3: PART_CLIENT_14:47:14
CLIENT: PartID = PART_CLIENT_14:47:15
New value for ns=2;i=3: PART_CLIENT_14:47:15
CLIENT: PartID = PART_CLIENT_14:47:16
New value for ns=2;i=3: PART_CLIENT_14:47:16
CLIENT: PartID = PART_CLIENT_14:47:17
New value for ns=2;i=3: PART_CLIENT_14:47:17
SERVER: Trigger = False
New value for ns=2;i=2: False