
Commit 5bf7913

Implement vpos-based readers for bam, vcf, bcf
1 parent: 4e74d75

5 files changed: +165 −64 lines changed

pyproject.toml (+6 −13)

@@ -13,7 +13,7 @@ license = { text = "MIT" }
 readme = "README.md"
 requires-python = ">=3.7"
 classifiers = [
-  "Development Status :: 1 - Planning",
+  "Development Status :: 4 - Beta",
   "Intended Audience :: Science/Research",
   "Intended Audience :: Developers",
   "License :: OSI Approved :: MIT License",
@@ -28,14 +28,13 @@ classifiers = [
   "Programming Language :: Python :: 3.11",
   "Programming Language :: Python :: 3.12",
   "Topic :: Scientific/Engineering",
+  "Topic :: Scientific/Engineering :: Bio-Informatics",
   "Typing :: Typed",
 ]
 dynamic = ["version"]
 dependencies = [
-  "bioframe",
   "dask",
-  "oxbow",
-  "pandas",
+  "oxbow>=0.2.0",
   "pyarrow",
   "typing_extensions >=3.7; python_version<'3.8'",
 ]
@@ -46,8 +45,10 @@ test = [
   "pytest-cov >=3",
 ]
 dev = [
+  "black",
   "pytest >=6",
   "pytest-cov >=3",
+  "ruff"
 ]
 docs = [
   "furo",
@@ -72,7 +73,7 @@ envs.default.dependencies = [
 ]
 
 [tool.hatch.envs.default.scripts]
-fix = "ruff --fix ."
+lint = "ruff --fix ."
 test = "pytest ."
 docs = "sphinx-autobuild docs docs/_build/html"
 
@@ -139,12 +140,7 @@ select = [
   "NPY",  # NumPy specific rules
   "PD",   # pandas-vet
 ]
-extend-ignore = [
-  "PLR",  # Design related pylint codes
-  "E501", # Line too long
-]
 target-version = "py37"
-typing-modules = ["dask_ngs._compat.typing"]
 src = ["src"]
 unfixable = [
   "T20",  # Removes print statements
@@ -153,6 +149,3 @@ unfixable = [
 exclude = []
 flake8-unused-arguments.ignore-variadic-names = true
 isort.required-imports = ["from __future__ import annotations"]
-
-[tool.ruff.per-file-ignores]
-"tests/**" = ["T20"]

src/dask_ngs/__init__.py (+127 −13)

@@ -1,8 +1,8 @@
 from __future__ import annotations
 
 from io import BytesIO
+from pathlib import Path
 
-import bioframe
 import dask
 import dask.dataframe as dd
 import oxbow as ox
@@ -11,37 +11,151 @@
 
 __version__ = "0.1.0"
 
-__all__ = ("__version__", "read_bam")
+__all__ = ("__version__", "read_bam", "read_vcf", "read_bcf")
 
 
-def _read_bam_query_from_path(
-    path: str, chrom: str, start: int, end: int
+def _read_bam_vpos_from_path(
+    path: str, vpos_lo: tuple[int, int], vpos_hi: tuple[int, int]
 ) -> pd.DataFrame:
-    stream = BytesIO(ox.read_bam(path, f"{chrom}:{start}-{end}"))
+    stream = BytesIO(ox.read_bam_vpos(path, vpos_lo, vpos_hi))
     ipc = pyarrow.ipc.open_file(stream)
     return ipc.read_pandas()
 
 
-def read_bam(path: str, chunksize: int = 10_000_000) -> dd.DataFrame:
+def _read_vcf_vpos_from_path(
+    path: str, vpos_lo: tuple[int, int], vpos_hi: tuple[int, int]
+) -> pd.DataFrame:
+    stream = BytesIO(ox.read_vcf_vpos(path, vpos_lo, vpos_hi))
+    ipc = pyarrow.ipc.open_file(stream)
+    return ipc.read_pandas()
+
+
+def _read_bcf_vpos_from_path(
+    path: str, vpos_lo: tuple[int, int], vpos_hi: tuple[int, int]
+) -> pd.DataFrame:
+    stream = BytesIO(ox.read_bcf_vpos(path, vpos_lo, vpos_hi))
+    ipc = pyarrow.ipc.open_file(stream)
+    return ipc.read_pandas()
+
+
+def read_bam(
+    path: str | Path, chunksize: int = 10_000_000, index: str | Path | None = None
+) -> dd.DataFrame:
     """
     Map an indexed BAM file to a Dask DataFrame.
 
     Parameters
     ----------
-    path : str
+    path : str or Path
         Path to the BAM file.
     chunksize : int, optional [default=10_000_000]
-        Chunk size, currently in base pair coordinates.
+        Approximate partition size, in compressed bytes.
+    index : str or Path, optional
+        Path to the index file. If not provided, the index file is assumed to
+        be at the same location as the BAM file, with the same name but with
+        the additional .bai or .csi extension.
+
+    Returns
+    -------
+    dask.dataframe.DataFrame
+    """
+    path = Path(path)
+    if index is None:
+        bai_index = path.with_suffix(".bai")
+        csi_index = path.with_suffix(".csi")
+        if bai_index.exists():
+            index = bai_index
+        elif csi_index.exists():
+            index = csi_index
+        else:
+            msg = "Index .bai or .csi file not found."
+            raise FileNotFoundError(msg)
+
+    vpos = ox.partition_from_index_file(index, chunksize)
+    chunks = [
+        dask.delayed(_read_bam_vpos_from_path)(path, tuple(vpos[i]), tuple(vpos[i + 1]))
+        for i in range(len(vpos) - 1)
+    ]
+
+    return dd.from_delayed(chunks)
+
+
+def read_vcf(
+    path: str | Path, chunksize: int = 10_000_000, index: str | Path | None = None
+) -> dd.DataFrame:
+    """
+    Map an indexed, bgzf-compressed VCF.gz file to a Dask DataFrame.
+
+    Parameters
+    ----------
+    path : str or Path
+        Path to the VCF.gz file.
+    chunksize : int, optional [default=10_000_000]
+        Approximate partition size, in compressed bytes.
+    index : str or Path, optional
+        Path to the index file. If not provided, the index file is assumed to
+        be at the same location as the VCF.gz file, with the same name but with
+        the additional .tbi or .csi extension.
 
     Returns
     -------
     dask.dataframe.DataFrame
-        A Dask DataFrame with the BAM file contents.
     """
-    chromsizes = bioframe.fetch_chromsizes("hg38")
-    chunk_spans = bioframe.binnify(chromsizes, chunksize)
+    path = Path(path)
+    if index is None:
+        tbi_index = path.with_suffix(".tbi")
+        csi_index = path.with_suffix(".csi")
+        if tbi_index.exists():
+            index = tbi_index
+        elif csi_index.exists():
+            index = csi_index
+        else:
+            msg = "Index .tbi or .csi file not found."
+            raise FileNotFoundError(msg)
+
+    vpos = ox.partition_from_index_file(index, chunksize)
     chunks = [
-        dask.delayed(_read_bam_query_from_path)(path, chrom, start + 1, end)
-        for chrom, start, end in chunk_spans.to_numpy()
+        dask.delayed(_read_vcf_vpos_from_path)(path, tuple(vpos[i]), tuple(vpos[i + 1]))
+        for i in range(len(vpos) - 1)
     ]
+
+    return dd.from_delayed(chunks)
+
+
+def read_bcf(
+    path: str | Path, chunksize: int = 10_000_000, index: str | Path | None = None
+) -> dd.DataFrame:
+    """
+    Map an indexed BCF file to a Dask DataFrame.
+
+    Parameters
+    ----------
+    path : str or Path
+        Path to the BCF file.
+    chunksize : int, optional [default=10_000_000]
+        Approximate partition size, in compressed bytes.
+    index : str or Path, optional
+        Path to the index file. If not provided, the index file is assumed to
+        be at the same location as the BCF file, with the same name but with
+        the additional .csi extension.
+
+    Returns
+    -------
+    dask.dataframe.DataFrame
+    """
+    path = Path(path)
+    if index is None:
+        csi_index = path.with_suffix(".csi")
+        if csi_index.exists():
+            index = csi_index
+        else:
+            msg = "Index .csi file not found."
+            raise FileNotFoundError(msg)
+
+    vpos = ox.partition_from_index_file(index, chunksize)
+    chunks = [
+        dask.delayed(_read_bcf_vpos_from_path)(path, tuple(vpos[i]), tuple(vpos[i + 1]))
+        for i in range(len(vpos) - 1)
+    ]
+
     return dd.from_delayed(chunks)
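
For orientation, here is a hypothetical usage sketch of the readers added above. The file names (`sample.bam`, `sample.bam.bai`, `calls.vcf.gz`, `calls.vcf.gz.tbi`) are placeholders, not files shipped with the repository, and the partition count depends on your index and `chunksize`; this is a sketch against the API shown in the diff, not code from the commit.

```python
# Hypothetical usage of the vpos-based readers; file names are placeholders.
import dask_ngs

# Partition a coordinate-sorted, indexed BAM into ~10 MB (compressed) chunks.
# Passing `index` explicitly avoids relying on index-file discovery.
bam_ddf = dask_ngs.read_bam("sample.bam", chunksize=10_000_000, index="sample.bam.bai")
print(bam_ddf.npartitions)

# Same pattern for a bgzf-compressed VCF with a .tbi (or .csi) index.
vcf_ddf = dask_ngs.read_vcf("calls.vcf.gz", index="calls.vcf.gz.tbi")

# Everything stays lazy; materialize a single partition to inspect it.
first_records = bam_ddf.partitions[0].compute()
print(first_records.head())
```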

src/dask_ngs/_compat/__init__.py (−1)

This file was deleted.

src/dask_ngs/_compat/typing.py (−14)

This file was deleted.

src/dask_ngs/_index.py (+32 −23)

@@ -120,21 +120,23 @@ def _cumsum_assign_chunks(arr: np.array, thresh: int) -> tuple[np.array, np.arra
     """
     Loops through a given array of integers, cumulatively summing the values.
     The rows are labeled with a `chunk_id`, starting at 0.
+
     When the cumulative sum exceeds the threshold, the chunk_id is incremented,
     and the next rows are binned into the next chunk until again the threshold
     is reached. The cumulative sum of that chunk is also recorded as `size`.
     Returns a tuple of the cumulative sum array and the chunk_id array.
 
-    Args:
-        arr : numpy array
-            The array of byte offsets to chunk
-        thresh : int
-            The size of chunks in bytes
-
-    Returns:
-        Tuple of numpy arrays
-        0 : array of cumulative byte sums
-        1 : array of chunk_ids assigned to each row
+    Parameters
+    ----------
+    arr : numpy array
+        The array of byte offsets to chunk
+    thresh : int
+        The size of chunks in bytes
+
+    Returns
+    -------
+    array of cumulative byte sums
+    array of chunk_ids assigned to each row
     """
     sum = 0
     chunkid = 0
@@ -153,14 +155,16 @@ def _cumsum_assign_chunks(arr: np.array, thresh: int) -> tuple[np.array, np.arra
 def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.DataFrame:
     """Given a dataframe of offset positions, calculate the difference
     between each byte offset.
+
     Group those differences into chunks of size `chunksize_bytes`.
 
-    Returns:
-        A Pandas dataframe with additional columns:
-        chunk_id : int
-            The chunk index that row was assigned
-        size : int
-            The cumulative size of that chunk
+    Returns
+    -------
+    A Pandas dataframe with additional columns:
+        chunk_id : int
+            The chunk index that row was assigned
+        size : int
+            The cumulative size of that chunk
     """
 
     # calculate the difference in byte positions from the prior row
@@ -191,15 +195,20 @@ def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.Dat
 
 
 def consolidate_chunks(offsets_uniq: pd.DataFrame) -> pd.DataFrame:
-    """Group the data by `chunk_id`,
-    keeping the first compressed byte value (`ioffset.cpos`)
-    and the first uncompressed byte value of that stream (`ioffset.upos`).
+    """Group the data by `chunk_id`, keeping the first compressed byte value
+    (`ioffset.cpos`) and the first uncompressed byte value of that stream
+    (`ioffset.upos`).
+
     Take the last `size` value which tells you how many compressed bytes to read.
 
-    Returns:
-        A Pandas dataframe grouped by `chunk_id`
-        Now you can decompress the data starting from `ioffset.cpos` and read `size` bytes.
-        `ioffsets.upos` tells you which byte to read first from the uncompressed data.
+    Returns
+    -------
+    A Pandas dataframe grouped by `chunk_id`
+
+    Notes
+    -----
+    Now you can decompress the data starting from `ioffset.cpos` and read `size` bytes.
+    `ioffsets.upos` tells you which byte to read first from the uncompressed data.
     """
     return offsets_uniq.groupby("chunk_id").agg(
         {"ioffset.cpos": "first", "ioffset.upos": "first", "size": "last"}
