ecg_peaks (method='neurokit') sometimes fails on ECGs with missing data
Describe the bug
On certain ECGs with missing data or longer periods of constant values (e.g., zeros), ecg_peaks with method="neurokit" finds only two peaks and misses all the others. Below I attached a screenshot with show=True:
The cause of this issue lies in the way NeuroKit calculates min_len. This variable defines the minimum QRS length (len_qrs) required to accept a valid R-peak:
NeuroKit/neurokit2/ecg/ecg_findpeaks.py, lines 249 to 252 at commit 3d5ecfc
Changing np.mean() to np.median() would provide a more robust estimate of the most common QRS length and would prevent min_len from becoming too high. The following screenshot shows the detected R-peaks with np.median().
Because the median calculation is a bit slower, I saw a 16% increase in runtime on a 24 h ECG.
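To illustrate the effect with toy numbers (these widths and the minlenweight value are made up for illustration, not taken from the linked code): a few very wide QRS "candidates" spanning flat or missing segments pull the mean far above the typical QRS width, while the median stays close to it.

import numpy as np

# Hypothetical widths (in samples) of candidate QRS complexes: mostly ~25 samples,
# plus two huge "candidates" that span flatline/missing-data stretches.
len_qrs = np.array([24, 26, 25, 23, 27, 25, 24, 6000, 25, 26, 7500])
minlenweight = 0.4  # illustrative weight applied to the average width

print(np.mean(len_qrs) * minlenweight)    # ~499: larger than any real QRS, so all real peaks are rejected
print(np.median(len_qrs) * minlenweight)  # 10.0: keeps the typical QRS complexes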
System Specifications
- OS: Linux (64bit)
- Python: 3.8.10
- NeuroKit2: 0.2.0
- NumPy: 1.23.4
- Pandas: 1.5.3
- SciPy: 1.10.1
- sklearn: 1.2.2
- matplotlib: 3.6.2
Interesting.
@JanCBrammer what do you think about allowing the median instead of the mean?
I reckon we should probably do some benchmarking and some tests to see whether the median option is as good as the mean in most cases.
@DominiqueMakowski, exactly my thoughts.
It's an interesting and useful observation. The median looks promising, and on paper it's preferable to the mean in this context.
But it'd be good to confirm that (i.e., the median's "superiority") empirically.
Do we have benchmarking set up?
Otherwise the benchmarking over at biopeaks would do, since it's the same peak detection.
> Do we have benchmarking set up?
Not really (aside from this old thread).
It seems like we've all at some point done bits of work on our side; it would be really cool to assemble a "task force" project (also with @cbrnr, as they did some cool work) for a cross-software benchmarking initiative...
If you're worried about the speed of the median, the bottleneck package helps a lot.
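A quick sketch of the drop-in swap (actual speed-ups depend on the machine, the array size, and the bottleneck version):

import numpy as np
import bottleneck as bn

x = np.random.randn(10_000_000)

# Same result as np.median, usually computed noticeably faster by bottleneck's C implementation.
assert np.isclose(np.median(x), bn.median(x))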
Hi!
In case you want to keep the 'neurokit' detection method as it is, I would like to propose an alternative solution. We could add functions for flatline detection to NeuroKit, so that the user can run peak detection specifically on the parts without missing data.
Context
I identified a similar problem while working with long-term ECGs (several hours of recording) with missing data: QRS detection fails or does not even terminate.
I tried to reproduce this here with simulated data:
import neurokit2 as nk
import numpy as np
# This is roughly what the long-term ECGs look like.
signal = np.concatenate([
    nk.ecg_simulate(duration=10 * 60, sampling_rate=128, random_state=2, random_state_distort=2),
    [-4.0] * (360 * 60 * 128),
    nk.ecg_simulate(duration=90 * 60, sampling_rate=128, random_state=2, random_state_distort=2),
    [-4.0] * (30 * 60 * 128),
])
nk.signal_plot(signal)
Problem
QRS detection fails on raw and cleaned data:
_, info = nk.ecg_peaks(signal, sampling_rate=128, correct_artifacts=True)
info
{'ECG_R_Peaks': array([], dtype=int64), 'sampling_rate': 128}
cleaned = nk.ecg_clean(signal, sampling_rate=128)
_, info = nk.ecg_peaks(cleaned, sampling_rate=128, correct_artifacts=True)
info
{'ECG_R_Peaks': array([2837172]), 'sampling_rate': 128}
Proposed Solution
I addressed the problem by doing flatline detection (similar to signal_flatline.py) and then doing QRS detection only on parts without flatline:
import numpy as np

def moving_average(signal, window_size):
    """Moving window average on a signal.

    Parameters
    ----------
    signal : Union[list, np.array]
        The signal as a vector of values.
    window_size : int
        How many consecutive samples are used for averaging.

    Returns
    -------
    np.array
        Returns a signal of averages from the original signal. Note: The returned signal is
        shorter than the original signal by window_size - 1.
    """
    return np.convolve(signal, np.ones(window_size), 'valid') / window_size
def no_flatline(signal, sampling_rate, threshold=0.01, tolerance=60):
    """Finds areas in a signal that are not flatline.

    Parameters
    ----------
    signal : Union[list, np.array]
        The signal as a vector of values.
    sampling_rate : int
        The sampling rate of the signal, i.e. how many samples (values) per second.
    threshold : float, optional
        Flatline threshold relative to the biggest change in the signal.
        This is the percentage of the maximum value of absolute consecutive
        differences. Default: 0.01.
    tolerance : int, optional
        Determines how fine-grained the resulting signal is, i.e. how long (in seconds) a
        flatline part can be without being recognised as such. Default: 60.

    Returns
    -------
    np.array
        Returns a signal that is True/1 where there are usable data in the signal and False/0
        where there is a sufficiently long flatline interval. Note: The returned signal is
        shorter than the original signal by (sampling_rate * tolerance) - 1.
    """
    abs_diff = np.abs(np.diff(signal))
    # Flatline threshold as a fraction of the largest sample-to-sample change.
    threshold = threshold * np.max(abs_diff)
    # Windowed fraction of above-threshold changes, compared against the same (scaled) cutoff.
    return moving_average(abs_diff >= threshold, sampling_rate * tolerance) >= threshold
def no_flatline_intervals(signal, sampling_rate, threshold=0.01, tolerance=60):
    """Finds the start and end indices of areas in a signal that are not flatline.

    Parameters
    ----------
    signal : Union[list, np.array]
        The signal as a vector of values.
    sampling_rate : int
        The sampling rate of the signal, i.e. how many samples (values) per second.
    threshold : float, optional
        Flatline threshold relative to the biggest change in the signal.
        This is the percentage of the maximum value of absolute consecutive
        differences. Default: 0.01.
    tolerance : int, optional
        Determines how fine-grained the resulting signal is, i.e. how long (in seconds) a
        flatline part can be without being recognised as such. Default: 60.

    Returns
    -------
    list
        Returns a list of (start, end) tuples, where start is the index at which a usable
        part of the signal begins and end is the index at which it ends.
    """
    # Identify flanks: +1 for beginning plateau; -1 for ending plateau.
    flanks = np.diff(no_flatline(signal, sampling_rate, threshold, tolerance).astype(int))
    signal_starts = np.flatnonzero(flanks > 0)
    signal_ends = np.flatnonzero(flanks < 0)
    # Correct offsets from the moving average.
    signal_starts = signal_starts + sampling_rate * tolerance
    # Insert a start marker at the beginning of the signal if a start marker is missing.
    if len(signal_starts) < len(signal_ends):
        signal_starts = np.insert(signal_starts, 0, 0)
    # Append an end marker at the end of the signal if an end marker is missing.
    if len(signal_ends) < len(signal_starts):
        signal_ends = np.append(signal_ends, [len(signal) - 1])
    # Return instances where start < end (start >= end might occur due to offset correction).
    return [(start, end) for start, end in zip(signal_starts, signal_ends) if start < end]
Results
Flatline detection results look like this:
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
fig.set_size_inches(20,10)
ax.plot(cleaned)
ax.plot(no_flatline(cleaned, sampling_rate=128))
Then, the usable intervals are correctly identified:
intervals = no_flatline_intervals(cleaned, sampling_rate=128)
intervals
[(0, 76375), (2842465, 3532059)]
And detection can be done like this (we might need to merge the arrays and DataFrames later on; see the sketch after the loop):
import pandas as pd

for start, end in intervals:
    signals, info = nk.ecg_peaks(cleaned[start:end], sampling_rate=128)
    signals.index = signals.index + start
    info['ECG_R_Peaks'] = info['ECG_R_Peaks'] + start
    print(signals)
    print(info)
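A minimal sketch of that merging step, assuming the loop above is adapted to collect its results instead of printing them (the fill value of 0 for samples inside the flatline gaps is an arbitrary choice):

import numpy as np
import pandas as pd

all_peaks, frames = [], []
for start, end in intervals:
    signals, info = nk.ecg_peaks(cleaned[start:end], sampling_rate=128)
    signals.index = signals.index + start
    frames.append(signals)
    all_peaks.append(info['ECG_R_Peaks'] + start)

merged_peaks = np.concatenate(all_peaks)
# Re-expand to the full signal length; samples inside the flatline gaps become 0.
merged_signals = pd.concat(frames).reindex(range(len(cleaned)), fill_value=0)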
Next steps?
What do you think about this approach? If you like it, I could try to turn this into a PR 🤷
Otherwise I just documented my solution to the issue here, in case others stumble into similar problems on similarly strange data 🙈
Interesting approach. But then the question would be how to determine the optimal thresholds for flatline detection. How do you define "flat" in the presence of noise or random artifacts (large voltage spikes)? And how does this approach compare to the simple median change?
Very cool indeed. I also had some issues recently with bad chunks of data; it would be good to have a robust method to either ignore them and/or adapt the parameters within them...
However, I'm not sure what the best API for that should look like, because it's not only peak detection that should be adapted to handle bad intervals. @danibene already started to make some functions resistant to missing values (#838).
Maybe:
- *_peaks() would gain a bad_intervals argument (e.g., bad_intervals=[(300, 450), (680, 700)]) with periods (which could also be entered manually) that it would exclude or adapt to. Importantly, it would return these in the info dict (see the sketch after this list).
- Upstream: the general *_process() functions could incorporate a bad-interval detection step whose output would then get piped into bad_intervals.
- Downstream: other functions relying on peaks (*_rate(), etc.) should have this argument as well and deal with the intervals appropriately, for instance by setting the rate to NaN on these bad intervals.
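A purely hypothetical sketch of the first point (bad_intervals is not an existing NeuroKit2 parameter, and the wrapper name below is made up; it only emulates the proposed behaviour by running peak detection outside the bad segments):

import numpy as np
import neurokit2 as nk

def ecg_peaks_with_bad_intervals(ecg, sampling_rate, bad_intervals):
    """Illustrative only: run peak detection outside `bad_intervals` (sorted,
    non-overlapping (start, end) sample indices) and return the peaks together
    with the excluded intervals, mimicking the proposed info-dict entry."""
    edges = [0] + [i for interval in bad_intervals for i in interval] + [len(ecg)]
    peaks = []
    # The usable segments are the gaps between consecutive bad intervals.
    for start, end in zip(edges[::2], edges[1::2]):
        if end - start < 2 * sampling_rate:
            continue  # skip segments too short for meaningful peak detection
        _, info = nk.ecg_peaks(ecg[start:end], sampling_rate=sampling_rate)
        peaks.append(info["ECG_R_Peaks"] + start)
    return {
        "ECG_R_Peaks": np.concatenate(peaks) if peaks else np.array([], dtype=int),
        "bad_intervals": bad_intervals,
    }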
My thoughts about this topic:
- Long-term records are a priori not stationary, so we should split them into several shorter chunks, process the chunks separately, and then combine the results.
- From those chunks we can estimate thresholds, analyse the regularity of the found peaks, select the correct thresholds, and re-process the chunks with bad results.
- As a help, we can also use the expected ECG amplitude of ~1 mV and a heart rate of ~30-300 BPM as a quick check of threshold correctness (a rough sketch of this chunk-and-check idea follows below).
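A rough sketch of chunk-wise processing with a plausibility check on the resulting heart rate (the chunk length, minimum segment length, and HR bounds below are arbitrary illustrative choices, not NeuroKit2 defaults):

import numpy as np
import neurokit2 as nk

def peaks_by_chunk(ecg, sampling_rate, chunk_minutes=10, hr_range=(30, 300)):
    """Process a long recording chunk by chunk and keep only the chunks whose
    mean heart rate lies in a physiologically plausible range."""
    chunk_len = int(chunk_minutes * 60 * sampling_rate)
    kept = []
    for start in range(0, len(ecg), chunk_len):
        chunk = ecg[start:start + chunk_len]
        if len(chunk) < 10 * sampling_rate:
            continue  # ignore a trailing chunk that is too short to process
        _, info = nk.ecg_peaks(chunk, sampling_rate=sampling_rate)
        peaks = info["ECG_R_Peaks"]
        if len(peaks) < 2:
            continue  # no usable peaks in this chunk (e.g. flatline)
        mean_hr = 60 * sampling_rate / np.mean(np.diff(peaks))
        if hr_range[0] <= mean_hr <= hr_range[1]:
            kept.append(peaks + start)
    return np.concatenate(kept) if kept else np.array([], dtype=int)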