labstreaminglayer/pylsl

access violation reading 0xFFFFFFFFFFFFFFFF

jdevoldere opened this issue · 16 comments

Exception in thread Thread-4:
Traceback (most recent call last):
  File "C:\Users\Gerbuiker\AppData\Local\Programs\Python\Python37\lib\threading.py", line 926, in _bootstrap_inner
    self.run()
  File "C:\Users\Gerbuiker\AppData\Local\Programs\Python\Python37\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:/Users/Gerbuiker/PycharmProjects/LabEnvironment/receive_data.py", line 14, in read_stream
    ch_labels.append(ch.child_value("label"))
  File "C:\Users\Gerbuiker\Documents\Virtualenvs\HITLab\lib\site-packages\pylsl\pylsl.py", line 985, in child_value
    res = lib.lsl_child_value_n(self.e, str.encode(name))
OSError: exception: access violation reading 0xFFFFFFFFFFFFFFFF

Happens sporadically, so I'm not sure how to reproduce it, but I'm reading multiple streams, one stream per thread.

pylsl isn't great at threading. This is a known issue on Linux. I didn't think it was a problem on Windows, but I guess I'm not surprised to see that it is.

An annoying workaround is to use a single thread to poll all the streams and then push the data (e.g. via shared memory) to the other threads.

Probably a better design is to use unique processes (not threads) to do as much independent parallel work as possible until the processed data have to be joined, at which point you could use shared mem or another IPC to get them together.

While it's annoying that this doesn't work, and I would welcome this being fixed, I have to admit that fixing pylsl's behaviour in threads is fairly low priority. Due to the GIL there is barely any benefit to using threads in Python. If I have something I want to do in parallel without locking up the main function then I use multiprocessing.
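A minimal sketch of that process-per-stream idea (the stream types here and the multiprocessing.Queue used to join the results are only placeholders for whatever IPC fits your setup):

import multiprocessing as mp

import pylsl


def stream_worker(stream_type, queue):
    # Each worker process loads its own copy of liblsl and owns its inlet outright.
    inlet = pylsl.StreamInlet(pylsl.resolve_stream('type', stream_type)[0])
    while True:
        sample, timestamp = inlet.pull_sample()
        queue.put((stream_type, timestamp, sample))


if __name__ == '__main__':
    queue = mp.Queue()
    for stream_type in ('EEG', 'Markers'):
        mp.Process(target=stream_worker, args=(stream_type, queue), daemon=True).start()
    while True:
        # The parent is the single place where the independently pulled data are joined.
        print(*queue.get())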

Can you share a little bit more of your code, e.g. the read_stream module?

@cboulay the problem with using a single thread for multiple streams is that it introduces a lot of latency because it has to successfully pull in a sample from one stream before it can go onto the next, which locks up everything if one of the streams is an irregular marker stream. I guess I could try giving multiprocessing a shot instead.

@agricolab

import csv
from collections import OrderedDict

from pylsl import StreamInlet, resolve_stream


def read_stream(stream_type):
    streams = resolve_stream('type', stream_type)
    inlet = StreamInlet(streams[0])

    ch_labels = []
    ch = inlet.info().desc().child("channels").child("channel")
    for k in range(inlet.info().channel_count()):
        ch_labels.append(ch.child_value("label"))
        ch = ch.next_sibling()

    csv_file = open(f'{stream_type}.csv', 'w', newline='')

    with csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["Timestamp"] + ch_labels)

        while True:
            sample, timestamp = inlet.pull_sample()
            csv_sample = [timestamp] + sample

            writer.writerow(csv_sample)

            sample_dict = OrderedDict(zip(ch_labels, sample))
            print(stream_type, timestamp, sample_dict)

Hello,

If it helps, I was also having issues with multiple threads in Python, but only when I was trying to access the inlet's info. I think the issue comes from how the XML is handled internally by liblsl. I managed to find a workaround by parsing it manually, e.g.:

import xml.etree.ElementTree as ET
root = ET.fromstring(inlet.info().as_xml())
var_value = root.find("desc").find("var").text

(tested with python 2, run on python 3 but I did not try to reproduce the original problem)
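Applied to the channel-label loop from read_stream above, the same trick would look roughly like this (untested sketch; it assumes the labels sit under desc/channels/channel):

import xml.etree.ElementTree as ET

root = ET.fromstring(inlet.info().as_xml())
# Read the labels from the parsed XML instead of walking child_value()/next_sibling() via liblsl.
ch_labels = [ch.findtext("label") for ch in root.find("desc").find("channels").findall("channel")]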

Besides that, I never had any trouble with multi-threading and LSL, which I have been using heavily :)

edit: I don't remember the error I got, but I think it was also a weird "access violation", always a shock when dealing with Python :D

@cboulay the problem with using a single thread for multiple streams is that it introduces a lot of latency because it has to successfully pull in a sample from one stream before it can go onto the next

The preferred approach would be pull_chunk with a very small timeout so an irregular stream doesn't block anything.
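For example, a single reader thread could poll every inlet without ever blocking (sketch; inlets is assumed to be a list of already-opened StreamInlet objects and handle() a hypothetical callback):

import time

while True:
    for inlet in inlets:
        # timeout=0.0 returns immediately, so an irregular marker stream never stalls the loop.
        chunk, timestamps = inlet.pull_chunk(timeout=0.0)
        if timestamps:
            handle(inlet, chunk, timestamps)
    time.sleep(0.005)  # short sleep to avoid a busy loop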

If it helps, I was also having issues with multiple threads in Python, but only when I was trying to access the inlet's info. I think the issue comes from how the XML is handled internally by liblsl. I managed to find a workaround by parsing it manually, e.g.:

import xml.etree.ElementTree as ET
root = ET.fromstring(inlet.info().as_xml())
var_value = root.find("desc").find("var").text

(tested with python 2, run on python 3 but I did not try to reproduce the original problem)

Interesting, is there any chance you could put together an MWE I could test against?

I never had any trouble with multi-threading and LSL, which I have been using heavily :)

Me neither. That's why I'm wondering. But in my code, I also convert the whole info to a dict before accessing it any further, roughly as sketched below.
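A hypothetical helper along those lines (it assumes the labels sit under desc/channels/channel and that nominal_srate is set, as in the snippets above):

import xml.etree.ElementTree as ET


def info_to_dict(info):
    # Parse the info XML once; afterwards only the plain dict is touched, never liblsl.
    root = ET.fromstring(info.as_xml())
    channels = root.find("desc").find("channels")
    return {
        "name": root.findtext("name"),
        "nominal_srate": float(root.findtext("nominal_srate")),
        "labels": [ch.findtext("label") for ch in channels.findall("channel")],
    }


# e.g. meta = info_to_dict(inlet.info())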

I'll assume you implemented the multithreading similarly to the following?

t = threading.Thread(target=read_stream, args=(stream_type,))
t.start()

The error probably stems, as @jfrey-xx suggested, from the info structure being accessed in a loop: it carries state because of ch = ch.next_sibling(), and every thread might be using the same DLL. Maybe a lock around accessing the info until it is completely parsed would do the trick?

The preferred approach would be pull_chunk with a very small timeout so an irregular stream doesn't block anything.

@tstenner that seems like a hacky approach which would still introduce unnecessary latency.

I'll assume you implemented the multithreading similarly to the following?

@agricolab that is correct.

Something like

def read_stream(stream_type, lock):
    streams = resolve_stream('type', stream_type)
    inlet = StreamInlet(streams[0])

    ch_labels = []
    with lock:
        ch = inlet.info().desc().child("channels").child("channel")
        for k in range(inlet.info().channel_count()):
            ch_labels.append(ch.child_value("label"))
            ch = ch.next_sibling()

    csv_file = open(f'{stream_type}.csv', 'w', newline='')

    with csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["Timestamp"] + ch_labels)

        while True:
            sample, timestamp = inlet.pull_sample()
            csv_sample = [timestamp] + sample

            writer.writerow(csv_sample)

            sample_dict = OrderedDict(zip(ch_labels, sample))
            print(stream_type, timestamp, sample_dict)

https://docs.python.org/3/library/threading.html#with-locks

On a side note: If you only pull single samples, but print and write to file every time, this might be slower than new samples coming in. You might want to switch to chunks or fill a buffer instead, unless your sampling rate is very slow.
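For example, something along these lines (sketch; it reuses the writer and inlet from the snippet above and drops the per-sample print):

while True:
    # Pull everything that arrived since the last call instead of one sample at a time.
    chunk, timestamps = inlet.pull_chunk(timeout=1.0)
    if timestamps:
        writer.writerows([ts] + sample for ts, sample in zip(timestamps, chunk))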

@agricolab I guess that might do it.

On a side note: If you only pull single samples, but print and write to file every time, this might be slower than new samples coming in. You might want to switch to chunks or fill a buffer instead, unless your sampling rate is very slow.

I was aware of this and am going to rewrite it in the near future, but thanks anyway.

Does inlet.pull_sample() in a thread definitely not block the GIL? Sorry I haven't tried myself.

Does inlet.pull_sample() in a thread definitely not block the GIL? Sorry I haven't tried myself.

Yes. Just tested it. Even if one of the outlets is killed, receiving the others will not be blocked.

@jdevoldere , were you able to solve your issue, so it can be closed?

@agricolab , would you be willing to update https://github.com/labstreaminglayer/liblsl-Python/blob/master/README.md#known-issues to specify the problem with accessing info in a thread?

After that then I think we can close this issue.

Sure, I just found the time to test pulling from different threads again, using Python 3.7.6 and pylsl 1.14, with

import pylsl
from threading import Thread


def publish():
    outlet = pylsl.StreamOutlet(
        pylsl.StreamInfo("Mock", "Mock", 1, 100, "float32", "mocking")
    )
    count = 0.0
    t1 = pylsl.local_clock()
    print("Started publishing at {}".format(t1))
    while True:
        count += 1
        outlet.push_sample([count])
        while (pylsl.local_clock() - t1) < 0.1:
            pass
        t1 = pylsl.local_clock()


def subscribe(name: str, thread_name: str):
    inlet = pylsl.StreamInlet(pylsl.resolve_byprop("name", name)[0])
    print("Started subscription at {}".format(pylsl.local_clock()))
    while True:
        sample, timestamp = inlet.pull_sample()
        print(thread_name, sample[0])


if __name__ == "__main__":
    print(pylsl.library_version())
    Thread(target=publish).start()
    Thread(target=subscribe, args=("Mock", "Thread_1")).start()
    Thread(target=subscribe, args=("Mock", "Thread_2")).start()

and it worked smoothly.

@cboulay I can test whether the error with accessing the info stems from unlocked calls from different threads; that will take longer to evaluate.

import pylsl
from threading import Thread, Lock
from time import sleep


def publish():
    channel_count = 8
    info = pylsl.StreamInfo(
        "Mock", "Mock", channel_count, 100, "float32", "mocking"
    )
    channels = info.desc().append_child("channels")
    types = (f"MockEEG" for x in range(1, channel_count + 1, 1))
    units = ("au" for x in range(1, channel_count + 1, 1))
    names = (f"C{x:03d}" for x in range(1, channel_count + 1, 1))
    for c, u, t in zip(names, units, types):
        print(c, u, t)
        channels.append_child("channel").append_child_value(
            "label", c
        ).append_child_value("unit", u).append_child_value("type", t)
    outlet = pylsl.StreamOutlet(info)

    count = 0.0
    t1 = pylsl.local_clock()
    print("Started publishing at {}".format(t1))
    while True:
        count += 1
        outlet.push_sample([count] * channel_count)
        while (pylsl.local_clock() - t1) < 0.1:
            pass
        t1 = pylsl.local_clock()


def subscribe(name: str, thread_name: str, lock: Lock):
    inlet = pylsl.StreamInlet(pylsl.resolve_byprop("name", name)[0])
    if lock is not None:
        lock.acquire()
    ch_labels = []
    ch = inlet.info().desc().child("channels").child("channel")
    for k in range(inlet.info().channel_count()):
        ch_labels.append(ch.child_value("label"))
        ch = ch.next_sibling()
    if lock is not None:
        lock.release()

    print("Started subscription at {}".format(pylsl.local_clock()))
    print("My channel has ", ch_labels, "channels")
    while True:
        sample, timestamp = inlet.pull_sample()
        print(thread_name, sample[0])


if __name__ == "__main__":
    print(pylsl.library_version())
    Thread(target=publish).start()
    sleep(1)
    lock = Lock()    
    Thread(target=subscribe, args=("Mock", "Thread_1", lock)).start()
    Thread(target=subscribe, args=("Mock", "Thread_2", lock)).start()

accesses the XML in a locked fashion and runs smoothly (on my machine).