-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhrm.py
243 lines (201 loc) · 7.11 KB
/
hrm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import csv
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import json
import sys
import logging
CONST_MAXVOLTAGE = 10000
CONST_MINVOLTAGE = -10000
CONST_MINBPS = 0.5
CONST_CUTOFF = 0.7
CONST_MINTIMEBETWEENBEATS = 0.2
CONST_MINTIME = -10000
CONST_MAXTIME = 10000
CONST_MINPOINTSFORECG = 1000
def readCSV(filename):
"""
Reads a time/voltage csv file and returns into arrays
:param filename: The string of file to be read
:returns: array of times and voltages
"""
with open(filename) as csvfile:
reader = csv.reader(csvfile, delimiter=',')
timesarr = []
voltagesarr = []
for row in reader:
if(len(row) != 2):
continue
if(row[0] == '' or row[1] == ''):
continue
try:
timesarr.append(float(row[0]))
voltagesarr.append(float(row[1]))
except:
logging.error("Found invalid values in file")
raise TypeError("Invalid values encountered in file")
if(max(voltagesarr) > 300):
logging.error("Found value above 300")
raise ValueError("Voltages Exceed Range")
return [timesarr, voltagesarr]
def getDuration(timesarr):
"""
gets the duration of the input time array
:param timesarr: array of times as returned by readCSV
:returns: Duration for which there are times
"""
dur = max(timesarr)-min(timesarr)
return dur
def getBeats(timesarr, voltagesarr):
"""
detects peaks over time of the ecg Signal
:param timesarr: array of times from readCSV
:param voltagesarr: array of voltages from readCSV
:returns: Array of times at which beats are detected
"""
dur = getDuration(timesarr)
numpointsforavg = int(np.ceil(CONST_MINBPS*dur))
sortedvoltagesasc = sorted(voltagesarr, reverse=True)
sortedvoltagesd = sorted(voltagesarr)
cutoff = CONST_CUTOFF * np.average(sortedvoltagesasc[0:numpointsforavg])
cutoff += np.average(sortedvoltagesd[0:numpointsforavg]) * (1-CONST_CUTOFF)
length = len(timesarr)
beats = []
lastbeat = -1
for x in range(0, length-2):
if(voltagesarr[x+1] > cutoff and voltagesarr[x+1] > voltagesarr[x] and
voltagesarr[x+1] > voltagesarr[x+2] and
timesarr[x+1]-lastbeat > CONST_MINTIMEBETWEENBEATS):
beats.append(timesarr[x+1])
lastbeat = timesarr[x+1]
return np.array(beats)
def getNumBeats(beatsarr):
"""
helper which gets total number of beats detected
:param beatsarr: Array of beat times from getBeats
:returns: Number of beats detected
"""
return len(beatsarr)
def getMeanHR(beatsarr, lower=CONST_MINTIME, upper=CONST_MAXTIME, duration=-1):
"""
Computes mean hr, must specify either lower && upper or duration
:param beatsarr: array of beat times from getBeats
:param lower: Optional bottom side of interval
:param upper: Optional upper side of interval
:param duration: Optional duration to compute hr over
:returns: Mean heart rate over interval
"""
length = len(beatsarr)
lowerindex = 0
upperindex = length-1
for i in range(0, length):
if beatsarr[i] > lower:
lowerindex = i
break
for j in range(0, length):
if beatsarr[length-j-1] < upper:
upperindex = length-j
break
nbeats = beatsarr[lowerindex:upperindex]
if(duration == -1):
return len(nbeats)/((upper-lower)/60)
else:
return len(nbeats)*60/duration
def voltageExtremes(voltages):
"""
Gets tuple of min,max of array of voltages
:param voltages: Array of voltages from readCSV
:returns: tuple of min, max voltages
"""
min = CONST_MAXVOLTAGE
max = CONST_MINVOLTAGE
for k in voltages:
if k < min:
min = k
if k > max:
max = k
return (min, max)
def hrd(time, voltage, lower=CONST_MINTIME, upper=CONST_MAXTIME, duration=-1):
"""
Wrapper function used to call other functions giving metrics
:param time: time array from readCSV
:param voltage: voltage array from readCSV
:param lower: see getMeanHR
:param upper: see getMeanHR
:param duration: see getMeanHR
:returns: dictionary of all compiled metrics
"""
dict = {}
dict["voltage_extremes"] = voltageExtremes(voltage)
dict["duration"] = getDuration(time)
beats = getBeats(time, voltage)
dict["beats"] = beats
dict["num_beats"] = getNumBeats(beats)
if duration == -1:
dict["mean_hr_bpm"] = getMeanHR(beats, lower, upper)
else:
if(duration == 0):
raise ValueError("Invalid Duration")
dict["mean_hr_bpm"] = getMeanHR(beats, duration=duration)
return dict
def writeJson(filename, dict):
"""
Writes json to file
:param filename: filename as entered by User
:param dict: dictionary as returned by hrd
:returns: -1 if unable to write , else does not return
"""
if(not isinstance(dict, type({}))):
return -1
ndict = dict.copy()
ndict["beats"] = ndict["beats"].tolist()
ind = filename.find('.')
if ind == -1:
return -1
jsonname = filename[0:ind] + ".json"
with open(jsonname, 'w') as file:
json.dump(ndict, file, ensure_ascii=False)
if __name__ == "__main__":
"""
Main gets user inputs and uses those inputs to perform task
"""
logging.basicConfig(filename='hrm.log', level=logging.INFO,
format='%(asctime)s %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p')
logging.info("Starting New Log...")
try:
fname = input("Enter CSV File Name: ")
[time, voltage] = readCSV(fname)
except:
logging.error("Unable to Read File")
sys.exit("Invalid File")
logging.info("filename: " + fname)
if(len(time) < CONST_MINPOINTSFORECG):
sys.exit("Not enough data to perform analysis")
k = input("Select custom interval for Mean HR y/n?")
if k == "y":
print("Signal ranged from ")
print(str(min(time)) + " seconds to ")
print(str(max(time)) + " seconds")
lower = input("Enter lower in seconds:")
upper = input("Enter upper in seconds:")
logging.info("User Input Lower: " + lower)
logging.info("User Input Upper: " + upper)
try:
lower = float(lower)
upper = float(upper)
except:
logging.error("Check User Input")
sys.exit("User Input Non numeric")
if(lower < min(time)):
logging.error("Check User Input")
sys.exit("User Input Below Valid Range")
if(upper > max(time)):
logging.error("Check User Input")
sys.exit("User Input Above Valid Range")
hrdict = hrd(time, voltage, lower, upper)
else:
hrdict = hrd(time, voltage, duration=getDuration(time))
writeJson(fname, hrdict)
metrics = hrdict
print("Wrote Stats to File")