Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions tests/test_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,90 @@ def test_infer_sig_len(self):

assert record_2.__eq__(record)

def test_physical_conversion(self):
n_sig = 3
adc_gain = [1.0, 1234.567, 765.4321]
baseline = [10, 20, -30]
d_signal = np.repeat(np.arange(-100, 100), 3).reshape(-1, 3)
e_d_signal = list(d_signal.transpose())
fmt = ["16", "16", "16"]

# Test adding or subtracting a small offset (0.01 ADU) to check
# that we correctly round to the nearest integer
for offset in (0, -0.01, 0.01):
p_signal = (d_signal + offset - baseline) / adc_gain
e_p_signal = list(p_signal.transpose())

# Test converting p_signal to d_signal

record = wfdb.Record(
n_sig=n_sig,
p_signal=p_signal.copy(),
adc_gain=adc_gain,
baseline=baseline,
fmt=fmt,
)

d_signal_converted = record.adc(expanded=False, inplace=False)
np.testing.assert_array_equal(d_signal_converted, d_signal)

record.adc(expanded=False, inplace=True)
np.testing.assert_array_equal(record.d_signal, d_signal)

# Test converting e_p_signal to e_d_signal

record = wfdb.Record(
n_sig=n_sig,
e_p_signal=[s.copy() for s in e_p_signal],
adc_gain=adc_gain,
baseline=baseline,
fmt=fmt,
)

e_d_signal_converted = record.adc(expanded=True, inplace=False)
self.assertEqual(len(e_d_signal_converted), n_sig)
for x, y in zip(e_d_signal_converted, e_d_signal):
np.testing.assert_array_equal(x, y)

record.adc(expanded=True, inplace=True)
self.assertEqual(len(record.e_d_signal), n_sig)
for x, y in zip(record.e_d_signal, e_d_signal):
np.testing.assert_array_equal(x, y)

# Test automatic conversion using wfdb.wrsamp()

wfdb.wrsamp(
"test_physical_conversion",
fs=1000,
sig_name=["X", "Y", "Z"],
units=["mV", "mV", "mV"],
p_signal=p_signal,
adc_gain=adc_gain,
baseline=baseline,
fmt=["16", "16", "16"],
)
record = wfdb.rdrecord("test_physical_conversion", physical=False)
np.testing.assert_array_equal(record.d_signal, d_signal)

record = wfdb.rdrecord("test_physical_conversion", physical=True)
for ch, gain in enumerate(adc_gain):
np.testing.assert_allclose(
record.p_signal[:, ch],
p_signal[:, ch],
rtol=0.0000001,
atol=(0.05 / gain),
)

@classmethod
def tearDownClass(cls):
writefiles = [
"test_physical_conversion.dat",
"test_physical_conversion.hea",
]
for file in writefiles:
if os.path.isfile(file):
os.remove(file)


class TestDownload(unittest.TestCase):
# Test that we can download records with no "dat" file
Expand Down
8 changes: 7 additions & 1 deletion wfdb/io/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,11 @@ def adc(self, expanded=False, inplace=False):
self.e_p_signal[ch],
)
np.add(
e_p_signal[ch], self.baseline[ch], self.e_p_signal[ch]
self.e_p_signal[ch],
self.baseline[ch],
self.e_p_signal[ch],
)
np.round(self.e_p_signal[ch], 0, self.e_p_signal[ch])
self.e_p_signal[ch] = self.e_p_signal[ch].astype(
intdtype, copy=False
)
Expand All @@ -556,6 +559,7 @@ def adc(self, expanded=False, inplace=False):
nanlocs = np.isnan(self.p_signal)
np.multiply(self.p_signal, self.adc_gain, self.p_signal)
np.add(self.p_signal, self.baseline, self.p_signal)
np.round(self.p_signal, 0, self.p_signal)
self.p_signal = self.p_signal.astype(intdtype, copy=False)
self.d_signal = self.p_signal
self.p_signal = None
Expand All @@ -570,6 +574,7 @@ def adc(self, expanded=False, inplace=False):
ch_d_signal = self.e_p_signal[ch].copy()
np.multiply(ch_d_signal, self.adc_gain[ch], ch_d_signal)
np.add(ch_d_signal, self.baseline[ch], ch_d_signal)
np.round(ch_d_signal, 0, ch_d_signal)
ch_d_signal = ch_d_signal.astype(intdtype, copy=False)
ch_d_signal[ch_nanlocs] = d_nans[ch]
d_signal.append(ch_d_signal)
Expand All @@ -580,6 +585,7 @@ def adc(self, expanded=False, inplace=False):
d_signal = self.p_signal.copy()
np.multiply(d_signal, self.adc_gain, d_signal)
np.add(d_signal, self.baseline, d_signal)
np.round(d_signal, 0, d_signal)
d_signal = d_signal.astype(intdtype, copy=False)

if nanlocs.any():
Expand Down