OOM when using a QoS with `max_instances` larger than a small value (or the default: -1)
clunietp opened this issue · 1 comments
The following reproducer results in an OOM on x64-linux, Debian 11.8. This is running inside a Docker container. Host machine has 32GB RAM.
I'm using CycloneDDS commit: 2cfdc1b7cfa1c3d9b37b09109113b1f5d3b01802: 05 Feb 24
and CycloneDDS Python commit: 637cfe583a5078af41c481c98daf7474fdb2a786: 06 Sep 23
and installing the Python library via CYCLONEDDS_HOME=${CYCLONEDDS_HOME} python3 -m pip install /path/to/cyclonedds-python
from dataclasses import dataclass
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from cyclonedds.topic import Topic
from cyclonedds.domain import DomainParticipant
from cyclonedds.idl.annotations import key as _key
from cyclonedds.idl import IdlStruct
from cyclonedds.qos import Policy, Qos
@dataclass
class KeyedString(IdlStruct, typename="DDS.KeyedString"):
    "Represents a key-value string pair"
    # `key` identifies the DDS instance; `value` is the payload.
    key: str
    value: str
    # Mark `key` as the instance key: every distinct key value creates a
    # distinct instance on writers/readers of this topic.
    _key("key")
def test_qos(qos_to_test: Qos):
    """Exercise a writer/reader pair under ``qos_to_test``.

    Writes and disposes 50 updates of a single instance (fixed key),
    then 50 distinct instances (one fresh key per iteration).
    """
    iterations = 50

    print(f"Testing QoS: {str(qos_to_test)}")

    # Participant, topic, and a writer/reader pair that share the QoS under test.
    dp = DomainParticipant()
    tp = Topic(domain_participant=dp, topic_name="foo", data_type=KeyedString)
    writer = DataWriter(publisher_or_participant=dp, topic=tp, qos=qos_to_test)
    _ = DataReader(subscriber_or_participant=dp, topic=tp, qos=qos_to_test)

    # Phase 1: repeatedly update and dispose one instance (same key throughout).
    msg = KeyedString(key="Hello", value="World")
    for i in range(iterations):
        print(f"Sample {i}")
        msg.value = str(i)
        writer.write(sample=msg)
        writer.dispose(sample=msg)

    # Phase 2: each iteration writes and disposes a brand-new instance (new key).
    for i in range(iterations):
        print(f"Instance {i}")
        fresh = KeyedString(key=str(i), value=str(i))
        writer.write(sample=fresh)
        writer.dispose(sample=fresh)
if __name__ == '__main__':
    # Run the same scenario under a lower and a higher instance limit.
    for limit in (10, 50):
        test_qos(qos_to_test=Qos(Policy.ResourceLimits(max_instances=limit)))
Output:
Testing QoS: Qos(Policy.ResourceLimits(max_samples=-1, max_instances=10, max_samples_per_instance=-1))
Sample 0
Sample 1
Sample 2
...
Sample 48
Sample 49
Instance 0
Instance 1
Instance 2
...
Instance 48
Instance 49
Testing QoS: Qos(Policy.ResourceLimits(max_samples=-1, max_instances=50, max_samples_per_instance=-1))
Sample 0
Sample 1
Sample 2
...
Sample 48
Sample 49
Instance 0
Instance 1
Instance 2
...
Instance 30
Instance 31
Killed
Running apt-get install time; /usr/bin/time -v python3 path/to/my/test.py
results in Command terminated by signal 9...Maximum resident set size (kbytes): 14533312
It doesn't make a difference whether or not I call `dispose` on the publisher side.
It doesn't make a difference if I add a sleep(1.0) between write iterations
It does not seem related to the message size, as I first encountered this error at a similar iteration count using much larger messages than KeyedString
`take`-ing from the reader within each iteration has no effect.
Adding QoS properties of ReaderDataLifecycle(autopurge...=duration(nanoseconds=1)), Policy.History.KeepLast(1), Policy.WriterDataLifecycle(autodispose=True)
has no effect
Registering/unregistering each instance has no effect
This appears to be a bug in the implementation. Is there a workaround other than setting `max_instances` to some arbitrary limit?
A more refined reproducer:
from dataclasses import dataclass
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from cyclonedds.topic import Topic
from cyclonedds.domain import DomainParticipant
from cyclonedds.idl.annotations import key as _key
from cyclonedds.idl import IdlStruct
from cyclonedds.qos import Policy, Qos
from cyclonedds.util import duration
from time import sleep
@dataclass
class KeyedString(IdlStruct, typename="DDS.KeyedString"):
    "Represents a key-value string pair"
    # `key` identifies the DDS instance; `value` is the payload.
    key: str
    value: str
    # Mark `key` as the instance key: every distinct key value creates a
    # distinct instance on writers/readers of this topic.
    _key("key")
def test_qos(reader_qos: Qos, writer_qos: Qos) -> None:
    """Write 50 distinct instances, taking each back from the reader
    before disposing it on the writer.

    ``reader_qos`` and ``writer_qos`` let the two ends be configured
    independently (e.g. a tight reader-side ResourceLimits against an
    unlimited writer).
    """
    N_ITERATIONS = 50
    # create DDS entities
    participant = DomainParticipant()
    topic = Topic(domain_participant=participant,
    topic_name="foo", data_type=KeyedString)
    writer = DataWriter(publisher_or_participant=participant,
    topic=topic, qos=writer_qos)
    reader = DataReader(subscriber_or_participant=participant,
    topic=topic, qos=reader_qos)
    # test N instances: each iteration uses a new key, i.e. a new instance
    for i in range(N_ITERATIONS):
        print(f"Instance {i}")
        instance = KeyedString(key=str(i), value=str(i))
        writer.write(sample=instance)
        print(f"\tWrote instance {i}")
        # give the sample time to reach the reader before taking it back
        sleep(0.1)
        reader.take_one(timeout=duration(seconds=5.0))
        print(f"\tRead instance {i}")
        # instance was taken, so dispose it on the writer side
        writer.dispose(sample=instance)
if __name__ == '__main__':
    # Reader has a tight instance limit; writer allows unlimited instances.
    test_qos(
        reader_qos=Qos(
            Policy.ResourceLimits(max_instances=5),
            Policy.History.KeepLast(1),
            Policy.Reliability.Reliable(duration(seconds=1)),
            Policy.Durability.TransientLocal,
        ),
        writer_qos=Qos(
            Policy.ResourceLimits(max_instances=-1),
            Policy.Reliability.Reliable(duration(seconds=1)),
            Policy.Durability.TransientLocal,
        ),
    )
Results in the output:
Instance 0
Wrote instance 0
Read instance 0
Instance 1
Wrote instance 1
Read instance 1
Instance 2
Wrote instance 2
Read instance 2
Instance 3
Wrote instance 3
Read instance 3
Instance 4
Wrote instance 4
Read instance 4
Instance 5
1708025928.342746 [0] python3: The writer could not deliver data on time, probably due to a local reader resources being full
Traceback (most recent call last):
File "/path/to/file.py", line 63, in <module>
test_qos(
File "/path/to/file.py", line 39, in test_qos
writer.write(sample=instance)
File "/usr/local/lib/python3.9/dist-packages/cyclonedds/pub.py", line 192, in write
raise DDSException(ret, f"Occurred while writing sample in {repr(self)}")
cyclonedds.core.DDSException: [DDS_RETCODE_TIMEOUT] A timeout has occurred. Occurred while writing sample in <Entity, type=cyclonedds.pub.DataWriter, addr=0x7fc78e2778b0, id=1880116342>
If I increase the DataReader's `max_instances` to a higher value, I reliably get an OOM at "Instance 32".
I see two issues here:
- Why is my datareader considered 'full'? I have taken the samples and my writer has disposed of the sample (not shown, but I have also tried `writer.register_instance` along with `writer.dispose_instance_handle`). I see no method of manually clearing the datareader.
- The original OOM. At present, I can have fewer than N instances on any given datareader before the OOM occurs, where N ~= 31 on my machine with 32GB of RAM.
My requirement is that I must send and receive an arbitrary number of instances for a given message type, and that number is >> 31.