neuropsychology/NeuroKit

ecg_peaks (method='neurokit') sometimes fails on ECG's with missing data

Opened this issue · 12 comments

Describe the bug
On certain ECG's with missing data or longer periods of constant values (e.g. zeros), ecg_peaks with method="neurokit" fails to find all but two peaks. Below I attached a screenshot with show=True:
nk_fail

The cause of this issue lies in the way Neurokit calculates the min_len. This variable defines the minimum qrs length (len_qrs) required to accept a valid R-peak.

# Identify R-peaks within QRS (ignore QRS that are too short).
num_qrs = min(beg_qrs.size, end_qrs.size)
min_len = np.mean(end_qrs[:num_qrs] - beg_qrs[:num_qrs]) * minlenweight
peaks = [0]

Changing np.mean() to np.median would provide a more robust estimate for the most common qrs length and would prevent min_len from becoming too high. The following screenshot shows the detected R-peaks with np.median().
Because the median calculation is a bit slower, I saw a 16% increase in runtime on a 24h ECG.

nk_pass_median

System Specifications

  • OS: Linux ( 64bit)

  • Python: 3.8.10

  • NeuroKit2: 0.2.0

  • NumPy: 1.23.4

  • Pandas: 1.5.3

  • SciPy: 1.10.1

  • sklearn: 1.2.2

  • matplotlib: 3.6.2

Hi 👋 Thanks for reaching out and opening your first issue here! We'll try to come back to you as soon as possible. ❤️ kenobi

Interesting.

@JanCBrammer what do you think about changing allowing for median instead of mean?

I reckon we should probably do some benchmarking and some tests to see whether the median option is as good as mean in most cases

@DominiqueMakowski, exactly my thoughts.
It's an interesting and useful observation. The median looks promising and on paper it's preferable over the mean in this context.
But it'd be good to confirm that (i.e., the medians "superiority") empirically.

Do we have benchmarking set up?
Otherwise the benchmarking over at biopeaks would do, since it's the same peak detection.

Do we have benchmarking set up?

Not really (aside from this old thread).

It seems like we've all at some point done bits of work on our side, it would be really cool to assemble around a "task force" project (also with @cbrnr as they did some cool work) for a cross-software benchmarking initiative...

If you worry about median speed, the bottleneck package will helps a lot

stale commented

Is this still relevant? If so, what is blocking it? Is there anything you can do to help move it forward?

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

Hi!

In case you want to keep the 'neurokit' detection methods as it is, I would like to propose an alternative solution. We could add functions for flatline detection to Neurokit, so that the user can run peak detection specifically on the parts without missing data.

Context

I identified a similar problem while working with long-term ECGs (several hours of recording) with missing data: QRS detection fails or does not even terminate.

I try to reproduce this here with simulated data:

import neurokit2 as nk
import numpy as np

# This is roughly how the long-term ECGs look like.

signal = np.concatenate([
    nk.ecg_simulate(duration=10*60, sampling_rate=128, random_state=2, random_state_distort=2),
    [-4.0] * (360 * 60 * 128),
    nk.ecg_simulate(duration=90*60, sampling_rate=128, random_state=2, random_state_distort=2),
    [-4.0] * (30 * 60 * 128)
])

nk.signal_plot(signal)

eb02344b-d95a-499e-8da7-905fe5669548

Problem

QRS detection fails on raw and cleaned data:

_, info = nk.ecg_peaks(signal, sampling_rate=128, correct_artifacts=True)
info
{'ECG_R_Peaks': array([], dtype=int64), 'sampling_rate': 128}
cleaned = nk.ecg_clean(signal, sampling_rate=128)
_, info = nk.ecg_peaks(cleaned, sampling_rate=128, correct_artifacts=True)
info
{'ECG_R_Peaks': array([2837172]), 'sampling_rate': 128}

Proposed Solution

I addressed the problem by doing flatline detection (similar to signal_flatline.py) and then doing QRS detection only on parts without flatline:

import numpy as np

def moving_average(signal, window_size):
    """Moving window average on a signal.

    Parameters
    ----------
    signal : Union[list, np.array]
        The signal as a vector of values.
    window_size : int
        How many consequtive samples are used for averaging.

    Returns
    -------
    np.array
        Returns a signal of averages from the original signal. Note: The returned signal is shorter
        than the original signal by window_size - 1.
        
    """

    return np.convolve(signal, np.ones(window_size), 'valid') / window_size

def no_flatline(signal, sampling_rate, threshold=0.01, tolerance=60):
    """Finds areas in a signal that are not flatline.

    Parameters
    ----------
    signal : Union[list, np.array]
        The signal as a vector of values.
    sampling_rate : int
        The sampling rate of the signal, i.e. how many samples (values) per second.
    threshold : float, optional
        Flatline threshold relative to the biggest change in the signal.
        This is the percentage of the maximum value of absolute consecutive
        differences. Default: 0.01.
    tolerance : int, optional
        Determines how fine-grained the resulting signal is, i.e. how long (in seconds) can a
        flatline part be without being recognised as such. Default: 60.

    Returns
    -------
    np.array
        Returns a signal that is True/1 where there are usable data in the signal and False/0 where
        there is a sufficiently long flatline interval. Note: Returned signal is shorter than the
        original signal by (sampling_rate * tolerance) - 1.

    """
    abs_diff = np.abs(np.diff(signal))
    threshold = threshold * np.max(abs_diff)

    return moving_average(abs_diff >= threshold, sampling_rate * tolerance) >= threshold

def no_flatline_intervals(signal, sampling_rate, threshold=0.01, tolerance=60):
    """Finds areas in a signal that are not flatline.

    Parameters
    ----------
    signal : Union[list, np.array]
        The signal as a vector of values.
    sampling_rate : int
        The sampling rate of the signal, i.e. how many samples (values) per second.
    threshold : float, optional
        Flatline threshold relative to the biggest change in the signal.
        This is the percentage of the maximum value of absolute consecutive
        differences. Default: 0.01.
    tolerance : int, optional
        Determines how fine-grained the resulting signal is, i.e. how long (in seconds) can a
        flatline part be without being recognised as such. Default: 60.

    Returns
    -------
    tuple
        Returns a tuple of np.arrays:
        signal_starts: Indices where a usable part of a signal starts.
        signal_ends: Indices where a usable part of a signal ends.

    """

    # Identify flanks: +1 for beginning plateau; -1 for ending plateau.
    flanks = np.diff(no_flatline(signal, sampling_rate, threshold, tolerance).astype(int))
    
    signal_starts = np.flatnonzero(flanks > 0)
    signal_ends = np.flatnonzero(flanks < 0)

    # Correct offsets from moving average
    signal_starts = signal_starts + sampling_rate * tolerance

    # Insert start marker at signal start if a start marker is missing
    if len(signal_starts) < len(signal_ends):
        signal_starts = np.insert(signal_starts, 0, 0)

    # Insert end marker at signal end if an end marker is missing
    if len(signal_ends) < len(signal_starts):
        signal_ends = np.append(signal_ends, [len(signal) - 1])

    # Return instances where start < end (start >= end might occur due to offset correction).
    return [(start, end) for start, end in zip(signal_starts, signal_ends) if start < end]

Results

Flatline detection results look like this:

import matplotlib.pyplot as plt

fig, ax = plt.subplots()
fig.set_size_inches(20,10)

ax.plot(cleaned)
ax.plot(no_flatline(cleaned, sampling_rate=128))

3be172ba-9422-4c7d-8035-f70a36366913

Then, the usable intervals are correctly identified:

intervals = no_flatline_intervals(cleaned, sampling_rate=128)
intervals
[(0, 76375), (2842465, 3532059)]

And detection can be done like this (we might need to merge the arrays and DataFrames later on):

import pandas as pd

for start, end in intervals:
    
    signals, info = nk.ecg_peaks(cleaned[start:end], sampling_rate=128)

    signals.index = signals.index + start
    info['ECG_R_Peaks'] = info['ECG_R_Peaks'] + start

    print(signals)
    print(info)

Next steps?

What do you think about this approach? If you like it, I could try to turn this into a PR 🤷
Otherwise I just documented my solution to the issue here, in case others stumble into similar problems on similarly strange data 🙈

Interesting approach. But then the question would be how to determine the optimal thresholds for flat-line detection. How do you define "flat" in the presence of noise or random artifacts (large voltage spikes). How does this approach compare to the simple median?

Very cool indeed. I also had some issues recently with bad chunks of data, it would be good to have a robust method to either ignore them and/or adapt the parameters in them...

However, I'm not sure what the best API for that should look like, because it's not only peak detection that should be adapted to handle bad intervals. @danibene already started to make some functions resistant to missing values (#838).

Maybe:

  • *_peaks() would gain a bad_intervals argument (e.g., bad_intervals=[(300, 450), (680-700)]) with periods (that can be also entered manually) it would exclude/adapt. Importantly It would return these in the info dict.
  • Upstream: in general *_process() functions, we could incorporate a step of bad_intervals detection that would then get piped into bad_intervals
  • Downstream: other functions relying on peaks *_rate() etc. should have this argument as well and deal with them appropriately, for instance by setting the rate to NaNs on these bad intervals

My thoughts about this topic:

  1. Long term records are a-priori not stationary, so we should split it for several shorter chunks and process separately, then combine
  2. From the above we can estimate thresholds from each chunk, then analyze regularity of found peaks, select the correct thresholds and re-process the chunks with bad results
  3. As a help, we can also use the expected ECG values ~1 mV and HR ~30..300 BPM as a quick check of threshold correctness

Is this still relevant? If so, what is blocking it? Is there anything you can do to help move it forward?

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.