|
1 | 1 | import csv |
2 | | -from functools import cached_property |
| 2 | +import json |
3 | 3 | from pathlib import Path |
4 | | -from typing import Any |
5 | 4 |
|
6 | 5 | from workflow_pathoscope.rust import run_expectation_maximization, PathoscopeResults |
7 | 6 |
|
8 | 7 |
|
9 | | -class SamLine: |
10 | | - def __init__(self, line: str): |
11 | | - self._line = line |
| 8 | +def write_isolate_fasta( |
| 9 | + otu_ids: set[str], |
| 10 | + json_path: Path, |
| 11 | + target_path: Path, |
| 12 | +) -> dict[str, int]: |
| 13 | + """Generate a FASTA file for all the isolates of the OTUs specified by ``otu_ids``. |
12 | 14 |
|
13 | | - def __str__(self) -> str: |
14 | | - return self.line |
15 | | - |
16 | | - @property |
17 | | - def line(self) -> str: |
18 | | - """The SAM line used to create the object.""" |
19 | | - return self._line |
20 | | - |
21 | | - @property |
22 | | - def read_id(self) -> str: |
23 | | - """The ID of the mapped read.""" |
24 | | - return self.fields[0] |
25 | | - |
26 | | - @cached_property |
27 | | - def read_length(self) -> int: |
28 | | - """The length of the mapped read.""" |
29 | | - return len(self.fields[9]) |
30 | | - |
31 | | - @cached_property |
32 | | - def fields(self) -> list[Any]: |
33 | | - """The SAM fields""" |
34 | | - return self.line.split("\t") |
35 | | - |
36 | | - @cached_property |
37 | | - def position(self) -> int: |
38 | | - """The position of the read on the reference.""" |
39 | | - return int(self.fields[3]) |
40 | | - |
41 | | - @cached_property |
42 | | - def score(self) -> float: |
43 | | - """The Pathoscope score for the alignment.""" |
44 | | - return find_sam_align_score(self.fields) |
45 | | - |
46 | | - @cached_property |
47 | | - def bitwise_flag(self) -> int: |
48 | | - """The SAM bitwise flag.""" |
49 | | - return int(self.fields[1]) |
50 | | - |
51 | | - @cached_property |
52 | | - def unmapped(self) -> bool: |
53 | | - """The read is unmapped. |
54 | | -
|
55 | | - This value is derived from the bitwise flag (0x4: segment unmapped). |
56 | | - """ |
57 | | - return self.bitwise_flag & 4 == 4 |
58 | | - |
59 | | - @cached_property |
60 | | - def ref_id(self) -> str: |
61 | | - """The ID of the mapped reference sequence.""" |
62 | | - return self.fields[2] |
63 | | - |
64 | | - |
65 | | -def find_sam_align_score(fields: list[Any]) -> float: |
66 | | - """Find the Bowtie2 alignment score for the given split line (``fields``). |
67 | | -
|
68 | | - Searches the SAM fields for the ``AS:i`` substring and extracts the Bowtie2-specific |
69 | | - alignment score. This will not work for other aligners. |
70 | | -
|
71 | | - :param fields: a SAM line that has been split on "\t" |
72 | | - :return: the alignment score |
| 15 | + :param otu_ids: the set of OTU IDs whose isolate sequences should be written |
| 16 | + :param json_path: the path to the reference index JSON file |
| 17 | + :param target_path: the path to write the FASTA file to |
| 18 | + :return: a dictionary of the lengths of all written sequences keyed by their IDs |
73 | 19 |
|
74 | 20 | """ |
75 | | - read_length = float(len(fields[9])) |
| 21 | + lengths = {} |
76 | 22 |
|
77 | | - for field in fields: |
78 | | - if field.startswith("AS:i:"): |
79 | | - a_score = int(field[5:]) |
80 | | - return a_score + read_length |
| 23 | + with open(json_path) as f_json, open(target_path, "w") as f_target: |
| 24 | + for otu in json.load(f_json): |
| 25 | + if otu["_id"] in otu_ids: |
| 26 | + for isolate in otu["isolates"]: |
| 27 | + for sequence in isolate["sequences"]: |
| 28 | + f_target.write(f">{sequence['_id']}\n{sequence['sequence']}\n") |
| 29 | + lengths[sequence["_id"]] = len(sequence["sequence"]) |
81 | 30 |
|
82 | | - raise ValueError("Could not find alignment score") |
| 31 | + return lengths |
83 | 32 |
|
84 | 33 |
|
85 | 34 | def write_report( |
@@ -208,15 +157,3 @@ def run_pathoscope( |
208 | 157 | p_score_cutoff, |
209 | 158 | ref_lengths, |
210 | 159 | ) |
211 | | - |
212 | | - |
213 | | -# Backward compatibility alias - DEPRECATED |
214 | | -def run_pathoscope_sam( |
215 | | - sam_path: Path, p_score_cutoff: float, ref_lengths: dict[str, int] |
216 | | -): |
217 | | - """ |
218 | | - Deprecated: Use run_pathoscope instead. |
219 | | -
|
220 | | - This function is kept for backward compatibility. |
221 | | - """ |
222 | | - return run_pathoscope(sam_path, p_score_cutoff, ref_lengths) |
0 commit comments