Exception in subscription loop: struct.error: required argument is not an integer
magnus-bakke opened this issue · 4 comments
Describe the bug
We are running asyncua in a Docker container using an image based on python:3.10-slim-bullseye.
Because the person who developed and monitored this solution has left the company, I can't say when it was last working. All I know is that:
- We have a bunch of IoT devices to which we can connect via TCP and retrieve data.
- We have a Python repository used as an interface for reading this data from the devices (which I'll call the "ethernet adapter").
- We have a Python repository (the "OPC UA repo") that uses asyncua to populate nodes/values. This repo has a loop that uses the ethernet adapter to read device data.
- To release the OPC UA repo, we build its Docker image and push it to an image repository in the cloud.
- Finally, a "master repo" pulls the image from the cloud, along with several other images, and runs it using Docker Compose.
After launching the "master" application, which coordinates images (including the OPC UA repo that uses asyncua), I look at its logs and see this error and traceback for the OPC UA container:
Exception in subscription loop
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/asyncua/server/internal_subscription.py", line 88, in _subscription_loop
await self.publish_results()
File "/usr/local/lib/python3.10/site-packages/asyncua/server/internal_subscription.py", line 135, in publish_results
await self.pub_result_callback(result, requestdata)
File "/usr/local/lib/python3.10/site-packages/asyncua/server/uaprocessor.py", line 94, in forward_publish_response
self.send_response(requestdata.requesthdr.RequestHandle, requestdata.seqhdr, response)
File "/usr/local/lib/python3.10/site-packages/asyncua/server/uaprocessor.py", line 49, in send_response
struct_to_binary(response), message_type=msgtype, request_id=seqhdr.RequestId)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 336, in struct_to_binary
return serializer(obj)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 325, in serialize
return b''.join(
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 327, in <genexpr>
else serializer(obj.__dict__[name])
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 325, in serialize
return b''.join(
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 327, in <genexpr>
else serializer(obj.__dict__[name])
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 325, in serialize
return b''.join(
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 327, in <genexpr>
else serializer(obj.__dict__[name])
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 401, in serialize
return data_size + b''.join(type_serializer(el) for el in val)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 401, in <genexpr>
return data_size + b''.join(type_serializer(el) for el in val)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 569, in extensionobject_to_binary
body = struct_to_binary(obj)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 336, in struct_to_binary
return serializer(obj)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 325, in serialize
return b''.join(
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 327, in <genexpr>
else serializer(obj.__dict__[name])
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 401, in serialize
return data_size + b''.join(type_serializer(el) for el in val)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 401, in <genexpr>
return data_size + b''.join(type_serializer(el) for el in val)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 325, in serialize
return b''.join(
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 327, in <genexpr>
else serializer(obj.__dict__[name])
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 336, in struct_to_binary
return serializer(obj)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 325, in serialize
return b''.join(
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 327, in <genexpr>
else serializer(obj.__dict__[name])
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 276, in <lambda>
return lambda val: b'' if val is None else serializer(val)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 485, in variant_to_binary
body = pack_uatype(var.VariantType, var.Value)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 191, in pack_uatype
return create_uatype_serializer(vtype)(value)
File "/usr/local/lib/python3.10/site-packages/asyncua/ua/ua_binary.py", line 128, in pack
return struct.pack(self.format, data)
struct.error: required argument is not an integer
This error tells me very little. (Some more error information about what was happening at the time of the error would be helpful.)
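The last frame of the traceback can be reproduced in isolation with the standard library alone. This is a minimal sketch, assuming a 32-bit integer format string (the actual format asyncua uses for the offending node is not shown in the traceback); `try_pack` is a hypothetical helper used only for illustration.

```python
import struct

# Assumption: a little-endian signed 32-bit format, chosen only for illustration.
INT32_FORMAT = "<i"


def try_pack(value):
    """Return the packed bytes, or the struct.error raised while packing."""
    try:
        return struct.pack(INT32_FORMAT, value)
    except struct.error as exc:
        return exc


ok = try_pack(21)     # an int packs fine
bad = try_pack(21.5)  # a float does not fit an integer format

print(ok)         # b'\x15\x00\x00\x00'
print(repr(bad))  # a struct.error instance
```

Note that the exception carries neither the value nor the node it belonged to, which is why the traceback alone says so little.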
I can still connect to the server and list the nodes, but all values are absent with Datatype = Null, Statuscode = BadWaitingForInitialData, and Server Timestamp = 8:00:00.000 PM.
I know for a fact that it was working a few weeks ago. Since then, a few things have changed, but I have carefully looked at the changes and cannot find one that I would expect to cause any problems. Specifically, the ethernet adapter has been updated, and two new nodes have been added.
But that's all irrelevant because I just tried to deploy an older version without these recent changes, and it still fails. At that point, the server hadn't been changed in several months.
I understand this may not be sufficient information to reproduce, but I'm hoping someone with a better understanding of asyncua than me has some inkling. I'll happily provide more information, I just don't know what to provide.
To Reproduce
Steps to reproduce the behavior incl code:
This will be tricky since it worked before these changes.
Here's some code:
class GHOPCServer:
    async def __aenter__(self):
        await self.init()
        await self.server.start()
        return self

    # Plus lots of irrelevant logic
async def opcua_server_app(args):
    async with GHOPCServer(args) as gh_opc_server:
        config = GHConfig(gh_opc_server)
        await config.install()
        gh_poller = GHPoller(config)
        await config.reload_from_local_storage()
        logger.info("OPCUA Core Setup Complete, registering server wide callback")
        await gh_opc_server.register_server_wide_callback(
            "Reconfigure", config.reconfig, [ua.VariantType.ByteString], [ua.VariantType.String]
        )
        logger.info("OPCUA Setup Complete, Beginning Polling Loop")
        next_master_update = datetime.datetime(1970, 1, 1)
        next_node_update = datetime.datetime(1970, 1, 1)
        next_cert_reconcile_update = datetime.datetime.now() + datetime.timedelta(minutes=10)
        endpoints = await gh_opc_server.server.get_endpoints()
        logger.info("Server endpoints: %s", endpoints)
        while True:
            if datetime.datetime.now() > next_master_update:
                await gh_poller.poll_master_realtime_and_update()
                next_master_update = datetime.datetime.now() + datetime.timedelta(minutes=2)
            if datetime.datetime.now() > next_node_update:
                # poll at 28 and 58
                poll_time = datetime.datetime.now()
                await gh_poller.poll_send_node_data_and_update()
                if 28 <= poll_time.minute <= 57:
                    next_node_update = poll_time.replace(minute=58, second=0, microsecond=0)
                else:  # poll_time must be around the 58-59 point or so... add 20 minutes and reset
                    next_node_update = (poll_time + datetime.timedelta(minutes=20)).replace(
                        minute=28, second=0, microsecond=0
                    )
            if datetime.datetime.now() > next_cert_reconcile_update:
                await gh_opc_server.add_client_certs_in_path()
                next_cert_reconcile_update = datetime.datetime.now() + datetime.timedelta(
                    minutes=10
                )
            await asyncio.sleep(10)
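The "poll at 28 and 58" scheduling in the loop above can be checked in isolation. The following is a sketch only: `next_node_update_after` is a hypothetical name, and the function simply lifts the two branches out of the loop so they can be exercised with fixed timestamps.

```python
import datetime


def next_node_update_after(poll_time: datetime.datetime) -> datetime.datetime:
    """Return the next node poll time using the same two branches as the loop."""
    if 28 <= poll_time.minute <= 57:
        # Between :28 and :57, the next poll is at :58 of the same hour.
        return poll_time.replace(minute=58, second=0, microsecond=0)
    # Otherwise, step 20 minutes forward (possibly into the next hour or day)
    # and snap to :28 of whatever hour that lands in.
    return (poll_time + datetime.timedelta(minutes=20)).replace(
        minute=28, second=0, microsecond=0
    )


mid_hour = next_node_update_after(datetime.datetime(2024, 1, 1, 10, 30, 15))
end_of_hour = next_node_update_after(datetime.datetime(2024, 1, 1, 10, 58, 40))
end_of_day = next_node_update_after(datetime.datetime(2024, 1, 1, 23, 59, 5))
start_of_hour = next_node_update_after(datetime.datetime(2024, 1, 1, 10, 5, 0))

print(mid_hour)       # 2024-01-01 10:58:00
print(end_of_hour)    # 2024-01-01 11:28:00
print(end_of_day)     # 2024-01-02 00:28:00
print(start_of_hour)  # 2024-01-01 10:28:00
```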
class GHPoller:
    """Class for centralizing logic for polling the GH masters & nodes and storing the received data against the OPCUA server."""

    def __init__(self, config: GHConfig):
        self.config = config

    async def _update_node_nodes_from_polled_data(self, data_dict):
        """Update node controllers"""
        for master_ip, result_dict in data_dict.items():
            node_id_to_node_dict = self.config.master_nodes_by_ip[master_ip]["nodes"]
            for node_id, polling_result in result_dict["results"].items():
                node = node_id_to_node_dict[int(node_id)]
                await _update_base_props(node, polling_result)
                if isinstance(polling_result, MasterSocketError):
                    continue  # skip processing of individual props due to polling failure
                for node_prop_key, content in NODE_DATA_PROPS_FROM_SPEC.items():
                    variant_type, conversion_func = content
                    val = conversion_func(polling_result)
                    await _update_object_param(node, node_prop_key, val, variant_type)

    async def poll_master_realtime_and_update(self):
        """Poll the masters and update the OPCUA objects with the results."""
        ips_to_poll = list(self.config.master_nodes_by_ip.keys())
        if ips_to_poll:
            master_data = await poll_masters(ips_to_poll)
            await self._update_master_nodes_from_polled_data(master_data)
        else:
            logger.warning("No masters to poll, skipping poll_master_realtime_and_update")

    async def poll_send_node_data_and_update(self):
        """Poll the masters for node data"""
        node_meta = {
            master_ip: list(master_node_dict["nodes"].keys())
            for master_ip, master_node_dict in self.config.master_nodes_by_ip.items()
        }
        master_data = await poll_nodes(node_meta)
        await self._update_node_nodes_from_polled_data(master_data)
async def poll_nodes(node_meta):
    """Polls masters for the cached node_data on the master. Returns a dictionary of the results."""
    master_data = {}
    threads = []
    for master_ip, node_ids in node_meta.items():
        thread = Thread(target=_poll_master_for_node_data, args=[master_ip, node_ids, master_data])
        thread.daemon = True
        thread.start()
        threads.append(thread)
    # Cannot call thread.join() as it is a blocking call and that would prevent the async server
    # from handling requests. Instead, we'll run a busy loop here while the threads are alive.
    while True:
        for thread in threads:
            if thread.is_alive():
                break
        else:  # all threads are dead, let's abort
            break
        await asyncio.sleep(0.1)
    return master_data
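As an aside on the polling helper above: waiting on blocking work without stalling the event loop can also be expressed with `asyncio.to_thread` (Python 3.9+) instead of polling `is_alive()`. This is a sketch under assumptions, not the code from our repository: `_blocking_poll` is a hypothetical stand-in that returns its result, whereas the real `_poll_master_for_node_data` writes into a shared dictionary.

```python
import asyncio
import time


def _blocking_poll(master_ip, node_ids):
    """Hypothetical stand-in for a blocking device read over TCP."""
    time.sleep(0.01)  # simulate blocking I/O
    return {"results": {node_id: "ok" for node_id in node_ids}}


async def poll_nodes_sketch(node_meta):
    """Run one blocking poll per master in worker threads and await them all."""
    ips = list(node_meta)
    results = await asyncio.gather(
        *(asyncio.to_thread(_blocking_poll, ip, node_meta[ip]) for ip in ips)
    )
    # gather() preserves argument order, so results line up with ips.
    return dict(zip(ips, results))


demo = asyncio.run(poll_nodes_sketch({"10.0.0.1": [1, 2], "10.0.0.2": [3]}))
print(demo)
```

The event loop stays free to serve clients while the worker threads run, and there is no sleep interval to tune.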
Expected behavior
When viewed using UaExpert, these columns should be populated as there are no polling errors:
(and many more)
Screenshots
See also the expected behavior.
(Notice that it continues to fail to write a bunch of None values. These None errors are expected.)
Version
Python-Version: 3.10
opcua-asyncio Version (e.g. master branch, 0.9): 1.1.0
Your code is somewhere trying to write a struct to the UA server that cannot be parsed. Looks like something that is declared as an int, or where an int is expected, is not an int...
Also, the write has been done server side somewhere (we do not do type checking at that point, it is too slow), but the parsing only happens when a client subscribes to it and the server tries to send the data.
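The deferred failure described here can be illustrated with a toy model. This is a sketch only: `TinyNodeStore` is a hypothetical class and not asyncua's API. It accepts any value on write and only serializes on publish, so the bad value surfaces far from the line that wrote it.

```python
import struct


class TinyNodeStore:
    """Hypothetical stand-in for a server address space; not asyncua's API."""

    def __init__(self):
        self._values = {}

    def write(self, node_id, fmt, value):
        # No type check here, mirroring the behaviour described above.
        self._values[node_id] = (fmt, value)

    def publish(self):
        # Serialization, and therefore the type error, only happens here.
        return b"".join(
            struct.pack(fmt, value) for fmt, value in self._values.values()
        )


store = TinyNodeStore()
store.write("ns=2;i=1", "<i", 7)
store.write("ns=2;i=2", "<i", 7.5)  # accepted silently despite the int format

try:
    store.publish()
    publish_error = None
except struct.error as exc:
    publish_error = exc

print(repr(publish_error))  # a struct.error raised at publish time, not at write time
```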
> your code is somewhere trying to write a struct to the ua server that cannot be parsed. Looks like something that is declared as an int or where an int is expected is not an int...
You're right (thank you so much), a float was declared as an int. Would it not be a simple thing to print the node ID and variant type of the offender when it's parsed?
Usually it does. Looks like you hit a place where we do not. MR welcome.
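The kind of context asked for above could look roughly like the following. This is a minimal sketch using only the standard library; `pack_with_context` and its parameters are hypothetical and do not reflect how asyncua's serializer is structured.

```python
import struct


def pack_with_context(node_id, type_name, fmt, value):
    """Pack a value, re-raising struct.error with the node and type attached."""
    try:
        return struct.pack(fmt, value)
    except struct.error as exc:
        raise ValueError(
            f"cannot pack {value!r} ({type(value).__name__}) "
            f"for node {node_id} as {type_name}: {exc}"
        ) from exc


try:
    pack_with_context("ns=2;i=2", "Int32", "<i", 7.5)
    message = None
except ValueError as exc:
    message = str(exc)

print(message)
```

Because the extra work happens only on the failure path, a wrapper like this adds essentially no cost to successful serialization.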