-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathPeakListParser.py
248 lines (207 loc) · 9.63 KB
/
PeakListParser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import ntpath
import zipfile
import Ms2Reader as py_msn
import MGF as py_mgf
import pymzml
import re
import gzip
import os
class PeakListParseError(Exception):
    """Raised when a peak list file or a spectrum ID cannot be processed.

    Within this module it signals that a reader could not be opened for a
    peak list file, that the peak list file type is unsupported, or that a
    spectrum ID string could not be parsed.
    """
class ScanNotFoundException(Exception):
    """Raised when a requested scan cannot be retrieved from a peak list reader."""
class PeakListParser:
    """Reader for the spectra contained in a single peak list file.

    The file format is selected by its PSI-MS accession (mzML, MGF or MS2) and
    the matching reader is opened on construction. ``parse_scan_id`` converts
    spectrum ID strings to integers according to the spectrum ID format
    accession; ``get_scan`` fetches one scan from the reader.
    """

    def __init__(self, pl_path, file_format_accession, spectrum_id_format_accession):
        """Open a reader for the peak list file at *pl_path*.

        :param pl_path: path to the peak list file
        :param file_format_accession: accession of the file format; see
            ``is_mzML``, ``is_mgf`` and ``is_ms2`` for the recognised values
        :param spectrum_id_format_accession: accession of the spectrum ID
            format, used by ``parse_scan_id``
        :raises PeakListParseError: if the reader fails to open the file
        """
        self.file_format_accession = file_format_accession
        self.spectrum_id_format_accession = spectrum_id_format_accession
        self.peak_list_path = pl_path
        self.peak_list_file_name = os.path.basename(pl_path)

        try:
            if self.is_mzML():
                self.reader = pymzml.run.Reader(pl_path)
            elif self.is_mgf():
                self.reader = py_mgf.Reader(pl_path)
            elif self.is_ms2():
                self.reader = py_msn.Reader(pl_path)
            else:
                # Unknown format: construction succeeds, get_scan() reports it.
                self.reader = None
        except Exception as e:
            # Any reader failure is reported uniformly as a PeakListParseError.
            message = "Error reading peak list file {0}: {1} - Arguments:\n{2!r}".format(
                self.peak_list_file_name, type(e).__name__, e.args)
            raise PeakListParseError(message)

    def is_mgf(self):
        """Return True if the file format accession is 'MS:1001062'."""
        return self.file_format_accession == 'MS:1001062'

    def is_mzML(self):
        """Return True if the file format accession is 'MS:1000584'."""
        return self.file_format_accession == 'MS:1000584'

    def is_ms2(self):
        """Return True if the file format accession is 'MS:1001466'."""
        return self.file_format_accession == 'MS:1001466'

    @staticmethod
    def extract_gz(in_file):
        """
        decompresses a gzip file next to the archive and returns the resulting path
        :param in_file: path to the ``.gz`` file
        :return: path of the decompressed file (``in_file`` without the ``.gz`` suffix)
        :raises ValueError: if ``in_file`` does not end with ``.gz``
        """
        suffix = '.gz'
        if not in_file.endswith(suffix):
            raise ValueError("unsupported file extension for: %s" % in_file)

        # Strip only the trailing suffix; a ".gz" elsewhere in the path
        # (e.g. in a directory name) must stay untouched.
        out_file = in_file[:-len(suffix)]

        with gzip.open(in_file, 'rb') as in_f:
            with open(out_file, 'wb') as out_f:
                # Copy in chunks so large peak lists are not held in memory.
                for chunk in iter(lambda: in_f.read(1024 * 1024), b''):
                    out_f.write(chunk)
        return out_file

    @staticmethod
    def unzip_peak_lists(zip_file):
        """
        unzips and returns resulting folder
        :param zip_file: path to archive to unzip
        :return: resulting folder
        :raises ValueError: if ``zip_file`` does not end with ``.zip``
        """
        if not zip_file.endswith(".zip"):
            raise ValueError("unsupported file extension for: %s" % zip_file)

        unzip_path = zip_file + '_unzip/'
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            zip_ref.extractall(unzip_path)
        return unzip_path

    @staticmethod
    def get_ion_types_mzml(scan):
        """
        translates the fragmentation methods found as keys of a scan to ion types
        :param scan: mapping whose keys may include fragmentation method names
        :return: list of ion type letters, in the order the matching keys occur in ``scan``
        """
        frag_methods = {
            'beam-type collision-induced dissociation': ["b", "y"],
            'collision-induced dissociation': ["b", "y"],
            'electron transfer dissociation': ["c", "z"],
        }

        ion_types = []
        for key in scan.keys():
            if key in frag_methods:
                ion_types += frag_methods[key]
        return ion_types

    def get_scan(self, scan_id):
        """
        fetches a scan from the reader and returns its peaks and precursor
        :param scan_id: key used to look the scan up in the reader
        :return: dict with the keys ``'peaks'`` and ``'precursor'``; for mzML the
            peaks are a newline separated string of "mz intensity" pairs with
            intensity > 0 and the precursor is the first entry of
            ``scan['precursors']`` (``None`` if absent or empty); for MGF and MS2
            both values are passed through from the reader
        :raises PeakListParseError: if no reader exists for the file type
        :raises ScanNotFoundException: if the reader lookup fails
        """
        if self.reader is None:
            raise PeakListParseError(
                "unsupported peak list file type for: %s" % ntpath.basename(self.peak_list_file_name))

        try:
            scan = self.reader[scan_id]
        except Exception as e:
            # Exceptions raised without arguments have empty args; fall back to
            # the type name instead of failing with an IndexError here.
            detail = e.args[0] if e.args else type(e).__name__
            raise ScanNotFoundException(
                "%s - for file: %s - scanId: %s" % (detail, ntpath.basename(self.peak_list_path), scan_id))

        if self.is_mzML():
            peak_list = "\n".join(["%s %s" % (mz, i) for mz, i in scan.peaks if i > 0])
            precursor = None
            if 'precursors' in scan:
                precursors = scan['precursors']
                if len(precursors) > 0:
                    precursor = precursors[0]
        else:
            # The reader is not None, so the format is MGF or MS2; both expose
            # the same two keys.
            peak_list = scan['peaks']
            precursor = scan['precursor']

        return {
            'peaks': peak_list,
            'precursor': precursor
        }

    def parse_scan_id(self, spec_id):
        """
        converts a spectrum ID string to an integer according to the spectrum ID format
        :param spec_id: spectrum ID string
        :return: integer parsed from ``spec_id`` (always 0 for 'MS:1000775')
        :raises PeakListParseError: if ``spec_id`` does not fit the expected format

        Explicitly handled spectrum ID format accessions:
          MS:1000768 (Thermo nativeID format)
          MS:1000774 (multiple peak list nativeID format)
          MS:1000775 (single peak list nativeID format)
          MS:1000776 (scan number only nativeID format)
        Every other accession, including MS:1001530 (mzML unique identifier),
        falls back to the last number found in ``spec_id``.
        """
        accession = self.spectrum_id_format_accession

        # MS:1000774 multiple peak list nativeID format - zero based
        if accession == 'MS:1000774':
            match = re.match(r"index=([0-9]+)", spec_id)
            if match is not None:
                return int(match.group(1))
            # try to cast spec_id to int if re doesn't match -> PXD006767 has this format
            # ToDo: do we want to be stricter?
            try:
                return int(spec_id)
            except ValueError:
                raise PeakListParseError("invalid spectrum ID format!")

        # MS:1000775 single peak list nativeID format
        # The nativeID must be the same as the source file ID.
        # Used for referencing peak list files with one spectrum per file,
        # typically in a folder of PKL or DTAs, where each sourceFileRef is different.
        if accession == 'MS:1000775':
            return 0

        # MS:1000776 scan number only nativeID format
        # Used for referencing mzXML, or a DTA folder where native scan numbers can be derived.
        if accession == 'MS:1000776':
            match = re.match(r"scan=([0-9]+)", spec_id)
            if match is None:
                raise PeakListParseError("invalid spectrum ID format!")
            return int(match.group(1))

        # MS:1000768 Thermo nativeID format:
        # controllerType=xsd:nonNegativeInteger controllerNumber=xsd:positiveInteger scan=xsd:positiveInteger
        if accession == 'MS:1000768':
            # search instead of match: "scan=" is not at the start of the ID
            match = re.search(r"scan=([0-9]+)", spec_id)
            if match is None:
                raise PeakListParseError("invalid spectrum ID format!")
            return int(match.group(1))

        # MS:1001530 mzML unique identifier:
        # Used for referencing mzML. The value of the spectrum ID attribute is referenced directly.
        # ToDo: pymzML uses scan number as index and not the spectrumID attribute therefore
        # ToDo: we resort to using the default way of parsing the spec_id, i.e. using the last number in the string
        # ToDo: This might change if we are going to use a different (more standards compliant approach)

        # Default for every accession not handled above: last number in the string.
        # ToDo: display warning or throw error? depending on strict mode or not?
        numbers = re.findall(r"[0-9]+", spec_id)
        if not numbers:
            raise PeakListParseError("failed to parse spectrumID from %s" % spec_id)
        return int(numbers[-1])