diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py index c45d2b7b..e95e5f69 100644 --- a/bio2zarr/cli.py +++ b/bio2zarr/cli.py @@ -8,8 +8,8 @@ import numcodecs import tabulate -from . import plink, provenance, vcf2zarr, vcf_utils -from .vcf2zarr import icf as icf_mod +from . import icf as icf_mod +from . import plink, provenance, vcf_utils logger = logging.getLogger(__name__) @@ -236,7 +236,7 @@ def explode( """ setup_logging(verbose) check_overwrite_dir(icf_path, force) - vcf2zarr.explode( + icf_mod.explode( icf_path, vcfs, worker_processes=worker_processes, @@ -276,7 +276,7 @@ def dexplode_init( setup_logging(verbose) check_overwrite_dir(icf_path, force) check_partitions(num_partitions) - work_summary = vcf2zarr.explode_init( + work_summary = icf_mod.explode_init( icf_path, vcfs, target_num_partitions=num_partitions, @@ -304,7 +304,7 @@ def dexplode_partition(icf_path, partition, verbose, one_based): setup_logging(verbose) if one_based: partition -= 1 - vcf2zarr.explode_partition(icf_path, partition) + icf_mod.explode_partition(icf_path, partition) @click.command @@ -315,7 +315,7 @@ def dexplode_finalise(icf_path, verbose): Final step for distributed conversion of VCF(s) to intermediate columnar format. """ setup_logging(verbose) - vcf2zarr.explode_finalise(icf_path) + icf_mod.explode_finalise(icf_path) @click.command @@ -326,7 +326,7 @@ def inspect(path, verbose): Inspect an intermediate columnar format or Zarr path. """ setup_logging(verbose) - data = vcf2zarr.inspect(path) + data = icf_mod.inspect(path) click.echo(tabulate.tabulate(data, headers="keys")) @@ -345,7 +345,7 @@ def mkschema(icf_path, variants_chunk_size, samples_chunk_size, local_alleles): err=True, ) stream = click.get_text_stream("stdout") - vcf2zarr.mkschema( + icf_mod.mkschema( icf_path, stream, variants_chunk_size=variants_chunk_size, @@ -384,7 +384,7 @@ def encode( """ setup_logging(verbose) check_overwrite_dir(zarr_path, force) - vcf2zarr.encode( + icf_mod.encode( icf_path, zarr_path, schema_path=schema, @@ -438,7 +438,7 @@ def dencode_init( setup_logging(verbose) check_overwrite_dir(zarr_path, force) check_partitions(num_partitions) - work_summary = vcf2zarr.encode_init( + work_summary = icf_mod.encode_init( icf_path, zarr_path, target_num_partitions=num_partitions, @@ -466,7 +466,7 @@ def dencode_partition(zarr_path, partition, verbose, one_based): setup_logging(verbose) if one_based: partition -= 1 - vcf2zarr.encode_partition(zarr_path, partition) + icf_mod.encode_partition(zarr_path, partition) @click.command @@ -478,7 +478,7 @@ def dencode_finalise(zarr_path, verbose, progress): Final step for distributed conversion of ICF to VCF Zarr. 
""" setup_logging(verbose) - vcf2zarr.encode_finalise(zarr_path, show_progress=progress) + icf_mod.encode_finalise(zarr_path, show_progress=progress) @click.command(name="convert") @@ -507,7 +507,7 @@ def convert_vcf( """ setup_logging(verbose) check_overwrite_dir(zarr_path, force) - vcf2zarr.convert( + icf_mod.convert( vcfs, zarr_path, variants_chunk_size=variants_chunk_size, diff --git a/bio2zarr/core.py b/bio2zarr/core.py index b53cbe02..631ba128 100644 --- a/bio2zarr/core.py +++ b/bio2zarr/core.py @@ -34,6 +34,16 @@ def display_size(n): return humanfriendly.format_size(n, binary=True) +def parse_max_memory(max_memory): + if max_memory is None: + # Effectively unbounded + return 2**63 + if isinstance(max_memory, str): + max_memory = humanfriendly.parse_size(max_memory) + logger.info(f"Set memory budget to {display_size(max_memory)}") + return max_memory + + def min_int_dtype(min_value, max_value): if min_value > max_value: raise ValueError("min_value must be <= max_value") diff --git a/bio2zarr/vcf2zarr/icf.py b/bio2zarr/icf.py similarity index 68% rename from bio2zarr/vcf2zarr/icf.py rename to bio2zarr/icf.py index 29e90a57..370432e2 100644 --- a/bio2zarr/vcf2zarr/icf.py +++ b/bio2zarr/icf.py @@ -8,12 +8,14 @@ import pickle import shutil import sys +import tempfile +from functools import partial from typing import Any import numcodecs import numpy as np -from .. import constants, core, provenance, vcf_utils +from . import constants, core, provenance, vcf_utils, vcz logger = logging.getLogger(__name__) @@ -116,23 +118,6 @@ class VcfPartition: ) -@dataclasses.dataclass -class Contig: - id: str - length: int = None - - -@dataclasses.dataclass -class Sample: - id: str - - -@dataclasses.dataclass -class Filter: - id: str - description: str = "" - - @dataclasses.dataclass class IcfMetadata(core.JsonDataclass): samples: list @@ -187,9 +172,9 @@ def fromdict(d): d = d.copy() d["partitions"] = partitions d["fields"] = [VcfField.fromdict(fd) for fd in d["fields"]] - d["samples"] = [Sample(**sd) for sd in d["samples"]] - d["filters"] = [Filter(**fd) for fd in d["filters"]] - d["contigs"] = [Contig(**cd) for cd in d["contigs"]] + d["samples"] = [vcz.Sample(**sd) for sd in d["samples"]] + d["filters"] = [vcz.Filter(**fd) for fd in d["filters"]] + d["contigs"] = [vcz.Contig(**cd) for cd in d["contigs"]] return IcfMetadata(**d) def __eq__(self, other): @@ -240,7 +225,7 @@ def scan_vcf(path, target_num_partitions): description = "" if h["ID"] == "PASS": pass_index = len(filters) - filters.append(Filter(h["ID"], description)) + filters.append(vcz.Filter(h["ID"], description)) # Ensure PASS is the first filter if present if pass_index > 0: @@ -262,9 +247,9 @@ def scan_vcf(path, target_num_partitions): contig_lengths = [None for _ in vcf.seqnames] metadata = IcfMetadata( - samples=[Sample(sample_id) for sample_id in vcf.samples], + samples=[vcz.Sample(sample_id) for sample_id in vcf.samples], contigs=[ - Contig(contig_id, length) + vcz.Contig(contig_id, length) for contig_id, length in zip(vcf.seqnames, contig_lengths) ], filters=filters, @@ -366,64 +351,58 @@ def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1): return icf_metadata, header -def sanitise_value_bool(buff, j, value): +def sanitise_value_bool(shape, value): x = True if value is None: x = False - buff[j] = x + return x -def sanitise_value_float_scalar(buff, j, value): +def sanitise_value_float_scalar(shape, value): x = value if value is None: x = [constants.FLOAT32_MISSING] - buff[j] = x[0] + return x[0] -def 
sanitise_value_int_scalar(buff, j, value): +def sanitise_value_int_scalar(shape, value): x = value if value is None: - # print("MISSING", INT_MISSING, INT_FILL) x = [constants.INT_MISSING] else: x = sanitise_int_array(value, ndmin=1, dtype=np.int32) - buff[j] = x[0] + return x[0] -def sanitise_value_string_scalar(buff, j, value): +def sanitise_value_string_scalar(shape, value): if value is None: - buff[j] = "." + return "." else: - buff[j] = value[0] + return value[0] -def sanitise_value_string_1d(buff, j, value): +def sanitise_value_string_1d(shape, value): if value is None: - buff[j] = "." + return np.full(shape, ".", dtype="O") else: - # value = np.array(value, ndmin=1, dtype=buff.dtype, copy=False) - # FIXME failure isn't coming from here, it seems to be from an - # incorrectly detected dimension in the zarr array - # The dimesions look all wrong, and the dtype should be Object - # not str value = drop_empty_second_dim(value) - buff[j] = "" - buff[j, : value.shape[0]] = value + result = np.full(shape, "", dtype=value.dtype) + result[: value.shape[0]] = value + return result -def sanitise_value_string_2d(buff, j, value): +def sanitise_value_string_2d(shape, value): if value is None: - buff[j] = "." + return np.full(shape, ".", dtype="O") else: - # print(buff.shape, value.dtype, value) - # assert value.ndim == 2 - buff[j] = "" + result = np.full(shape, "", dtype="O") if value.ndim == 2: - buff[j, :, : value.shape[1]] = value + result[: value.shape[0], : value.shape[1]] = value else: - # TODO check if this is still necessary + # Convert 1D array into 2D with appropriate shape for k, val in enumerate(value): - buff[j, k, : len(val)] = val + result[k, : len(val)] = val + return result def drop_empty_second_dim(value): @@ -433,27 +412,28 @@ def drop_empty_second_dim(value): return value -def sanitise_value_float_1d(buff, j, value): +def sanitise_value_float_1d(shape, value): if value is None: - buff[j] = constants.FLOAT32_MISSING + return np.full(shape, constants.FLOAT32_MISSING) else: - value = np.array(value, ndmin=1, dtype=buff.dtype, copy=True) + value = np.array(value, ndmin=1, dtype=np.float32, copy=True) # numpy will map None values to Nan, but we need a # specific NaN value[np.isnan(value)] = constants.FLOAT32_MISSING value = drop_empty_second_dim(value) - buff[j] = constants.FLOAT32_FILL - buff[j, : value.shape[0]] = value + result = np.full(shape, constants.FLOAT32_FILL, dtype=np.float32) + result[: value.shape[0]] = value + return result -def sanitise_value_float_2d(buff, j, value): +def sanitise_value_float_2d(shape, value): if value is None: - buff[j] = constants.FLOAT32_MISSING + return np.full(shape, constants.FLOAT32_MISSING) else: - # print("value = ", value) - value = np.array(value, ndmin=2, dtype=buff.dtype, copy=True) - buff[j] = constants.FLOAT32_FILL - buff[j, :, : value.shape[1]] = value + value = np.array(value, ndmin=2, dtype=np.float32, copy=True) + result = np.full(shape, constants.FLOAT32_FILL, dtype=np.float32) + result[:, : value.shape[1]] = value + return result def sanitise_int_array(value, ndmin, dtype): @@ -468,23 +448,25 @@ def sanitise_int_array(value, ndmin, dtype): return value.astype(dtype) -def sanitise_value_int_1d(buff, j, value): +def sanitise_value_int_1d(shape, value): if value is None: - buff[j] = -1 + return np.full(shape, -1) else: - value = sanitise_int_array(value, 1, buff.dtype) + value = sanitise_int_array(value, 1, np.int32) value = drop_empty_second_dim(value) - buff[j] = -2 - buff[j, : value.shape[0]] = value + result = np.full(shape, 
-2, dtype=np.int32) + result[: value.shape[0]] = value + return result -def sanitise_value_int_2d(buff, j, value): +def sanitise_value_int_2d(shape, value): if value is None: - buff[j] = -1 + return np.full(shape, -1) else: - value = sanitise_int_array(value, 2, buff.dtype) - buff[j] = -2 - buff[j, :, : value.shape[1]] = value + value = sanitise_int_array(value, 2, np.int32) + result = np.full(shape, -2, dtype=np.int32) + result[:, : value.shape[1]] = value + return result missing_value_map = { @@ -707,36 +689,32 @@ def values(self): return ret def sanitiser_factory(self, shape): - """ - Return a function that sanitised values from this column - and writes into a buffer of the specified shape. - """ - assert len(shape) <= 3 + assert len(shape) <= 2 if self.vcf_field.vcf_type == "Flag": - assert len(shape) == 1 - return sanitise_value_bool + assert len(shape) == 0 + return partial(sanitise_value_bool, shape) elif self.vcf_field.vcf_type == "Float": - if len(shape) == 1: - return sanitise_value_float_scalar - elif len(shape) == 2: - return sanitise_value_float_1d + if len(shape) == 0: + return partial(sanitise_value_float_scalar, shape) + elif len(shape) == 1: + return partial(sanitise_value_float_1d, shape) else: - return sanitise_value_float_2d + return partial(sanitise_value_float_2d, shape) elif self.vcf_field.vcf_type == "Integer": - if len(shape) == 1: - return sanitise_value_int_scalar - elif len(shape) == 2: - return sanitise_value_int_1d + if len(shape) == 0: + return partial(sanitise_value_int_scalar, shape) + elif len(shape) == 1: + return partial(sanitise_value_int_1d, shape) else: - return sanitise_value_int_2d + return partial(sanitise_value_int_2d, shape) else: assert self.vcf_field.vcf_type in ("String", "Character") - if len(shape) == 1: - return sanitise_value_string_scalar - elif len(shape) == 2: - return sanitise_value_string_1d + if len(shape) == 0: + return partial(sanitise_value_string_scalar, shape) + elif len(shape) == 1: + return partial(sanitise_value_string_1d, shape) else: - return sanitise_value_string_2d + return partial(sanitise_value_string_2d, shape) @dataclasses.dataclass @@ -843,6 +821,62 @@ def __exit__(self, exc_type, exc_val, exc_tb): return False +def convert_local_allele_field_types(fields): + """ + Update the specified list of fields to include the LAA field, and to convert + any supported localisable fields to the L* counterpart. + + Note that we currently support only two ALT alleles per sample, and so the + dimensions of these fields are fixed by that requirement. Later versions may + use summary data stored in the ICF to make different choices, if information + about subsequent alleles (not in the actual genotype calls) should also be + stored.
+ """ + fields_by_name = {field.name: field for field in fields} + gt = fields_by_name["call_genotype"] + if gt.shape[-1] != 2: + raise ValueError("Local alleles only supported on diploid data") + + # TODO check if LA is already in here + + shape = gt.shape[:-1] + chunks = gt.chunks[:-1] + dimensions = gt.dimensions[:-1] + + la = vcz.ZarrArraySpec.new( + vcf_field=None, + name="call_LA", + dtype="i1", + shape=gt.shape, + chunks=gt.chunks, + dimensions=(*dimensions, "local_alleles"), + description=( + "0-based indices into REF+ALT, indicating which alleles" + " are relevant (local) for the current sample" + ), + ) + ad = fields_by_name.get("call_AD", None) + if ad is not None: + # TODO check if call_LAD is in the list already + ad.name = "call_LAD" + ad.vcf_field = None + ad.shape = (*shape, 2) + ad.chunks = (*chunks, 2) + ad.dimensions = (*dimensions, "local_alleles") + ad.description += " (local-alleles)" + + pl = fields_by_name.get("call_PL", None) + if pl is not None: + # TODO check if call_LPL is in the list already + pl.name = "call_LPL" + pl.vcf_field = None + pl.shape = (*shape, 3) + pl.chunks = (*chunks, 3) + pl.description += " (local-alleles)" + pl.dimensions = (*dimensions, "local_" + pl.dimensions[-1]) + return [*fields, la] + + class IntermediateColumnarFormat(collections.abc.Mapping): def __init__(self, path): self.path = pathlib.Path(path) @@ -908,6 +942,10 @@ def num_records(self): def num_partitions(self): return len(self.metadata.partitions) + @property + def samples(self): + return [sample.id for sample in self.metadata.samples] + @property def num_samples(self): return len(self.metadata.samples) @@ -916,6 +954,230 @@ def num_samples(self): def num_fields(self): return len(self.fields) + @property + def root_attrs(self): + return { + "vcf_header": self.vcf_header, + } + + def iter_alleles(self, start, stop, num_alleles): + ref_field = self.fields["REF"] + alt_field = self.fields["ALT"] + + for ref, alt in zip( + ref_field.iter_values(start, stop), + alt_field.iter_values(start, stop), + ): + alleles = np.full(num_alleles, constants.STR_FILL, dtype="O") + alleles[0] = ref[0] + alleles[1 : 1 + len(alt)] = alt + yield alleles + + def iter_id(self, start, stop): + for value in self.fields["ID"].iter_values(start, stop): + if value is not None: + yield value[0] + else: + yield None + + def iter_filters(self, start, stop): + source_field = self.fields["FILTERS"] + lookup = {filt.id: index for index, filt in enumerate(self.metadata.filters)} + + for filter_values in source_field.iter_values(start, stop): + filters = np.zeros(len(self.metadata.filters), dtype=bool) + if filter_values is not None: + for filter_id in filter_values: + try: + filters[lookup[filter_id]] = True + except KeyError: + raise ValueError( + f"Filter '{filter_id}' was not defined in the header." + ) from None + yield filters + + def iter_contig(self, start, stop): + source_field = self.fields["CHROM"] + lookup = { + contig.id: index for index, contig in enumerate(self.metadata.contigs) + } + + for value in source_field.iter_values(start, stop): + # Note: because we are using the indexes to define the lookups + # and we always have an index, it seems that we the contig lookup + # will always succeed. However, if anyone ever does hit a KeyError + # here, please do open an issue with a reproducible example! 
+ yield lookup[value[0]] + + def iter_field(self, field_name, shape, start, stop): + source_field = self.fields[field_name] + sanitiser = source_field.sanitiser_factory(shape) + for value in source_field.iter_values(start, stop): + yield sanitiser(value) + + def iter_genotypes(self, shape, start, stop): + source_field = self.fields["FORMAT/GT"] + for value in source_field.iter_values(start, stop): + genotypes = value[:, :-1] if value is not None else None + phased = value[:, -1] if value is not None else None + sanitised_genotypes = sanitise_value_int_2d(shape, genotypes) + sanitised_phased = sanitise_value_int_1d(shape[:-1], phased) + yield sanitised_genotypes, sanitised_phased + + def generate_schema( + self, variants_chunk_size=None, samples_chunk_size=None, local_alleles=None + ): + m = self.num_records + n = self.num_samples + if local_alleles is None: + local_alleles = False + + schema_instance = vcz.VcfZarrSchema( + format_version=vcz.ZARR_SCHEMA_FORMAT_VERSION, + samples_chunk_size=samples_chunk_size, + variants_chunk_size=variants_chunk_size, + fields=[], + samples=self.metadata.samples, + contigs=self.metadata.contigs, + filters=self.metadata.filters, + ) + + logger.info( + "Generating schema with chunks=" + f"{schema_instance.variants_chunk_size, schema_instance.samples_chunk_size}" + ) + + def spec_from_field(field, array_name=None): + return vcz.ZarrArraySpec.from_field( + field, + num_samples=n, + num_variants=m, + samples_chunk_size=schema_instance.samples_chunk_size, + variants_chunk_size=schema_instance.variants_chunk_size, + array_name=array_name, + ) + + def fixed_field_spec( + name, + dtype, + vcf_field=None, + shape=(m,), + dimensions=("variants",), + chunks=None, + ): + return vcz.ZarrArraySpec.new( + vcf_field=vcf_field, + name=name, + dtype=dtype, + shape=shape, + description="", + dimensions=dimensions, + chunks=chunks or [schema_instance.variants_chunk_size], + ) + + alt_field = self.fields["ALT"] + max_alleles = alt_field.vcf_field.summary.max_number + 1 + + array_specs = [ + fixed_field_spec( + name="variant_contig", + dtype=core.min_int_dtype(0, self.metadata.num_contigs), + ), + fixed_field_spec( + name="variant_filter", + dtype="bool", + shape=(m, self.metadata.num_filters), + dimensions=["variants", "filters"], + chunks=(schema_instance.variants_chunk_size, self.metadata.num_filters), + ), + fixed_field_spec( + name="variant_allele", + dtype="O", + shape=(m, max_alleles), + dimensions=["variants", "alleles"], + chunks=(schema_instance.variants_chunk_size, max_alleles), + ), + fixed_field_spec( + name="variant_id", + dtype="O", + ), + fixed_field_spec( + name="variant_id_mask", + dtype="bool", + ), + ] + name_map = {field.full_name: field for field in self.metadata.fields} + + # Only three of the fixed fields have a direct one-to-one mapping. 
+ array_specs.extend( + [ + spec_from_field(name_map["QUAL"], array_name="variant_quality"), + spec_from_field(name_map["POS"], array_name="variant_position"), + spec_from_field(name_map["rlen"], array_name="variant_length"), + ] + ) + array_specs.extend( + [spec_from_field(field) for field in self.metadata.info_fields] + ) + + gt_field = None + for field in self.metadata.format_fields: + if field.name == "GT": + gt_field = field + continue + array_specs.append(spec_from_field(field)) + + if gt_field is not None and n > 0: + ploidy = max(gt_field.summary.max_number - 1, 1) + shape = [m, n] + chunks = [ + schema_instance.variants_chunk_size, + schema_instance.samples_chunk_size, + ] + dimensions = ["variants", "samples"] + array_specs.append( + vcz.ZarrArraySpec.new( + vcf_field=None, + name="call_genotype_phased", + dtype="bool", + shape=list(shape), + chunks=list(chunks), + dimensions=list(dimensions), + description="", + ) + ) + shape += [ploidy] + chunks += [ploidy] + dimensions += ["ploidy"] + array_specs.append( + vcz.ZarrArraySpec.new( + vcf_field=None, + name="call_genotype", + dtype=gt_field.smallest_dtype(), + shape=list(shape), + chunks=list(chunks), + dimensions=list(dimensions), + description="", + ) + ) + array_specs.append( + vcz.ZarrArraySpec.new( + vcf_field=None, + name="call_genotype_mask", + dtype="bool", + shape=list(shape), + chunks=list(chunks), + dimensions=list(dimensions), + description="", + ) + ) + + if local_alleles: + array_specs = convert_local_allele_field_types(array_specs) + + schema_instance.fields = array_specs + return schema_instance + @dataclasses.dataclass class IcfPartitionMetadata(core.JsonDataclass): @@ -1255,3 +1517,161 @@ def explode_partition(icf_path, partition): def explode_finalise(icf_path): writer = IntermediateColumnarFormatWriter(icf_path) writer.finalise() + + +def inspect(path): + path = pathlib.Path(path) + if not path.exists(): + raise ValueError(f"Path not found: {path}") + if (path / "metadata.json").exists(): + obj = IntermediateColumnarFormat(path) + # NOTE: this is too strict, we should support more general Zarrs, see #276 + elif (path / ".zmetadata").exists(): + obj = vcz.VcfZarr(path) + else: + raise ValueError(f"{path} not in ICF or VCF Zarr format") + return obj.summary_table() + + +def mkschema( + if_path, + out, + *, + variants_chunk_size=None, + samples_chunk_size=None, + local_alleles=None, +): + store = IntermediateColumnarFormat(if_path) + spec = store.generate_schema( + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, + local_alleles=local_alleles, + ) + out.write(spec.asjson()) + + +def convert( + vcfs, + out_path, + *, + variants_chunk_size=None, + samples_chunk_size=None, + worker_processes=1, + local_alleles=None, + show_progress=False, + icf_path=None, +): + if icf_path is None: + cm = temp_icf_path(prefix="vcf2zarr") + else: + cm = contextlib.nullcontext(icf_path) + + with cm as icf_path: + explode( + icf_path, + vcfs, + worker_processes=worker_processes, + show_progress=show_progress, + ) + encode( + icf_path, + out_path, + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, + worker_processes=worker_processes, + show_progress=show_progress, + local_alleles=local_alleles, + ) + + +@contextlib.contextmanager +def temp_icf_path(prefix=None): + with tempfile.TemporaryDirectory(prefix=prefix) as tmp: + yield pathlib.Path(tmp) / "icf" + + +def encode( + icf_path, + zarr_path, + schema_path=None, + variants_chunk_size=None, + samples_chunk_size=None, + 
max_variant_chunks=None, + dimension_separator=None, + max_memory=None, + local_alleles=None, + worker_processes=1, + show_progress=False, +): + # Rough heuristic to split work up enough to keep utilisation high + target_num_partitions = max(1, worker_processes * 4) + encode_init( + icf_path, + zarr_path, + target_num_partitions, + schema_path=schema_path, + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, + local_alleles=local_alleles, + max_variant_chunks=max_variant_chunks, + dimension_separator=dimension_separator, + ) + vzw = vcz.VcfZarrWriter(IntermediateColumnarFormat, zarr_path) + vzw.encode_all_partitions( + worker_processes=worker_processes, + show_progress=show_progress, + max_memory=max_memory, + ) + vzw.finalise(show_progress) + vzw.create_index() + + +def encode_init( + icf_path, + zarr_path, + target_num_partitions, + *, + schema_path=None, + variants_chunk_size=None, + samples_chunk_size=None, + local_alleles=None, + max_variant_chunks=None, + dimension_separator=None, + max_memory=None, + worker_processes=1, + show_progress=False, +): + icf_store = IntermediateColumnarFormat(icf_path) + if schema_path is None: + schema_instance = icf_store.generate_schema( + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, + local_alleles=local_alleles, + ) + else: + logger.info(f"Reading schema from {schema_path}") + if variants_chunk_size is not None or samples_chunk_size is not None: + raise ValueError( + "Cannot specify schema along with chunk sizes" + ) # NEEDS TEST + with open(schema_path) as f: + schema_instance = vcz.VcfZarrSchema.fromjson(f.read()) + zarr_path = pathlib.Path(zarr_path) + vzw = vcz.VcfZarrWriter(IntermediateColumnarFormat, zarr_path) + return vzw.init( + icf_store, + target_num_partitions=target_num_partitions, + schema=schema_instance, + dimension_separator=dimension_separator, + max_variant_chunks=max_variant_chunks, + ) + + +def encode_partition(zarr_path, partition): + writer_instance = vcz.VcfZarrWriter(IntermediateColumnarFormat, zarr_path) + writer_instance.encode_partition(partition) + + +def encode_finalise(zarr_path, show_progress=False): + writer_instance = vcz.VcfZarrWriter(IntermediateColumnarFormat, zarr_path) + writer_instance.finalise(show_progress=show_progress) diff --git a/bio2zarr/plink.py b/bio2zarr/plink.py index 11e45b2d..6789de92 100644 --- a/bio2zarr/plink.py +++ b/bio2zarr/plink.py @@ -1,177 +1,170 @@ import logging +import pathlib import bed_reader -import humanfriendly -import numcodecs import numpy as np import zarr -from bio2zarr.zarr_utils import ZARR_FORMAT_KWARGS - -from . import core +from bio2zarr import constants, vcz logger = logging.getLogger(__name__) -def encode_genotypes_slice(bed_path, zarr_path, start, stop): - # We need to count the A2 alleles here if we want to keep the - # alleles reported as allele_1, allele_2. It's obvious here what - # the correct approach is, but it is important to note that the - # 0th allele is *not* necessarily the REF for these datasets.
- bed = bed_reader.open_bed(bed_path, num_threads=1, count_A1=False) - root = zarr.open(store=zarr_path, mode="a", **ZARR_FORMAT_KWARGS) - gt = core.BufferedArray(root["call_genotype"], start) - gt_mask = core.BufferedArray(root["call_genotype_mask"], start) - gt_phased = core.BufferedArray(root["call_genotype_phased"], start) - variants_chunk_size = gt.array.chunks[0] - assert start % variants_chunk_size == 0 - - logger.debug(f"Reading slice {start}:{stop}") - chunk_start = start - while chunk_start < stop: - chunk_stop = min(chunk_start + variants_chunk_size, stop) - logger.debug(f"Reading bed slice {chunk_start}:{chunk_stop}") - bed_chunk = bed.read(slice(chunk_start, chunk_stop), dtype=np.int8).T - logger.debug(f"Got bed slice {humanfriendly.format_size(bed_chunk.nbytes)}") - # Probably should do this without iterating over rows, but it's a bit - # simpler and lines up better with the array buffering API. The bottleneck - # is in the encoding anyway. +class PlinkFormat: + def __init__(self, path): + self.path = path + self.bed = bed_reader.open_bed(path, num_threads=1, count_A1=False) + self.num_records = self.bed.sid_count + self.samples = list(self.bed.iid) + self.num_samples = len(self.samples) + self.root_attrs = {} + + def iter_alleles(self, start, stop, num_alleles): + ref_field = self.bed.allele_1 + alt_field = self.bed.allele_2 + + for ref, alt in zip( + ref_field[start:stop], + alt_field[start:stop], + ): + alleles = np.full(num_alleles, constants.STR_FILL, dtype="O") + alleles[0] = ref + alleles[1 : 1 + len(alt)] = alt + yield alleles + + def iter_field(self, field_name, shape, start, stop): + assert field_name == "position" # Only position field is supported from plink + yield from self.bed.bp_position[start:stop] + + def iter_genotypes(self, shape, start, stop): + bed_chunk = self.bed.read(slice(start, stop), dtype=np.int8).T + gt = np.zeros(shape, dtype=np.int8) + phased = np.zeros(shape[:-1], dtype=bool) for values in bed_chunk: - j = gt.next_buffer_row() - g = np.zeros_like(gt.buff[j]) - g[values == -127] = -1 - g[values == 2] = 1 - g[values == 1, 0] = 1 - gt.buff[j] = g - j = gt_phased.next_buffer_row() - gt_phased.buff[j] = False - j = gt_mask.next_buffer_row() - gt_mask.buff[j] = gt.buff[j] == -1 - chunk_start = chunk_stop - gt.flush() - gt_phased.flush() - gt_mask.flush() - logger.debug(f"GT slice {start}:{stop} done") + gt[:] = 0 + gt[values == -127] = -1 + gt[values == 2] = 1 + gt[values == 1, 0] = 1 + + yield gt, phased + + def generate_schema( + self, + variants_chunk_size=None, + samples_chunk_size=None, + ): + n = self.bed.iid_count + m = self.bed.sid_count + logging.info(f"Scanned plink with {n} samples and {m} variants") + + schema_instance = vcz.VcfZarrSchema( + format_version=vcz.ZARR_SCHEMA_FORMAT_VERSION, + samples_chunk_size=samples_chunk_size, + variants_chunk_size=variants_chunk_size, + fields=[], + samples=[vcz.Sample(id=sample) for sample in self.bed.iid], + contigs=[], + filters=[], + ) + + logger.info( + "Generating schema with chunks=" + f"{schema_instance.variants_chunk_size, schema_instance.samples_chunk_size}" + ) + + array_specs = [ + vcz.ZarrArraySpec.new( + vcf_field="position", + name="variant_position", + dtype="i4", + shape=[m], + dimensions=["variants"], + chunks=[schema_instance.variants_chunk_size], + description=None, + ), + vcz.ZarrArraySpec.new( + vcf_field=None, + name="variant_allele", + dtype="O", + shape=[m, 2], + dimensions=["variants", "alleles"], + chunks=[schema_instance.variants_chunk_size, 2], + description=None, + ), + 
vcz.ZarrArraySpec.new( + vcf_field=None, + name="call_genotype_phased", + dtype="bool", + shape=[m, n], + dimensions=["variants", "samples"], + chunks=[ + schema_instance.variants_chunk_size, + schema_instance.samples_chunk_size, + ], + description=None, + ), + vcz.ZarrArraySpec.new( + vcf_field=None, + name="call_genotype", + dtype="i1", + shape=[m, n, 2], + dimensions=["variants", "samples", "ploidy"], + chunks=[ + schema_instance.variants_chunk_size, + schema_instance.samples_chunk_size, + 2, + ], + description=None, + ), + vcz.ZarrArraySpec.new( + vcf_field=None, + name="call_genotype_mask", + dtype="bool", + shape=[m, n, 2], + dimensions=["variants", "samples", "ploidy"], + chunks=[ + schema_instance.variants_chunk_size, + schema_instance.samples_chunk_size, + 2, + ], + description=None, + ), + ] + schema_instance.fields = array_specs + return schema_instance def convert( bed_path, zarr_path, *, - show_progress=False, - worker_processes=1, variants_chunk_size=None, samples_chunk_size=None, + worker_processes=1, + show_progress=False, ): - bed = bed_reader.open_bed(bed_path, num_threads=1) - n = bed.iid_count - m = bed.sid_count - logging.info(f"Scanned plink with {n} samples and {m} variants") - - # FIXME - if samples_chunk_size is None: - samples_chunk_size = 1000 - if variants_chunk_size is None: - variants_chunk_size = 10_000 - - root = zarr.open_group(store=zarr_path, mode="w", **ZARR_FORMAT_KWARGS) - - ploidy = 2 - shape = [m, n] - chunks = [variants_chunk_size, samples_chunk_size] - dimensions = ["variants", "samples"] - - # TODO we should be reusing some logic from vcfzarr here on laying - # out the basic dataset, and using the schema generator. Currently - # we're not using the best Blosc settings for genotypes here. - default_compressor = numcodecs.Blosc(cname="zstd", clevel=7) - - a = root.array( - "sample_id", - data=bed.iid, - shape=bed.iid.shape, - dtype="str", - compressor=default_compressor, - chunks=(samples_chunk_size,), - ) - a.attrs["_ARRAY_DIMENSIONS"] = ["samples"] - logger.debug("Encoded samples") - - # TODO encode these in slices - but read them in one go to avoid - # fetching repeatedly from bim file - a = root.array( - "variant_position", - data=bed.bp_position, - shape=bed.bp_position.shape, - dtype=np.int32, - compressor=default_compressor, - chunks=(variants_chunk_size,), - ) - a.attrs["_ARRAY_DIMENSIONS"] = ["variants"] - logger.debug("encoded variant_position") - - alleles = np.stack([bed.allele_1, bed.allele_2], axis=1) - a = root.array( - "variant_allele", - data=alleles, - shape=alleles.shape, - dtype="str", - compressor=default_compressor, - chunks=(variants_chunk_size, alleles.shape[1]), + plink_format = PlinkFormat(bed_path) + schema_instance = plink_format.generate_schema( + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, ) - a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "alleles"] - logger.debug("encoded variant_allele") - - # TODO remove this? 
- a = root.empty( - name="call_genotype_phased", - dtype="bool", - shape=list(shape), - chunks=list(chunks), - compressor=default_compressor, - **ZARR_FORMAT_KWARGS, + zarr_path = pathlib.Path(zarr_path) + vzw = vcz.VcfZarrWriter(PlinkFormat, zarr_path) + # Rough heuristic to split work up enough to keep utilisation high + target_num_partitions = max(1, worker_processes * 4) + vzw.init( + plink_format, + target_num_partitions=target_num_partitions, + schema=schema_instance, ) - a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions) - - shape += [ploidy] - dimensions += ["ploidy"] - a = root.empty( - name="call_genotype", - dtype="i1", - shape=list(shape), - chunks=list(chunks), - compressor=default_compressor, - **ZARR_FORMAT_KWARGS, - ) - a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions) - - a = root.empty( - name="call_genotype_mask", - dtype="bool", - shape=list(shape), - chunks=list(chunks), - compressor=default_compressor, - **ZARR_FORMAT_KWARGS, - ) - a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions) - - del bed - - num_slices = max(1, worker_processes * 4) - slices = core.chunk_aligned_slices(a, num_slices) - - total_chunks = sum(a.nchunks for _, a in root.arrays()) - - progress_config = core.ProgressConfig( - total=total_chunks, title="Convert", units="chunks", show=show_progress + vzw.encode_all_partitions( + worker_processes=worker_processes, + show_progress=show_progress, ) - with core.ParallelWorkManager(worker_processes, progress_config) as pwm: - for start, stop in slices: - pwm.submit(encode_genotypes_slice, bed_path, zarr_path, start, stop) + vzw.finalise(show_progress) - # TODO also add atomic swap like VCF. Should be abstracted to - # share basic code for setting up the variation dataset zarr - zarr.consolidate_metadata(zarr_path) + # TODO - index code needs variant_contig + # vzw.create_index() # FIXME do this more efficiently - currently reading the whole thing diff --git a/bio2zarr/vcf2zarr/__init__.py b/bio2zarr/vcf2zarr/__init__.py deleted file mode 100644 index 2d31b881..00000000 --- a/bio2zarr/vcf2zarr/__init__.py +++ /dev/null @@ -1,38 +0,0 @@ -from .icf import ( - IntermediateColumnarFormat, - explode, - explode_finalise, - explode_init, - explode_partition, -) -from .vcz import ( - VcfZarrSchema, - convert, - encode, - encode_finalise, - encode_init, - encode_partition, - inspect, - mkschema, -) -from .verification import verify - -# NOTE some of these aren't intended to be part of the external -# interface (like IntermediateColumnarFormat), but putting -# them into the list to keep the lint nagging under control -__all__ = [ - "IntermediateColumnarFormat", - "explode", - "explode_finalise", - "explode_init", - "explode_partition", - "VcfZarrSchema", - "convert", - "encode", - "encode_finalise", - "encode_init", - "encode_partition", - "inspect", - "mkschema", - "verify", -] diff --git a/bio2zarr/vcf2zarr/vcz.py b/bio2zarr/vcz.py similarity index 68% rename from bio2zarr/vcf2zarr/vcz.py rename to bio2zarr/vcz.py index 074e0712..a3c306ca 100644 --- a/bio2zarr/vcf2zarr/vcz.py +++ b/bio2zarr/vcz.py @@ -1,40 +1,19 @@ -import contextlib import dataclasses import json import logging import os -import os.path import pathlib import shutil -import tempfile -import humanfriendly import numcodecs import numpy as np import zarr -from bio2zarr.zarr_utils import ZARR_FORMAT_KWARGS, zarr_v3 - -from .. import constants, core, provenance -from . 
import icf +from bio2zarr import constants, core, provenance, zarr_utils logger = logging.getLogger(__name__) - -def inspect(path): - path = pathlib.Path(path) - if not path.exists(): - raise ValueError(f"Path not found: {path}") - if (path / "metadata.json").exists(): - obj = icf.IntermediateColumnarFormat(path) - # NOTE: this is too strict, we should support more general Zarrs, see #276 - elif (path / ".zmetadata").exists(): - obj = VcfZarr(path) - else: - raise ValueError(f"{path} not in ICF or VCF Zarr format") - return obj.summary_table() - - +ZARR_SCHEMA_FORMAT_VERSION = "0.4" DEFAULT_ZARR_COMPRESSOR = numcodecs.Blosc(cname="zstd", clevel=7) _fixed_field_descriptions = { @@ -181,63 +160,21 @@ def variant_chunk_nbytes(self): return chunk_items * dt.itemsize -ZARR_SCHEMA_FORMAT_VERSION = "0.4" +@dataclasses.dataclass +class Contig: + id: str + length: int = None -def convert_local_allele_field_types(fields): - """ - Update the specified list of fields to include the LAA field, and to convert - any supported localisable fields to the L* counterpart. - - Note that we currently support only two ALT alleles per sample, and so the - dimensions of these fields are fixed by that requirement. Later versions may - use summry data storted in the ICF to make different choices, if information - about subsequent alleles (not in the actual genotype calls) should also be - stored. - """ - fields_by_name = {field.name: field for field in fields} - gt = fields_by_name["call_genotype"] - if gt.shape[-1] != 2: - raise ValueError("Local alleles only supported on diploid data") - - # TODO check if LA is already in here - - shape = gt.shape[:-1] - chunks = gt.chunks[:-1] - dimensions = gt.dimensions[:-1] - - la = ZarrArraySpec.new( - vcf_field=None, - name="call_LA", - dtype="i1", - shape=gt.shape, - chunks=gt.chunks, - dimensions=(*dimensions, "local_alleles"), - description=( - "0-based indices into REF+ALT, indicating which alleles" - " are relevant (local) for the current sample" - ), - ) - ad = fields_by_name.get("call_AD", None) - if ad is not None: - # TODO check if call_LAD is in the list already - ad.name = "call_LAD" - ad.vcf_field = None - ad.shape = (*shape, 2) - ad.chunks = (*chunks, 2) - ad.dimensions = (*dimensions, "local_alleles") - ad.description += " (local-alleles)" - - pl = fields_by_name.get("call_PL", None) - if pl is not None: - # TODO check if call_LPL is in the list already - pl.name = "call_LPL" - pl.vcf_field = None - pl.shape = (*shape, 3) - pl.chunks = (*chunks, 3) - pl.description += " (local-alleles)" - pl.dimensions = (*dimensions, "local_" + pl.dimensions[-1]) - return [*fields, la] +@dataclasses.dataclass +class Sample: + id: str + + +@dataclasses.dataclass +class Filter: + id: str + description: str = "" @dataclasses.dataclass @@ -250,6 +187,28 @@ class VcfZarrSchema(core.JsonDataclass): filters: list fields: list + def __init__( + self, + format_version: str, + samples: list, + contigs: list, + filters: list, + fields: list, + variants_chunk_size: int = None, + samples_chunk_size: int = None, + ): + self.format_version = format_version + self.samples = samples + self.contigs = contigs + self.filters = filters + self.fields = fields + if variants_chunk_size is None: + variants_chunk_size = 1000 + self.variants_chunk_size = variants_chunk_size + if samples_chunk_size is None: + samples_chunk_size = 10_000 + self.samples_chunk_size = samples_chunk_size + def validate(self): """ Checks that the schema is well-formed and within required limits. 
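A minimal sketch (not part of the diff) of the chunk-size defaulting that the new VcfZarrSchema constructor above provides, assuming the defaults shown in this hunk (1000 variants, 10_000 samples); the empty samples/contigs/filters/fields lists are placeholder values for illustration only:

from bio2zarr import vcz

# Chunk sizes fall back to the defaults baked into __init__ when not given.
schema = vcz.VcfZarrSchema(
    format_version=vcz.ZARR_SCHEMA_FORMAT_VERSION,
    samples=[],  # placeholder; real schemas carry vcz.Sample entries
    contigs=[],
    filters=[],
    fields=[],
)
assert schema.variants_chunk_size == 1000
assert schema.samples_chunk_size == 10_000
# Schemas round-trip through JSON, which is how encode_init reads a schema file.
assert vcz.VcfZarrSchema.fromjson(schema.asjson()).samples_chunk_size == 10_000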
@@ -279,9 +238,9 @@ def fromdict(d): f"{d['format_version']} != {ZARR_SCHEMA_FORMAT_VERSION}" ) ret = VcfZarrSchema(**d) - ret.samples = [icf.Sample(**sd) for sd in d["samples"]] - ret.contigs = [icf.Contig(**sd) for sd in d["contigs"]] - ret.filters = [icf.Filter(**sd) for sd in d["filters"]] + ret.samples = [Sample(**sd) for sd in d["samples"]] + ret.contigs = [Contig(**sd) for sd in d["contigs"]] + ret.filters = [Filter(**sd) for sd in d["filters"]] ret.fields = [ZarrArraySpec(**sd) for sd in d["fields"]] return ret @@ -289,242 +248,17 @@ def fromdict(d): def fromjson(s): return VcfZarrSchema.fromdict(json.loads(s)) - @staticmethod - def generate( - icf, variants_chunk_size=None, samples_chunk_size=None, local_alleles=None - ): - m = icf.num_records - n = icf.num_samples - if samples_chunk_size is None: - samples_chunk_size = 10_000 - if variants_chunk_size is None: - variants_chunk_size = 1000 - if local_alleles is None: - local_alleles = False - logger.info( - f"Generating schema with chunks={variants_chunk_size, samples_chunk_size}" - ) - - def spec_from_field(field, array_name=None): - return ZarrArraySpec.from_field( - field, - num_samples=n, - num_variants=m, - samples_chunk_size=samples_chunk_size, - variants_chunk_size=variants_chunk_size, - array_name=array_name, - ) - - def fixed_field_spec( - name, - dtype, - vcf_field=None, - shape=(m,), - dimensions=("variants",), - chunks=None, - ): - return ZarrArraySpec.new( - vcf_field=vcf_field, - name=name, - dtype=dtype, - shape=shape, - description="", - dimensions=dimensions, - chunks=chunks or [variants_chunk_size], - ) - - alt_field = icf.fields["ALT"] - max_alleles = alt_field.vcf_field.summary.max_number + 1 - - array_specs = [ - fixed_field_spec( - name="variant_contig", - dtype=core.min_int_dtype(0, icf.metadata.num_contigs), - ), - fixed_field_spec( - name="variant_filter", - dtype="bool", - shape=(m, icf.metadata.num_filters), - dimensions=["variants", "filters"], - chunks=(variants_chunk_size, icf.metadata.num_filters), - ), - fixed_field_spec( - name="variant_allele", - dtype="O", - shape=(m, max_alleles), - dimensions=["variants", "alleles"], - chunks=(variants_chunk_size, max_alleles), - ), - fixed_field_spec( - name="variant_id", - dtype="O", - ), - fixed_field_spec( - name="variant_id_mask", - dtype="bool", - ), - ] - name_map = {field.full_name: field for field in icf.metadata.fields} - - # Only three of the fixed fields have a direct one-to-one mapping. 
- array_specs.extend( - [ - spec_from_field(name_map["QUAL"], array_name="variant_quality"), - spec_from_field(name_map["POS"], array_name="variant_position"), - spec_from_field(name_map["rlen"], array_name="variant_length"), - ] - ) - array_specs.extend( - [spec_from_field(field) for field in icf.metadata.info_fields] - ) - - gt_field = None - for field in icf.metadata.format_fields: - if field.name == "GT": - gt_field = field - continue - array_specs.append(spec_from_field(field)) - - if gt_field is not None and n > 0: - ploidy = max(gt_field.summary.max_number - 1, 1) - shape = [m, n] - chunks = [variants_chunk_size, samples_chunk_size] - dimensions = ["variants", "samples"] - array_specs.append( - ZarrArraySpec.new( - vcf_field=None, - name="call_genotype_phased", - dtype="bool", - shape=list(shape), - chunks=list(chunks), - dimensions=list(dimensions), - description="", - ) - ) - shape += [ploidy] - chunks += [ploidy] - dimensions += ["ploidy"] - array_specs.append( - ZarrArraySpec.new( - vcf_field=None, - name="call_genotype", - dtype=gt_field.smallest_dtype(), - shape=list(shape), - chunks=list(chunks), - dimensions=list(dimensions), - description="", - ) - ) - array_specs.append( - ZarrArraySpec.new( - vcf_field=None, - name="call_genotype_mask", - dtype="bool", - shape=list(shape), - chunks=list(chunks), - dimensions=list(dimensions), - description="", - ) - ) - - if local_alleles: - array_specs = convert_local_allele_field_types(array_specs) - - return VcfZarrSchema( - format_version=ZARR_SCHEMA_FORMAT_VERSION, - samples_chunk_size=samples_chunk_size, - variants_chunk_size=variants_chunk_size, - fields=array_specs, - samples=icf.metadata.samples, - contigs=icf.metadata.contigs, - filters=icf.metadata.filters, - ) - - -class VcfZarr: - def __init__(self, path): - if not (path / ".zmetadata").exists(): - raise ValueError("Not in VcfZarr format") # NEEDS TEST - self.path = path - self.root = zarr.open(path, mode="r") - - def summary_table(self): - data = [] - arrays = [(core.du(self.path / a.basename), a) for _, a in self.root.arrays()] - arrays.sort(key=lambda x: x[0]) - for stored, array in reversed(arrays): - d = { - "name": array.name, - "dtype": str(array.dtype), - "stored": core.display_size(stored), - "size": core.display_size(array.nbytes), - "ratio": core.display_number(array.nbytes / stored), - "nchunks": str(array.nchunks), - "chunk_size": core.display_size(array.nbytes / array.nchunks), - "avg_chunk_stored": core.display_size(int(stored / array.nchunks)), - "shape": str(array.shape), - "chunk_shape": str(array.chunks), - "compressor": str(array.compressor), - "filters": str(array.filters), - } - data.append(d) - return data - - -def parse_max_memory(max_memory): - if max_memory is None: - # Effectively unbounded - return 2**63 - if isinstance(max_memory, str): - max_memory = humanfriendly.parse_size(max_memory) - logger.info(f"Set memory budget to {core.display_size(max_memory)}") - return max_memory - - -@dataclasses.dataclass -class VcfZarrPartition: - start: int - stop: int - @staticmethod - def generate_partitions(num_records, chunk_size, num_partitions, max_chunks=None): - num_chunks = int(np.ceil(num_records / chunk_size)) - if max_chunks is not None: - num_chunks = min(num_chunks, max_chunks) - partitions = [] - splits = np.array_split(np.arange(num_chunks), min(num_partitions, num_chunks)) - for chunk_slice in splits: - start_chunk = int(chunk_slice[0]) - stop_chunk = int(chunk_slice[-1]) + 1 - start_index = start_chunk * chunk_size - stop_index = min(stop_chunk 
* chunk_size, num_records) - partitions.append(VcfZarrPartition(start_index, stop_index)) - return partitions - - -VZW_METADATA_FORMAT_VERSION = "0.1" - - -@dataclasses.dataclass -class VcfZarrWriterMetadata(core.JsonDataclass): - format_version: str - icf_path: str - schema: VcfZarrSchema - dimension_separator: str - partitions: list - provenance: dict - - @staticmethod - def fromdict(d): - if d["format_version"] != VZW_METADATA_FORMAT_VERSION: - raise ValueError( - "VcfZarrWriter format version mismatch: " - f"{d['format_version']} != {VZW_METADATA_FORMAT_VERSION}" - ) - ret = VcfZarrWriterMetadata(**d) - ret.schema = VcfZarrSchema.fromdict(ret.schema) - ret.partitions = [VcfZarrPartition(**p) for p in ret.partitions] - return ret +def sanitise_int_array(value, ndmin, dtype): + if isinstance(value, tuple): + value = [ + constants.VCF_INT_MISSING if x is None else x for x in value + ] # NEEDS TEST + value = np.array(value, ndmin=ndmin, copy=True) + value[value == constants.VCF_INT_MISSING] = -1 + value[value == constants.VCF_INT_FILL] = -2 + # TODO watch out for clipping here! + return value.astype(dtype) def compute_la_field(genotypes): @@ -597,14 +331,60 @@ class LocalisableFieldDescriptor: localisable_fields = [ LocalisableFieldDescriptor( - "call_LAD", "FORMAT/AD", icf.sanitise_int_array, compute_lad_field + "call_LAD", "FORMAT/AD", sanitise_int_array, compute_lad_field ), LocalisableFieldDescriptor( - "call_LPL", "FORMAT/PL", icf.sanitise_int_array, compute_lpl_field + "call_LPL", "FORMAT/PL", sanitise_int_array, compute_lpl_field ), ] +@dataclasses.dataclass +class VcfZarrPartition: + start: int + stop: int + + @staticmethod + def generate_partitions(num_records, chunk_size, num_partitions, max_chunks=None): + num_chunks = int(np.ceil(num_records / chunk_size)) + if max_chunks is not None: + num_chunks = min(num_chunks, max_chunks) + partitions = [] + splits = np.array_split(np.arange(num_chunks), min(num_partitions, num_chunks)) + for chunk_slice in splits: + start_chunk = int(chunk_slice[0]) + stop_chunk = int(chunk_slice[-1]) + 1 + start_index = start_chunk * chunk_size + stop_index = min(stop_chunk * chunk_size, num_records) + partitions.append(VcfZarrPartition(start_index, stop_index)) + return partitions + + +VZW_METADATA_FORMAT_VERSION = "0.1" + + +@dataclasses.dataclass +class VcfZarrWriterMetadata(core.JsonDataclass): + format_version: str + source_path: str + schema: VcfZarrSchema + dimension_separator: str + partitions: list + provenance: dict + + @staticmethod + def fromdict(d): + if d["format_version"] != VZW_METADATA_FORMAT_VERSION: + raise ValueError( + "VcfZarrWriter format version mismatch: " + f"{d['format_version']} != {VZW_METADATA_FORMAT_VERSION}" + ) + ret = VcfZarrWriterMetadata(**d) + ret.schema = VcfZarrSchema.fromdict(ret.schema) + ret.partitions = [VcfZarrPartition(**p) for p in ret.partitions] + return ret + + @dataclasses.dataclass class VcfZarrWriteSummary(core.JsonDataclass): num_partitions: int @@ -615,13 +395,14 @@ class VcfZarrWriteSummary(core.JsonDataclass): class VcfZarrWriter: - def __init__(self, path): + def __init__(self, source_type, path): + self.source_type = source_type self.path = pathlib.Path(path) self.wip_path = self.path / "wip" self.arrays_path = self.wip_path / "arrays" self.partitions_path = self.wip_path / "partitions" self.metadata = None - self.icf = None + self.source = None @property def schema(self): @@ -649,19 +430,19 @@ def has_local_alleles(self): def init( self, - icf, + source, *, target_num_partitions, schema, 
dimension_separator=None, max_variant_chunks=None, ): - self.icf = icf + self.source = source if self.path.exists(): raise ValueError("Zarr path already exists") # NEEDS TEST schema.validate() partitions = VcfZarrPartition.generate_partitions( - self.icf.num_records, + self.source.num_records, schema.variants_chunk_size, target_num_partitions, max_chunks=max_variant_chunks, @@ -673,7 +454,7 @@ def init( ) self.metadata = VcfZarrWriterMetadata( format_version=VZW_METADATA_FORMAT_VERSION, - icf_path=str(self.icf.path), + source_path=str(self.source.path), schema=schema, dimension_separator=dimension_separator, partitions=partitions, @@ -682,15 +463,16 @@ def init( ) self.path.mkdir() - root = zarr.open(store=self.path, mode="a", **ZARR_FORMAT_KWARGS) + root = zarr.open(store=self.path, mode="a", **zarr_utils.ZARR_FORMAT_KWARGS) root.attrs.update( { "vcf_zarr_version": "0.2", - "vcf_header": self.icf.vcf_header, "source": f"bio2zarr-{provenance.__version__}", } ) - # Doing this syncronously - this is fine surely + root.attrs.update(self.source.root_attrs) + + # Doing this synchronously - this is fine surely self.encode_samples(root) self.encode_filter_id(root) self.encode_contig_id(root) @@ -698,7 +480,9 @@ def init( self.wip_path.mkdir() self.arrays_path.mkdir() self.partitions_path.mkdir() - root = zarr.open(store=self.arrays_path, mode="a", **ZARR_FORMAT_KWARGS) + root = zarr.open( + store=self.arrays_path, mode="a", **zarr_utils.ZARR_FORMAT_KWARGS + ) total_chunks = 0 for field in self.schema.fields: @@ -710,15 +494,15 @@ def init( json.dump(self.metadata.asdict(), f, indent=4) return VcfZarrWriteSummary( - num_variants=self.icf.num_records, - num_samples=self.icf.num_samples, + num_variants=self.source.num_records, + num_samples=self.source.num_samples, num_partitions=self.num_partitions, num_chunks=total_chunks, max_encoding_memory=core.display_size(self.get_max_encoding_memory()), ) def encode_samples(self, root): - if self.schema.samples != self.icf.metadata.samples: + if [s.id for s in self.schema.samples] != self.source.samples: raise ValueError("Subsetting or reordering samples not supported currently") array = root.array( "sample_id", @@ -763,15 +547,15 @@ def encode_filter_id(self, root): array.attrs["_ARRAY_DIMENSIONS"] = ["filters"] def init_array(self, root, array_spec, variants_dim_size): - kwargs = dict(ZARR_FORMAT_KWARGS) + kwargs = dict(zarr_utils.ZARR_FORMAT_KWARGS) filters = [numcodecs.get_codec(filt) for filt in array_spec.filters] if array_spec.dtype == "O": - if zarr_v3(): + if zarr_utils.zarr_v3(): filters = [*list(filters), numcodecs.VLenUTF8()] else: kwargs["object_codec"] = numcodecs.VLenUTF8() - if not zarr_v3(): + if not zarr_utils.zarr_v3(): kwargs["dimension_separator"] = self.metadata.dimension_separator shape = list(array_spec.shape) @@ -804,7 +588,7 @@ def load_metadata(self): if self.metadata is None: with open(self.wip_path / "metadata.json") as f: self.metadata = VcfZarrWriterMetadata.fromdict(json.load(f)) - self.icf = icf.IntermediateColumnarFormat(self.metadata.icf_path) + self.source = self.source_type(self.metadata.source_path) def partition_path(self, partition_index): return self.partitions_path / f"p{partition_index}" @@ -826,9 +610,13 @@ def encode_partition(self, partition_index): partition_path.mkdir(exist_ok=True) logger.info(f"Encoding partition {partition_index} to {partition_path}") - self.encode_id_partition(partition_index) - self.encode_filters_partition(partition_index) - self.encode_contig_partition(partition_index) + all_field_names 
= [field.name for field in self.schema.fields] + if "variant_id" in all_field_names: + self.encode_id_partition(partition_index) + if "variant_filter" in all_field_names: + self.encode_filters_partition(partition_index) + if "variant_contig" in all_field_names: + self.encode_contig_partition(partition_index) self.encode_alleles_partition(partition_index) for array_spec in self.schema.fields: if array_spec.vcf_field is not None: @@ -874,14 +662,15 @@ def finalise_partition_array(self, partition_index, buffered_array): def encode_array_partition(self, array_spec, partition_index): partition = self.metadata.partitions[partition_index] ba = self.init_partition_array(partition_index, array_spec.name) - source_field = self.icf.fields[array_spec.vcf_field] - sanitiser = source_field.sanitiser_factory(ba.buff.shape) - - for value in source_field.iter_values(partition.start, partition.stop): - # We write directly into the buffer in the sanitiser function - # to make it easier to reason about dimension padding + for value in self.source.iter_field( + array_spec.vcf_field, + ba.buff.shape[1:], + partition.start, + partition.stop, + ): j = ba.next_buffer_row() - sanitiser(ba.buff, j, value) + ba.buff[j] = value + self.finalise_partition_array(partition_index, ba) def encode_genotypes_partition(self, partition_index): @@ -889,16 +678,14 @@ def encode_genotypes_partition(self, partition_index): gt = self.init_partition_array(partition_index, "call_genotype") gt_phased = self.init_partition_array(partition_index, "call_genotype_phased") - source_field = self.icf.fields["FORMAT/GT"] - for value in source_field.iter_values(partition.start, partition.stop): + for genotype, phased in self.source.iter_genotypes( + gt.buff.shape[1:], partition.start, partition.stop + ): j = gt.next_buffer_row() - icf.sanitise_value_int_2d( - gt.buff, j, value[:, :-1] if value is not None else None - ) - j = gt_phased.next_buffer_row() - icf.sanitise_value_int_1d( - gt_phased.buff, j, value[:, -1] if value is not None else None - ) + gt.buff[j] = genotype + + j_phased = gt_phased.next_buffer_row() + gt_phased.buff[j_phased] = phased self.finalise_partition_array(partition_index, gt) self.finalise_partition_array(partition_index, gt_phased) @@ -951,7 +738,7 @@ def encode_local_allele_fields_partition(self, partition_index): assert field_map[descriptor.array_name].vcf_field is None buff = self.init_partition_array(partition_index, descriptor.array_name) - source = self.icf.fields[descriptor.vcf_field].iter_values( + source = self.source.fields[descriptor.vcf_field].iter_values( partition.start, partition.stop ) for la in core.first_dim_slice_iter( @@ -966,31 +753,26 @@ def encode_local_allele_fields_partition(self, partition_index): def encode_alleles_partition(self, partition_index): alleles = self.init_partition_array(partition_index, "variant_allele") partition = self.metadata.partitions[partition_index] - ref_field = self.icf.fields["REF"] - alt_field = self.icf.fields["ALT"] - for ref, alt in zip( - ref_field.iter_values(partition.start, partition.stop), - alt_field.iter_values(partition.start, partition.stop), + for value in self.source.iter_alleles( + partition.start, partition.stop, alleles.array.shape[1] ): j = alleles.next_buffer_row() - alleles.buff[j, :] = constants.STR_FILL - alleles.buff[j, 0] = ref[0] - alleles.buff[j, 1 : 1 + len(alt)] = alt + alleles.buff[j] = value + self.finalise_partition_array(partition_index, alleles) def encode_id_partition(self, partition_index): vid = 
self.init_partition_array(partition_index, "variant_id") vid_mask = self.init_partition_array(partition_index, "variant_id_mask") partition = self.metadata.partitions[partition_index] - field = self.icf.fields["ID"] - for value in field.iter_values(partition.start, partition.stop): + for value in self.source.iter_id(partition.start, partition.stop): j = vid.next_buffer_row() k = vid_mask.next_buffer_row() assert j == k if value is not None: - vid.buff[j] = value[0] + vid.buff[j] = value vid_mask.buff[j] = False else: vid.buff[j] = constants.STR_MISSING @@ -1000,37 +782,22 @@ def encode_id_partition(self, partition_index): self.finalise_partition_array(partition_index, vid_mask) def encode_filters_partition(self, partition_index): - lookup = {filt.id: index for index, filt in enumerate(self.schema.filters)} var_filter = self.init_partition_array(partition_index, "variant_filter") partition = self.metadata.partitions[partition_index] - field = self.icf.fields["FILTERS"] - for value in field.iter_values(partition.start, partition.stop): + for filter_values in self.source.iter_filters(partition.start, partition.stop): j = var_filter.next_buffer_row() - var_filter.buff[j] = False - for f in value: - try: - var_filter.buff[j, lookup[f]] = True - except KeyError: - raise ValueError( - f"Filter '{f}' was not defined in the header." - ) from None + var_filter.buff[j] = filter_values self.finalise_partition_array(partition_index, var_filter) def encode_contig_partition(self, partition_index): - lookup = {contig.id: index for index, contig in enumerate(self.schema.contigs)} contig = self.init_partition_array(partition_index, "variant_contig") partition = self.metadata.partitions[partition_index] - field = self.icf.fields["CHROM"] - for value in field.iter_values(partition.start, partition.stop): + for contig_index in self.source.iter_contig(partition.start, partition.stop): j = contig.next_buffer_row() - # Note: because we are using the indexes to define the lookups - # and we always have an index, it seems that we the contig lookup - # will always succeed. However, if anyone ever does hit a KeyError - # here, please do open an issue with a reproducible example! 
- contig.buff[j] = lookup[value[0]] + contig.buff[j] = contig_index self.finalise_partition_array(partition_index, contig) @@ -1109,60 +876,8 @@ def finalise(self, show_progress=False): def create_index(self): """Create an index to support efficient region queries.""" - root = zarr.open_group(store=self.path, mode="r+") - - contig = root["variant_contig"] - pos = root["variant_position"] - length = root["variant_length"] - - assert contig.cdata_shape == pos.cdata_shape - - index = [] - - logger.info("Creating region index") - for v_chunk in range(pos.cdata_shape[0]): - c = contig.blocks[v_chunk] - p = pos.blocks[v_chunk] - e = p + length.blocks[v_chunk] - 1 - - # create a row for each contig in the chunk - d = np.diff(c, append=-1) - c_start_idx = 0 - for c_end_idx in np.nonzero(d)[0]: - assert c[c_start_idx] == c[c_end_idx] - index.append( - ( - v_chunk, # chunk index - c[c_start_idx], # contig ID - p[c_start_idx], # start - p[c_end_idx], # end - np.max(e[c_start_idx : c_end_idx + 1]), # max end - c_end_idx - c_start_idx + 1, # num records - ) - ) - c_start_idx = c_end_idx + 1 - - index = np.array(index, dtype=pos.dtype) - kwargs = {} - if not zarr_v3(): - kwargs["dimension_separator"] = self.metadata.dimension_separator - array = root.array( - "region_index", - data=index, - shape=index.shape, - chunks=index.shape, - dtype=index.dtype, - compressor=numcodecs.Blosc("zstd", clevel=9, shuffle=0), - fill_value=None, - **kwargs, - ) - array.attrs["_ARRAY_DIMENSIONS"] = [ - "region_index_values", - "region_index_fields", - ] - - logger.info("Consolidating Zarr metadata") - zarr.consolidate_metadata(self.path) + indexer = VcfZarrIndexer(self.path) + indexer.create_index() ###################### # encode_all_partitions @@ -1187,7 +902,7 @@ def get_max_encoding_memory(self): def encode_all_partitions( self, *, worker_processes=1, show_progress=False, max_memory=None ): - max_memory = parse_max_memory(max_memory) + max_memory = core.parse_max_memory(max_memory) self.load_metadata() num_partitions = self.num_partitions per_worker_memory = self.get_max_encoding_memory() @@ -1229,147 +944,105 @@ def encode_all_partitions( pwm.submit(self.encode_partition, partition_index) -def mkschema( - if_path, - out, - *, - variants_chunk_size=None, - samples_chunk_size=None, - local_alleles=None, -): - store = icf.IntermediateColumnarFormat(if_path) - spec = VcfZarrSchema.generate( - store, - variants_chunk_size=variants_chunk_size, - samples_chunk_size=samples_chunk_size, - local_alleles=local_alleles, - ) - out.write(spec.asjson()) - - -def encode( - if_path, - zarr_path, - schema_path=None, - variants_chunk_size=None, - samples_chunk_size=None, - max_variant_chunks=None, - dimension_separator=None, - max_memory=None, - local_alleles=None, - worker_processes=1, - show_progress=False, -): - # Rough heuristic to split work up enough to keep utilisation high - target_num_partitions = max(1, worker_processes * 4) - encode_init( - if_path, - zarr_path, - target_num_partitions, - schema_path=schema_path, - variants_chunk_size=variants_chunk_size, - samples_chunk_size=samples_chunk_size, - local_alleles=local_alleles, - max_variant_chunks=max_variant_chunks, - dimension_separator=dimension_separator, - ) - vzw = VcfZarrWriter(zarr_path) - vzw.encode_all_partitions( - worker_processes=worker_processes, - show_progress=show_progress, - max_memory=max_memory, - ) - vzw.finalise(show_progress) - vzw.create_index() - - -def encode_init( - icf_path, - zarr_path, - target_num_partitions, - *, - schema_path=None, - 
variants_chunk_size=None, - samples_chunk_size=None, - local_alleles=None, - max_variant_chunks=None, - dimension_separator=None, - max_memory=None, - worker_processes=1, - show_progress=False, -): - icf_store = icf.IntermediateColumnarFormat(icf_path) - if schema_path is None: - schema = VcfZarrSchema.generate( - icf_store, - variants_chunk_size=variants_chunk_size, - samples_chunk_size=samples_chunk_size, - local_alleles=local_alleles, - ) - else: - logger.info(f"Reading schema from {schema_path}") - if variants_chunk_size is not None or samples_chunk_size is not None: - raise ValueError( - "Cannot specify schema along with chunk sizes" - ) # NEEDS TEST - with open(schema_path) as f: - schema = VcfZarrSchema.fromjson(f.read()) - zarr_path = pathlib.Path(zarr_path) - vzw = VcfZarrWriter(zarr_path) - return vzw.init( - icf_store, - target_num_partitions=target_num_partitions, - schema=schema, - dimension_separator=dimension_separator, - max_variant_chunks=max_variant_chunks, - ) - - -def encode_partition(zarr_path, partition): - writer = VcfZarrWriter(zarr_path) - writer.encode_partition(partition) - - -def encode_finalise(zarr_path, show_progress=False): - writer = VcfZarrWriter(zarr_path) - writer.finalise(show_progress=show_progress) - - -def convert( - vcfs, - out_path, - *, - variants_chunk_size=None, - samples_chunk_size=None, - worker_processes=1, - local_alleles=None, - show_progress=False, - icf_path=None, -): - if icf_path is None: - cm = temp_icf_path(prefix="vcf2zarr") - else: - cm = contextlib.nullcontext(icf_path) - - with cm as icf_path: - icf.explode( - icf_path, - vcfs, - worker_processes=worker_processes, - show_progress=show_progress, - ) - encode( - icf_path, - out_path, - variants_chunk_size=variants_chunk_size, - samples_chunk_size=samples_chunk_size, - worker_processes=worker_processes, - show_progress=show_progress, - local_alleles=local_alleles, - ) +class VcfZarr: + def __init__(self, path): + if not (path / ".zmetadata").exists(): + raise ValueError("Not in VcfZarr format") # NEEDS TEST + self.path = path + self.root = zarr.open(path, mode="r") + + def summary_table(self): + data = [] + arrays = [(core.du(self.path / a.basename), a) for _, a in self.root.arrays()] + arrays.sort(key=lambda x: x[0]) + for stored, array in reversed(arrays): + d = { + "name": array.name, + "dtype": str(array.dtype), + "stored": core.display_size(stored), + "size": core.display_size(array.nbytes), + "ratio": core.display_number(array.nbytes / stored), + "nchunks": str(array.nchunks), + "chunk_size": core.display_size(array.nbytes / array.nchunks), + "avg_chunk_stored": core.display_size(int(stored / array.nchunks)), + "shape": str(array.shape), + "chunk_shape": str(array.chunks), + "compressor": str(array.compressor), + "filters": str(array.filters), + } + data.append(d) + return data + + +class VcfZarrIndexer: + """ + Creates an index for efficient region queries in a VCF Zarr dataset. 
+ """ + + def __init__(self, path): + self.path = pathlib.Path(path) + + def create_index(self): + """Create an index to support efficient region queries.""" + root = zarr.open_group(store=self.path, mode="r+") + + if ( + "variant_contig" not in root + or "variant_position" not in root + or "variant_length" not in root + ): + logger.warning("Cannot create index: required arrays not found") + return + + contig = root["variant_contig"] + pos = root["variant_position"] + length = root["variant_length"] + + assert contig.cdata_shape == pos.cdata_shape + + index = [] + logger.info("Creating region index") + for v_chunk in range(pos.cdata_shape[0]): + c = contig.blocks[v_chunk] + p = pos.blocks[v_chunk] + e = p + length.blocks[v_chunk] - 1 -@contextlib.contextmanager -def temp_icf_path(prefix=None): - with tempfile.TemporaryDirectory(prefix=prefix) as tmp: - yield pathlib.Path(tmp) / "icf" + # create a row for each contig in the chunk + d = np.diff(c, append=-1) + c_start_idx = 0 + for c_end_idx in np.nonzero(d)[0]: + assert c[c_start_idx] == c[c_end_idx] + index.append( + ( + v_chunk, # chunk index + c[c_start_idx], # contig ID + p[c_start_idx], # start + p[c_end_idx], # end + np.max(e[c_start_idx : c_end_idx + 1]), # max end + c_end_idx - c_start_idx + 1, # num records + ) + ) + c_start_idx = c_end_idx + 1 + + index = np.array(index, dtype=pos.dtype) + kwargs = {} + if not zarr_utils.zarr_v3(): + kwargs["dimension_separator"] = "/" + array = root.array( + "region_index", + data=index, + shape=index.shape, + chunks=index.shape, + dtype=index.dtype, + compressor=numcodecs.Blosc("zstd", clevel=9, shuffle=0), + fill_value=None, + **kwargs, + ) + array.attrs["_ARRAY_DIMENSIONS"] = [ + "region_index_values", + "region_index_fields", + ] + + logger.info("Consolidating Zarr metadata") + zarr.consolidate_metadata(self.path) diff --git a/bio2zarr/vcf2zarr/verification.py b/bio2zarr/vcz_verification.py similarity index 99% rename from bio2zarr/vcf2zarr/verification.py rename to bio2zarr/vcz_verification.py index d7a5291e..6faf356d 100644 --- a/bio2zarr/vcf2zarr/verification.py +++ b/bio2zarr/vcz_verification.py @@ -6,7 +6,7 @@ from bio2zarr.zarr_utils import first_dim_iter -from .. import constants +from . import constants def assert_all_missing_float(a): diff --git a/pyproject.toml b/pyproject.toml index b0a75207..2dc08a93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,7 @@ dev = [ ] [tool.setuptools] -packages = ["bio2zarr", "bio2zarr.vcf2zarr"] +packages = ["bio2zarr"] [tool.setuptools_scm] version_file = "bio2zarr/_version.py" @@ -84,9 +84,9 @@ line-length = 88 indent-width = 4 [tool.ruff.lint] -select = ["E", "F", "B", "W", "I", "N", "UP", "A", "RUF", "PT"] +select = ["E", "F", "B", "W", "I", "N", "UP", "A", "PT"] #Allow uppercase names for e.g. 
call_AD -ignore = ["N806", "N802", "A001", "A002"] +ignore = ["N806", "N802", "A001", "A002", "RUF"] fixable = ["ALL"] unfixable = [] diff --git a/tests/test_cli.py b/tests/test_cli.py index 2253dfaf..c56b2422 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -84,7 +84,7 @@ class TestWithMocks: vcf_path = "tests/data/vcf/sample.vcf.gz" @pytest.mark.parametrize(("progress", "flag"), [(True, "-P"), (False, "-Q")]) - @mock.patch("bio2zarr.vcf2zarr.explode") + @mock.patch("bio2zarr.icf.explode") def test_vcf_explode(self, mocked, tmp_path, progress, flag): icf_path = tmp_path / "icf" runner = ct.CliRunner(mix_stderr=False) @@ -101,7 +101,7 @@ def test_vcf_explode(self, mocked, tmp_path, progress, flag): mocked.assert_called_once_with(str(icf_path), (self.vcf_path,), **args) @pytest.mark.parametrize("compressor", ["lz4", "zstd"]) - @mock.patch("bio2zarr.vcf2zarr.explode") + @mock.patch("bio2zarr.icf.explode") def test_vcf_explode_compressor(self, mocked, tmp_path, compressor): icf_path = tmp_path / "icf" runner = ct.CliRunner(mix_stderr=False) @@ -124,7 +124,7 @@ def test_vcf_explode_compressor(self, mocked, tmp_path, compressor): ) @pytest.mark.parametrize("compressor", ["lz4", "zstd"]) - @mock.patch("bio2zarr.vcf2zarr.explode_init") + @mock.patch("bio2zarr.icf.explode_init") def test_vcf_dexplode_init_compressor(self, mocked, tmp_path, compressor): icf_path = tmp_path / "icf" runner = ct.CliRunner(mix_stderr=False) @@ -148,7 +148,7 @@ def test_vcf_dexplode_init_compressor(self, mocked, tmp_path, compressor): ) @pytest.mark.parametrize("compressor", ["LZ4", "asdf"]) - @mock.patch("bio2zarr.vcf2zarr.explode") + @mock.patch("bio2zarr.icf.explode") def test_vcf_explode_bad_compressor(self, mocked, tmp_path, compressor): runner = ct.CliRunner(mix_stderr=False) icf_path = tmp_path / "icf" @@ -161,7 +161,7 @@ def test_vcf_explode_bad_compressor(self, mocked, tmp_path, compressor): assert "Invalid value for '-C'" in result.stderr mocked.assert_not_called() - @mock.patch("bio2zarr.vcf2zarr.explode") + @mock.patch("bio2zarr.icf.explode") def test_vcf_explode_multiple_vcfs(self, mocked, tmp_path): icf_path = tmp_path / "icf" runner = ct.CliRunner(mix_stderr=False) @@ -178,7 +178,7 @@ def test_vcf_explode_multiple_vcfs(self, mocked, tmp_path): ) @pytest.mark.parametrize("response", ["y", "Y", "yes"]) - @mock.patch("bio2zarr.vcf2zarr.explode") + @mock.patch("bio2zarr.icf.explode") def test_vcf_explode_overwrite_icf_confirm_yes(self, mocked, tmp_path, response): icf_path = tmp_path / "icf" icf_path.mkdir() @@ -197,7 +197,7 @@ def test_vcf_explode_overwrite_icf_confirm_yes(self, mocked, tmp_path, response) ) @pytest.mark.parametrize("response", ["y", "Y", "yes"]) - @mock.patch("bio2zarr.vcf2zarr.encode") + @mock.patch("bio2zarr.icf.encode") def test_vcf_encode_overwrite_zarr_confirm_yes(self, mocked, tmp_path, response): icf_path = tmp_path / "icf" icf_path.mkdir() @@ -218,7 +218,7 @@ def test_vcf_encode_overwrite_zarr_confirm_yes(self, mocked, tmp_path, response) ) @pytest.mark.parametrize("force_arg", ["-f", "--force"]) - @mock.patch("bio2zarr.vcf2zarr.explode") + @mock.patch("bio2zarr.icf.explode") def test_vcf_explode_overwrite_icf_force(self, mocked, tmp_path, force_arg): icf_path = tmp_path / "icf" icf_path.mkdir() @@ -236,7 +236,7 @@ def test_vcf_explode_overwrite_icf_force(self, mocked, tmp_path, force_arg): ) @pytest.mark.parametrize("force_arg", ["-f", "--force"]) - @mock.patch("bio2zarr.vcf2zarr.encode") + @mock.patch("bio2zarr.icf.encode") def test_vcf_encode_overwrite_icf_force(self, 
mocked, tmp_path, force_arg): icf_path = tmp_path / "icf" icf_path.mkdir() @@ -257,7 +257,7 @@ def test_vcf_encode_overwrite_icf_force(self, mocked, tmp_path, force_arg): **DEFAULT_ENCODE_ARGS, ) - @mock.patch("bio2zarr.vcf2zarr.explode") + @mock.patch("bio2zarr.icf.explode") def test_vcf_explode_missing_vcf(self, mocked, tmp_path): icf_path = tmp_path / "icf" runner = ct.CliRunner(mix_stderr=False) @@ -272,7 +272,7 @@ def test_vcf_explode_missing_vcf(self, mocked, tmp_path): mocked.assert_not_called() @pytest.mark.parametrize("response", ["n", "N", "No"]) - @mock.patch("bio2zarr.vcf2zarr.explode") + @mock.patch("bio2zarr.icf.explode") def test_vcf_explode_overwrite_icf_confirm_no(self, mocked, tmp_path, response): icf_path = tmp_path / "icf" icf_path.mkdir() @@ -287,7 +287,7 @@ def test_vcf_explode_overwrite_icf_confirm_no(self, mocked, tmp_path, response): assert "Aborted" in result.stderr mocked.assert_not_called() - @mock.patch("bio2zarr.vcf2zarr.explode") + @mock.patch("bio2zarr.icf.explode") def test_vcf_explode_missing_and_existing_vcf(self, mocked, tmp_path): icf_path = tmp_path / "icf" runner = ct.CliRunner(mix_stderr=False) @@ -302,7 +302,7 @@ def test_vcf_explode_missing_and_existing_vcf(self, mocked, tmp_path): mocked.assert_not_called() @pytest.mark.parametrize(("progress", "flag"), [(True, "-P"), (False, "-Q")]) - @mock.patch("bio2zarr.vcf2zarr.explode_init", return_value=FakeWorkSummary(5)) + @mock.patch("bio2zarr.icf.explode_init", return_value=FakeWorkSummary(5)) def test_vcf_dexplode_init(self, mocked, tmp_path, progress, flag): runner = ct.CliRunner(mix_stderr=False) icf_path = tmp_path / "icf" @@ -324,7 +324,7 @@ def test_vcf_dexplode_init(self, mocked, tmp_path, progress, flag): ) @pytest.mark.parametrize("num_partitions", ["-1", "0", "asdf", "1.112"]) - @mock.patch("bio2zarr.vcf2zarr.explode_init", return_value=5) + @mock.patch("bio2zarr.icf.explode_init", return_value=5) def test_vcf_dexplode_init_bad_num_partitions( self, mocked, tmp_path, num_partitions ): @@ -339,7 +339,7 @@ def test_vcf_dexplode_init_bad_num_partitions( assert "Invalid value for '-n'" in result.stderr mocked.assert_not_called() - @mock.patch("bio2zarr.vcf2zarr.explode_init", return_value=5) + @mock.patch("bio2zarr.icf.explode_init", return_value=5) def test_vcf_dexplode_init_no_partitions(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) icf_path = tmp_path / "icf" @@ -352,7 +352,7 @@ def test_vcf_dexplode_init_no_partitions(self, mocked, tmp_path): assert "-n/--num-partitions must currently be specified" in result.stderr mocked.assert_not_called() - @mock.patch("bio2zarr.vcf2zarr.explode_partition") + @mock.patch("bio2zarr.icf.explode_partition") def test_vcf_dexplode_partition(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) icf_path = tmp_path / "icf" @@ -369,7 +369,7 @@ def test_vcf_dexplode_partition(self, mocked, tmp_path): str(icf_path), 1, **DEFAULT_DEXPLODE_PARTITION_ARGS ) - @mock.patch("bio2zarr.vcf2zarr.explode_partition") + @mock.patch("bio2zarr.icf.explode_partition") def test_vcf_dexplode_partition_one_based(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) icf_path = tmp_path / "icf" @@ -386,7 +386,7 @@ def test_vcf_dexplode_partition_one_based(self, mocked, tmp_path): str(icf_path), 0, **DEFAULT_DEXPLODE_PARTITION_ARGS ) - @mock.patch("bio2zarr.vcf2zarr.explode_partition") + @mock.patch("bio2zarr.icf.explode_partition") def test_vcf_dexplode_partition_missing_dir(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) 
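These mock targets change mechanically from bio2zarr.vcf2zarr to bio2zarr.icf, but the detail that makes them work is that mock.patch must name the module whose attribute the caller resolves: cli.py now imports the module object (from . import icf as icf_mod) and looks up icf_mod.explode at call time. A small illustration of that patching rule, with the direct call standing in for the CLI's own (paths are illustrative):

from unittest import mock

from bio2zarr import icf

# Patching the attribute on bio2zarr.icf is visible through any alias of
# the module object, including cli.py's ``icf_mod``.
with mock.patch("bio2zarr.icf.explode") as mocked:
    icf.explode("out.icf", ["sample.vcf.gz"])
mocked.assert_called_once_with("out.icf", ["sample.vcf.gz"])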
icf_path = tmp_path / "icf" @@ -401,7 +401,7 @@ def test_vcf_dexplode_partition_missing_dir(self, mocked, tmp_path): mocked.assert_not_called() @pytest.mark.parametrize("partition", ["-- -1", "asdf", "1.112"]) - @mock.patch("bio2zarr.vcf2zarr.explode_partition") + @mock.patch("bio2zarr.icf.explode_partition") def test_vcf_dexplode_partition_bad_partition(self, mocked, tmp_path, partition): runner = ct.CliRunner(mix_stderr=False) icf_path = tmp_path / "icf" @@ -416,7 +416,7 @@ def test_vcf_dexplode_partition_bad_partition(self, mocked, tmp_path, partition) assert len(result.stdout) == 0 mocked.assert_not_called() - @mock.patch("bio2zarr.vcf2zarr.explode_finalise") + @mock.patch("bio2zarr.icf.explode_finalise") def test_vcf_dexplode_finalise(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) result = runner.invoke( @@ -427,7 +427,7 @@ def test_vcf_dexplode_finalise(self, mocked, tmp_path): assert len(result.stderr) == 0 mocked.assert_called_once_with(str(tmp_path)) - @mock.patch("bio2zarr.vcf2zarr.inspect") + @mock.patch("bio2zarr.icf.inspect") def test_inspect(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) result = runner.invoke( @@ -438,7 +438,7 @@ def test_inspect(self, mocked, tmp_path): assert len(result.stderr) == 0 mocked.assert_called_once_with(str(tmp_path)) - @mock.patch("bio2zarr.vcf2zarr.mkschema") + @mock.patch("bio2zarr.icf.mkschema") def test_mkschema(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) result = runner.invoke( @@ -455,7 +455,7 @@ def test_mkschema(self, mocked, tmp_path): mocked.assert_called_once() @pytest.mark.parametrize(("progress", "flag"), [(True, "-P"), (False, "-Q")]) - @mock.patch("bio2zarr.vcf2zarr.encode") + @mock.patch("bio2zarr.icf.encode") def test_encode(self, mocked, tmp_path, progress, flag): icf_path = tmp_path / "icf" icf_path.mkdir() @@ -478,7 +478,7 @@ def test_encode(self, mocked, tmp_path, progress, flag): ) @pytest.mark.parametrize(("progress", "flag"), [(True, "-P"), (False, "-Q")]) - @mock.patch("bio2zarr.vcf2zarr.encode_init", return_value=FakeWorkSummary(10)) + @mock.patch("bio2zarr.icf.encode_init", return_value=FakeWorkSummary(10)) def test_dencode_init(self, mocked, tmp_path, progress, flag): icf_path = tmp_path / "icf" icf_path.mkdir() @@ -501,7 +501,7 @@ def test_dencode_init(self, mocked, tmp_path, progress, flag): **args, ) - @mock.patch("bio2zarr.vcf2zarr.encode_init", return_value=5) + @mock.patch("bio2zarr.icf.encode_init", return_value=5) def test_vcf_dencode_init_no_partitions(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) icf_path = tmp_path / "icf" @@ -516,7 +516,7 @@ def test_vcf_dencode_init_no_partitions(self, mocked, tmp_path): assert "-n/--num-partitions must currently be specified" in result.stderr mocked.assert_not_called() - @mock.patch("bio2zarr.vcf2zarr.encode_partition") + @mock.patch("bio2zarr.icf.encode_partition") def test_vcf_dencode_partition(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) zarr_path = tmp_path / "zarr" @@ -533,7 +533,7 @@ def test_vcf_dencode_partition(self, mocked, tmp_path): str(zarr_path), 1, **DEFAULT_DENCODE_PARTITION_ARGS ) - @mock.patch("bio2zarr.vcf2zarr.encode_partition") + @mock.patch("bio2zarr.icf.encode_partition") def test_vcf_dencode_partition_one_based(self, mocked, tmp_path): runner = ct.CliRunner(mix_stderr=False) zarr_path = tmp_path / "zarr" @@ -551,7 +551,7 @@ def test_vcf_dencode_partition_one_based(self, mocked, tmp_path): ) @pytest.mark.parametrize(("progress", "flag"), [(True, "-P"), 
(False, "-Q")]) - @mock.patch("bio2zarr.vcf2zarr.encode_finalise") + @mock.patch("bio2zarr.icf.encode_finalise") def test_vcf_dencode_finalise(self, mocked, tmp_path, progress, flag): runner = ct.CliRunner(mix_stderr=False) result = runner.invoke( @@ -567,7 +567,7 @@ def test_vcf_dencode_finalise(self, mocked, tmp_path, progress, flag): mocked.assert_called_once_with(str(tmp_path), **args) @pytest.mark.parametrize(("progress", "flag"), [(True, "-P"), (False, "-Q")]) - @mock.patch("bio2zarr.vcf2zarr.convert") + @mock.patch("bio2zarr.icf.convert") def test_convert_vcf(self, mocked, progress, flag): runner = ct.CliRunner(mix_stderr=False) result = runner.invoke( @@ -587,7 +587,7 @@ def test_convert_vcf(self, mocked, progress, flag): ) @pytest.mark.parametrize("response", ["n", "N", "No"]) - @mock.patch("bio2zarr.vcf2zarr.convert") + @mock.patch("bio2zarr.icf.convert") def test_vcf_convert_overwrite_zarr_confirm_no(self, mocked, tmp_path, response): zarr_path = tmp_path / "zarr" zarr_path.mkdir() @@ -617,7 +617,7 @@ def test_convert_plink(self, mocked, progress, flag): mocked.assert_called_once_with("in", "out", **args) @pytest.mark.parametrize("response", ["y", "Y", "yes"]) - @mock.patch("bio2zarr.vcf2zarr.convert") + @mock.patch("bio2zarr.icf.convert") def test_vcf_convert_overwrite_zarr_confirm_yes(self, mocked, tmp_path, response): zarr_path = tmp_path / "zarr" zarr_path.mkdir() diff --git a/tests/test_icf.py b/tests/test_icf.py index 088be7ab..3297b3f6 100644 --- a/tests/test_icf.py +++ b/tests/test_icf.py @@ -6,8 +6,8 @@ import numpy.testing as nt import pytest -from bio2zarr import provenance, vcf2zarr, vcf_utils -from bio2zarr.vcf2zarr import icf as icf_mod +from bio2zarr import icf as icf_mod +from bio2zarr import provenance, vcf_utils, vcz class TestSmallExample: @@ -25,7 +25,7 @@ class TestSmallExample: @pytest.fixture(scope="class") def icf(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.exploded" - return vcf2zarr.explode(out, [self.data_path]) + return icf_mod.explode(out, [self.data_path]) def test_format_version(self, icf): assert icf.metadata.format_version == icf_mod.ICF_METADATA_FORMAT_VERSION @@ -38,10 +38,10 @@ def test_provenance(self, icf): def test_mkschema(self, tmp_path, icf): schema_file = tmp_path / "schema.json" with open(schema_file, "w") as f: - vcf2zarr.mkschema(icf.path, f) + icf_mod.mkschema(icf.path, f) with open(schema_file) as f: - schema1 = vcf2zarr.VcfZarrSchema.fromjson(f.read()) - schema2 = vcf2zarr.VcfZarrSchema.generate(icf) + schema1 = vcz.VcfZarrSchema.fromjson(f.read()) + schema2 = icf.generate_schema() assert schema1 == schema2 def test_summary_table(self, icf): @@ -50,7 +50,7 @@ def test_summary_table(self, icf): assert tuple(sorted(fields)) == self.fields def test_inspect(self, icf): - assert icf.summary_table() == vcf2zarr.inspect(icf.path) + assert icf.summary_table() == icf_mod.inspect(icf.path) def test_mapping_methods(self, icf): assert len(icf) == len(self.fields) @@ -97,7 +97,7 @@ class TestWithGtHeaderNoGenotypes: @pytest.fixture(scope="class") def icf(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.exploded" - return vcf2zarr.explode(out, [self.data_path]) + return icf_mod.explode(out, [self.data_path]) def test_gts(self, icf): values = icf["FORMAT/GT"].values @@ -119,7 +119,7 @@ class TestIcfWriterExample: def test_init_paths(self, tmp_path): icf_path = tmp_path / "x.icf" assert not icf_path.exists() - summary = vcf2zarr.explode_init(icf_path, [self.data_path]) + summary = 
icf_mod.explode_init(icf_path, [self.data_path]) assert summary.num_partitions == 3 assert icf_path.exists() wip_path = icf_path / "wip" @@ -132,50 +132,50 @@ def test_init_paths(self, tmp_path): def test_finalise_paths(self, tmp_path): icf_path = tmp_path / "x.icf" wip_path = icf_path / "wip" - summary = vcf2zarr.explode_init(icf_path, [self.data_path]) + summary = icf_mod.explode_init(icf_path, [self.data_path]) assert icf_path.exists() for j in range(summary.num_partitions): - vcf2zarr.explode_partition(icf_path, j) + icf_mod.explode_partition(icf_path, j) assert wip_path.exists() - vcf2zarr.explode_finalise(icf_path) + icf_mod.explode_finalise(icf_path) assert icf_path.exists() assert not wip_path.exists() def test_finalise_no_partitions_fails(self, tmp_path): icf_path = tmp_path / "x.icf" - vcf2zarr.explode_init(icf_path, [self.data_path]) + icf_mod.explode_init(icf_path, [self.data_path]) with pytest.raises(FileNotFoundError, match="3 partitions: \\[0, 1, 2\\]"): - vcf2zarr.explode_finalise(icf_path) + icf_mod.explode_finalise(icf_path) @pytest.mark.parametrize("partition", [0, 1, 2]) def test_finalise_missing_partition_fails(self, tmp_path, partition): icf_path = tmp_path / "x.icf" - vcf2zarr.explode_init(icf_path, [self.data_path]) + icf_mod.explode_init(icf_path, [self.data_path]) for j in range(3): if j != partition: - vcf2zarr.explode_partition(icf_path, j) + icf_mod.explode_partition(icf_path, j) with pytest.raises(FileNotFoundError, match=f"1 partitions: \\[{partition}\\]"): - vcf2zarr.explode_finalise(icf_path) + icf_mod.explode_finalise(icf_path) @pytest.mark.parametrize("partition", [0, 1, 2]) def test_explode_partition(self, tmp_path, partition): icf_path = tmp_path / "x.icf" - vcf2zarr.explode_init(icf_path, [self.data_path]) + icf_mod.explode_init(icf_path, [self.data_path]) summary_file = icf_path / "wip" / f"p{partition}.json" assert not summary_file.exists() - vcf2zarr.explode_partition(icf_path, partition) + icf_mod.explode_partition(icf_path, partition) assert summary_file.exists() def test_double_explode_partition(self, tmp_path): partition = 1 icf_path = tmp_path / "x.icf" - vcf2zarr.explode_init(icf_path, [self.data_path]) + icf_mod.explode_init(icf_path, [self.data_path]) summary_file = icf_path / "wip" / f"p{partition}.json" assert not summary_file.exists() - vcf2zarr.explode_partition(icf_path, partition) + icf_mod.explode_partition(icf_path, partition) with open(summary_file) as f: s1 = f.read() - vcf2zarr.explode_partition(icf_path, partition) + icf_mod.explode_partition(icf_path, partition) with open(summary_file) as f: s2 = f.read() assert s1 == s2 @@ -183,19 +183,19 @@ def test_double_explode_partition(self, tmp_path): @pytest.mark.parametrize("partition", [-1, 3, 100]) def test_explode_partition_out_of_range(self, tmp_path, partition): icf_path = tmp_path / "x.icf" - vcf2zarr.explode_init(icf_path, [self.data_path]) + icf_mod.explode_init(icf_path, [self.data_path]) with pytest.raises(ValueError, match="Partition index not in the valid range"): - vcf2zarr.explode_partition(icf_path, partition) + icf_mod.explode_partition(icf_path, partition) def test_explode_same_file_twice(self, tmp_path): icf_path = tmp_path / "x.icf" with pytest.raises(ValueError, match="Duplicate path provided"): - vcf2zarr.explode(icf_path, [self.data_path, self.data_path]) + icf_mod.explode(icf_path, [self.data_path, self.data_path]) def test_explode_same_data_twice(self, tmp_path): icf_path = tmp_path / "x.icf" with pytest.raises(ValueError, match="Overlapping VCF regions"): - 
vcf2zarr.explode(icf_path, [self.data_path, "tests/data/vcf/sample.bcf"]) + icf_mod.explode(icf_path, [self.data_path, "tests/data/vcf/sample.bcf"]) class TestGeneratedFieldsExample: @@ -211,11 +211,11 @@ def icf(self, tmp_path_factory): # df = sgkit.load_dataset("tmp/fields.vcf.sg") # print(df["variant_IC2"]) # print(df["variant_IC2"].values) - return vcf2zarr.explode(out, [self.data_path]) + return icf_mod.explode(out, [self.data_path]) @pytest.fixture(scope="class") def schema(self, icf): - return vcf2zarr.VcfZarrSchema.generate(icf) + return icf.generate_schema() @pytest.mark.parametrize( ("name", "dtype", "shape", "dimensions"), @@ -279,16 +279,16 @@ class TestInitProperties: def run_explode(self, tmp_path, **kwargs): icf_path = tmp_path / "icf" - vcf2zarr.explode(icf_path, [self.data_path], **kwargs) - return vcf2zarr.IntermediateColumnarFormat(icf_path) + icf_mod.explode(icf_path, [self.data_path], **kwargs) + return icf_mod.IntermediateColumnarFormat(icf_path) def run_dexplode(self, tmp_path, **kwargs): icf_path = tmp_path / "icf" - summary = vcf2zarr.explode_init(icf_path, [self.data_path], **kwargs) + summary = icf_mod.explode_init(icf_path, [self.data_path], **kwargs) for j in range(summary.num_partitions): - vcf2zarr.explode_partition(icf_path, j) - vcf2zarr.explode_finalise(icf_path) - return vcf2zarr.IntermediateColumnarFormat(icf_path) + icf_mod.explode_partition(icf_path, j) + icf_mod.explode_finalise(icf_path) + return icf_mod.IntermediateColumnarFormat(icf_path) @pytest.mark.parametrize( "compressor", @@ -340,40 +340,40 @@ class TestCorruptionDetection: def test_missing_field(self, tmp_path): icf_path = tmp_path / "icf" - vcf2zarr.explode(icf_path, [self.data_path]) + icf_mod.explode(icf_path, [self.data_path]) shutil.rmtree(icf_path / "POS") - icf = vcf2zarr.IntermediateColumnarFormat(icf_path) + icf = icf_mod.IntermediateColumnarFormat(icf_path) with pytest.raises(FileNotFoundError): icf["POS"].values # noqa B018 def test_missing_chunk_index(self, tmp_path): icf_path = tmp_path / "icf" - vcf2zarr.explode(icf_path, [self.data_path]) + icf_mod.explode(icf_path, [self.data_path]) chunk_index_path = icf_path / "POS" / "p0" / "chunk_index" assert chunk_index_path.exists() chunk_index_path.unlink() - icf = vcf2zarr.IntermediateColumnarFormat(icf_path) + icf = icf_mod.IntermediateColumnarFormat(icf_path) with pytest.raises(FileNotFoundError): icf["POS"].values # noqa B018 def test_missing_chunk_file(self, tmp_path): icf_path = tmp_path / "icf" - vcf2zarr.explode(icf_path, [self.data_path]) + icf_mod.explode(icf_path, [self.data_path]) chunk_file = icf_path / "POS" / "p0" / "2" assert chunk_file.exists() chunk_file.unlink() - icf = vcf2zarr.IntermediateColumnarFormat(icf_path) + icf = icf_mod.IntermediateColumnarFormat(icf_path) with pytest.raises(FileNotFoundError): icf["POS"].values # noqa B018 def test_empty_chunk_file(self, tmp_path): icf_path = tmp_path / "icf" - vcf2zarr.explode(icf_path, [self.data_path]) + icf_mod.explode(icf_path, [self.data_path]) chunk_file = icf_path / "POS" / "p0" / "2" assert chunk_file.exists() with open(chunk_file, "w") as _: pass - icf = vcf2zarr.IntermediateColumnarFormat(icf_path) + icf = icf_mod.IntermediateColumnarFormat(icf_path) with pytest.raises(RuntimeError, match="blosc"): icf["POS"].values # noqa B018 @@ -381,21 +381,21 @@ def test_empty_chunk_file(self, tmp_path): @pytest.mark.parametrize("length", [10, 100, 185]) def test_truncated_chunk_file(self, tmp_path, length): icf_path = tmp_path / "icf" - vcf2zarr.explode(icf_path, 
[self.data_path]) + icf_mod.explode(icf_path, [self.data_path]) chunk_file = icf_path / "POS" / "p0" / "2" with open(chunk_file, "rb") as f: buff = f.read(length) assert len(buff) == length with open(chunk_file, "wb") as f: f.write(buff) - icf = vcf2zarr.IntermediateColumnarFormat(icf_path) + icf = icf_mod.IntermediateColumnarFormat(icf_path) # Either Blosc or pickling errors happen here with pytest.raises((RuntimeError, pickle.UnpicklingError)): icf["POS"].values # noqa B018 def test_chunk_incorrect_length(self, tmp_path): icf_path = tmp_path / "icf" - vcf2zarr.explode(icf_path, [self.data_path]) + icf_mod.explode(icf_path, [self.data_path]) chunk_file = icf_path / "POS" / "p0" / "2" compressor = numcodecs.Blosc(cname="zstd") with open(chunk_file, "rb") as f: @@ -406,7 +406,7 @@ def test_chunk_incorrect_length(self, tmp_path): pkl = pickle.dumps(x[0]) with open(chunk_file, "wb") as f: f.write(compressor.encode(pkl)) - icf = vcf2zarr.IntermediateColumnarFormat(icf_path) + icf = icf_mod.IntermediateColumnarFormat(icf_path) with pytest.raises(ValueError, match="Corruption detected"): icf["POS"].values # noqa B018 with pytest.raises(ValueError, match="Corruption detected"): @@ -419,7 +419,7 @@ class TestSlicing: @pytest.fixture(scope="class") def icf(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.exploded" - return vcf2zarr.explode( + return icf_mod.explode( out, [self.data_path], column_chunk_size=0.0125, worker_processes=0 ) diff --git a/tests/test_local_alleles.py b/tests/test_local_alleles.py index 91c183ac..66ea49b0 100644 --- a/tests/test_local_alleles.py +++ b/tests/test_local_alleles.py @@ -2,7 +2,7 @@ import numpy.testing as nt import pytest -from bio2zarr.vcf2zarr import vcz +from bio2zarr import vcz class TestComputeLA: diff --git a/tests/test_simulated_data.py b/tests/test_simulated_data.py index 6d995b67..0188a799 100644 --- a/tests/test_simulated_data.py +++ b/tests/test_simulated_data.py @@ -5,7 +5,7 @@ import pytest import sgkit as sg -from bio2zarr import vcf2zarr +from bio2zarr import icf def run_simulation(num_samples=2, ploidy=1, seed=42, sequence_length=100_000): @@ -60,7 +60,7 @@ def test_ploidy(self, ploidy, tmp_path): ts = run_simulation(ploidy=ploidy) vcf_path = write_vcf(ts, tmp_path / "sim.vcf") out = tmp_path / "example.vcf.zarr" - vcf2zarr.convert([vcf_path], out) + icf.convert([vcf_path], out) ds = sg.load_dataset(out) assert_ts_ds_equal(ts, ds, ploidy) @@ -81,7 +81,7 @@ def test_multi_contig(self, contig_ids, tmp_path): def validate_tss_vcf_list(self, contig_ids, tss, vcfs, tmp_path): out = tmp_path / "example.vcf.zarr" - vcf2zarr.convert(vcfs, out) + icf.convert(vcfs, out) ds = sg.load_dataset(out).set_index( variants=("variant_contig", "variant_position") ) @@ -103,7 +103,7 @@ def test_indexed(self, indexed, tmp_path): ts = run_simulation(num_samples=12, seed=34) vcf_path = write_vcf(ts, tmp_path / "sim.vcf", indexed=indexed) out = tmp_path / "example.vcf.zarr" - vcf2zarr.convert([vcf_path], out) + icf.convert([vcf_path], out) ds = sg.load_dataset(out) assert_ts_ds_equal(ts, ds) @@ -138,4 +138,4 @@ def test_different_lengths(self, tmp_path): vcfs.append(vcf_path) out = tmp_path / "example.vcf.zarr" with pytest.raises(ValueError, match="Incompatible contig definitions"): - vcf2zarr.convert(vcfs, out) + icf.convert(vcfs, out) diff --git a/tests/test_vcf_examples.py b/tests/test_vcf_examples.py index 31b03ab0..52919722 100644 --- a/tests/test_vcf_examples.py +++ b/tests/test_vcf_examples.py @@ -9,7 +9,7 @@ import sgkit as sg import 
xarray.testing as xt -from bio2zarr import constants, provenance, vcf2zarr +from bio2zarr import constants, icf, provenance, vcz_verification def assert_dataset_equal(ds1, ds2, drop_vars=None): @@ -25,7 +25,7 @@ class TestSmallExample: @pytest.fixture(scope="class") def ds(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.vcf.zarr" - vcf2zarr.convert([self.data_path], out) + icf.convert([self.data_path], out) return sg.load_dataset(out) def test_filters(self, ds): @@ -246,7 +246,7 @@ def test_call_HQ(self, ds): def test_no_genotypes(self, ds, tmp_path): path = "tests/data/vcf/sample_no_genotypes.vcf.gz" out = tmp_path / "example.vcf.zarr" - vcf2zarr.convert([path], out) + icf.convert([path], out) ds2 = sg.load_dataset(out) assert len(ds2["sample_id"]) == 0 for field_name in ds: @@ -266,7 +266,7 @@ def test_chunk_size( self, ds, tmp_path, variants_chunk_size, samples_chunk_size, y_chunks, x_chunks ): out = tmp_path / "example.vcf.zarr" - vcf2zarr.convert( + icf.convert( [self.data_path], out, variants_chunk_size=variants_chunk_size, @@ -308,23 +308,23 @@ def test_split(self, ds, tmp_path, worker_processes, rotate): # Rotate the list to check we are OK with different orderings files.rotate(rotate) assert len(files) == 3 - vcf2zarr.convert(files, out, worker_processes=worker_processes) + icf.convert(files, out, worker_processes=worker_processes) ds2 = sg.load_dataset(out) xt.assert_equal(ds, ds2) @pytest.mark.parametrize("worker_processes", [0, 1, 2]) def test_full_pipeline(self, ds, tmp_path, worker_processes): exploded = tmp_path / "example.exploded" - vcf2zarr.explode( + icf.explode( exploded, [self.data_path], worker_processes=worker_processes, ) schema = tmp_path / "schema.json" with open(schema, "w") as f: - vcf2zarr.mkschema(exploded, f) + icf.mkschema(exploded, f) out = tmp_path / "example.zarr" - vcf2zarr.encode(exploded, out, schema, worker_processes=worker_processes) + icf.encode(exploded, out, schema, worker_processes=worker_processes) ds2 = sg.load_dataset(out) xt.assert_equal(ds, ds2) @@ -334,9 +334,9 @@ def test_max_variant_chunks( self, ds, tmp_path, max_variant_chunks, variants_chunk_size ): exploded = tmp_path / "example.exploded" - vcf2zarr.explode(exploded, [self.data_path]) + icf.explode(exploded, [self.data_path]) out = tmp_path / "example.zarr" - vcf2zarr.encode( + icf.encode( exploded, out, variants_chunk_size=variants_chunk_size, @@ -352,7 +352,7 @@ def test_max_variant_chunks( @pytest.mark.parametrize("worker_processes", [0, 1, 2]) def test_worker_processes(self, ds, tmp_path, worker_processes): out = tmp_path / "example.vcf.zarr" - vcf2zarr.convert( + icf.convert( [self.data_path], out, variants_chunk_size=3, @@ -364,12 +364,12 @@ def test_worker_processes(self, ds, tmp_path, worker_processes): def test_inspect(self, tmp_path): # TODO pretty weak test, we should be doing this better somewhere else out = tmp_path / "example.vcf.zarr" - vcf2zarr.convert( + icf.convert( [self.data_path], out, variants_chunk_size=3, ) - data = vcf2zarr.inspect(out) + data = icf.inspect(out) assert len(data) > 0 for row in data: assert "name" in row @@ -387,7 +387,7 @@ def test_missing_contig_vcf(self, ds, tmp_path, path): # but the ordering of contigs has been permuted. 
This seems to be the # sample across VCF and BCF with tabix and VSI indexes zarr_path = tmp_path / "zarr" - vcf2zarr.convert([path], zarr_path) + icf.convert([path], zarr_path) ds2 = sg.load_dataset(zarr_path) contig_id_2 = ["19", "X", "20"] assert list(ds2["contig_id"].values) == contig_id_2 @@ -452,7 +452,7 @@ def test_region_index(self, ds): def test_small_example_all_missing_gts(self, ds, tmp_path_factory): data_path = "tests/data/vcf/sample_all_missing_gts.vcf.gz" out = tmp_path_factory.mktemp("data") / "example.vcf.zarr" - vcf2zarr.convert([data_path], out, worker_processes=0) + icf.convert([data_path], out, worker_processes=0) ds2 = sg.load_dataset(out) assert_dataset_equal( @@ -479,7 +479,7 @@ class TestSmallExampleLocalAlleles: @pytest.fixture(scope="class") def ds(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.vcf.zarr" - vcf2zarr.convert([self.data_path], out, local_alleles=True) + icf.convert([self.data_path], out, local_alleles=True) return sg.load_dataset(out) def test_call_LA(self, ds): @@ -526,7 +526,7 @@ class TestTriploidExample: def ds(self, tmp_path_factory, request): data_path = f"tests/data/vcf/{request.param}.vcf.gz" out = tmp_path_factory.mktemp("data") / "example.vcf.zarr" - vcf2zarr.convert([data_path], out, local_alleles=False) + icf.convert([data_path], out, local_alleles=False) return sg.load_dataset(out) @pytest.mark.parametrize("name", ["triploid", "triploid2", "triploid3"]) @@ -536,7 +536,7 @@ def test_error_with_local_alleles(self, tmp_path_factory, name): with pytest.raises( ValueError, match=re.escape("Local alleles only supported on diploid") ): - vcf2zarr.convert([data_path], out, local_alleles=True) + icf.convert([data_path], out, local_alleles=True) def test_ok_without_local_alleles(self, ds): nt.assert_array_equal(ds.call_genotype.values, [[[0, 0, 0]]]) @@ -548,7 +548,7 @@ class TestWithGtHeaderNoGenotypes: @pytest.fixture(scope="class") def ds(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.vcf.zarr" - vcf2zarr.convert([self.data_path], out, worker_processes=0) + icf.convert([self.data_path], out, worker_processes=0) return sg.load_dataset(out) def test_gts(self, ds): @@ -561,7 +561,7 @@ class Test1000G2020Example: @pytest.fixture(scope="class") def ds(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.vcf.zarr" - vcf2zarr.convert([self.data_path], out, worker_processes=0) + icf.convert([self.data_path], out, worker_processes=0) return sg.load_dataset(out) def test_position(self, ds): @@ -672,7 +672,7 @@ class Test1000G2020ExampleLocalAlleles: @pytest.fixture(scope="class") def ds(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.vcf.zarr" - vcf2zarr.convert([self.data_path], out, worker_processes=0, local_alleles=True) + icf.convert([self.data_path], out, worker_processes=0, local_alleles=True) return sg.load_dataset(out) def test_position(self, ds): @@ -761,7 +761,7 @@ class Test1000G2020AnnotationsExample: def ds(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.zarr" # TODO capture warnings from htslib here - vcf2zarr.convert([self.data_path], out, worker_processes=0) + icf.convert([self.data_path], out, worker_processes=0) return sg.load_dataset(out) def test_position(self, ds): @@ -995,7 +995,7 @@ class TestGeneratedFieldsExample: @pytest.fixture(scope="class") def ds(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "vcf.zarr" - vcf2zarr.convert([self.data_path], out) + icf.convert([self.data_path], out) 
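All of these fixtures now go through the same flat entry points. For orientation, a minimal end-to-end sketch of the renamed API as used throughout these tests (file paths are illustrative):

from bio2zarr import icf

# One-shot conversion: explode to a temporary ICF, then encode to Zarr.
icf.convert(["tests/data/vcf/sample.vcf.gz"], "sample.vcz")

# The equivalent explicit two-step pipeline, keeping the intermediate ICF.
icf.explode("sample.icf", ["tests/data/vcf/sample.vcf.gz"])
icf.encode("sample.icf", "sample.vcz")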
return sg.load_dataset(out) def test_info_string1(self, ds): @@ -1037,14 +1037,14 @@ class TestSplitFileErrors: def test_entirely_incompatible(self, tmp_path): path = "tests/data/vcf/" with pytest.raises(ValueError, match="Incompatible"): - vcf2zarr.explode_init( + icf.explode_init( tmp_path / "if", [path + "sample.vcf.gz", path + "1kg_2020_chrM.bcf"] ) def test_duplicate_paths(self, tmp_path): path = "tests/data/vcf/" with pytest.raises(ValueError, match="Duplicate"): - vcf2zarr.explode_init(tmp_path / "if", [path + "sample.vcf.gz"] * 2) + icf.explode_init(tmp_path / "if", [path + "sample.vcf.gz"] * 2) @pytest.mark.parametrize( @@ -1064,8 +1064,8 @@ def test_duplicate_paths(self, tmp_path): def test_by_validating(name, tmp_path): path = f"tests/data/vcf/{name}" out = tmp_path / "test.zarr" - vcf2zarr.convert([path], out, worker_processes=0) - vcf2zarr.verify(path, out) + icf.convert([path], out, worker_processes=0) + vcz_verification.verify(path, out) @pytest.mark.parametrize( @@ -1082,8 +1082,8 @@ def test_by_validating_split(source, suffix, files, tmp_path): source_path = f"tests/data/vcf/{source}" split_files = [f"{source_path}.{suffix}/{f}" for f in files] out = tmp_path / "test.zarr" - vcf2zarr.convert(split_files, out, worker_processes=0) - vcf2zarr.verify(source_path, out) + icf.convert(split_files, out, worker_processes=0) + vcz_verification.verify(source_path, out) def test_split_explode(tmp_path): @@ -1093,16 +1093,16 @@ def test_split_explode(tmp_path): "tests/data/vcf/sample.vcf.gz.3.split/X.vcf.gz", ] out = tmp_path / "test.explode" - work_summary = vcf2zarr.explode_init(out, paths, target_num_partitions=15) + work_summary = icf.explode_init(out, paths, target_num_partitions=15) assert work_summary.num_partitions == 3 with pytest.raises(FileNotFoundError): - pcvcf = vcf2zarr.IntermediateColumnarFormat(out) + pcvcf = icf.IntermediateColumnarFormat(out) for j in range(work_summary.num_partitions): - vcf2zarr.explode_partition(out, j) - vcf2zarr.explode_finalise(out) - pcvcf = vcf2zarr.IntermediateColumnarFormat(out) + icf.explode_partition(out, j) + icf.explode_finalise(out) + pcvcf = icf.IntermediateColumnarFormat(out) summary_d = pcvcf.fields["POS"].vcf_field.summary.asdict() # The compressed size can vary with different numcodecs versions assert summary_d["compressed_size"] in [571, 573, 587] @@ -1114,15 +1114,15 @@ def test_split_explode(tmp_path): "max_value": 1235237, "min_value": 10, } - vcf2zarr.encode(out, tmp_path / "test.zarr") - vcf2zarr.verify("tests/data/vcf/sample.vcf.gz", tmp_path / "test.zarr") + icf.encode(out, tmp_path / "test.zarr") + vcz_verification.verify("tests/data/vcf/sample.vcf.gz", tmp_path / "test.zarr") def test_missing_filter(tmp_path): path = "tests/data/vcf/sample_missing_filter.vcf.gz" zarr_path = tmp_path / "zarr" with pytest.raises(ValueError, match="Filter 'q10' was not defined in the header"): - vcf2zarr.convert([path], zarr_path) + icf.convert([path], zarr_path) class TestOutOfOrderFields: @@ -1133,7 +1133,7 @@ class TestOutOfOrderFields: @pytest.fixture(scope="class") def ds(self, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "ooo_example.vcf.zarr" - vcf2zarr.convert([self.data_path1, self.data_path2], out) + icf.convert([self.data_path1, self.data_path2], out) return sg.load_dataset(out) def test_filters(self, ds): diff --git a/tests/test_vcf_hypothesis.py b/tests/test_vcf_hypothesis.py index 2e90f34b..61d663ef 100644 --- a/tests/test_vcf_hypothesis.py +++ b/tests/test_vcf_hypothesis.py @@ -4,7 +4,7 @@ from hypothesis import 
HealthCheck, given, note, settings from hypothesis_vcf import vcf -from bio2zarr import vcf2zarr +from bio2zarr import icf # Make sure POS starts at 1, since CSI indexing doesn't seem to support zero-based @@ -31,6 +31,4 @@ def test_hypothesis_generated_vcf(tmp_path, vcf_string): pysam.tabix_index(str(path), preset="vcf", force=True, csi=True) # test that we can convert VCFs to Zarr without error - vcf2zarr.convert( - [str(path) + ".gz"], zarr_path, icf_path=icf_path, worker_processes=0 - ) + icf.convert([str(path) + ".gz"], zarr_path, icf_path=icf_path, worker_processes=0) diff --git a/tests/test_vcz.py b/tests/test_vcz.py index 1d15568b..aafbcde7 100644 --- a/tests/test_vcz.py +++ b/tests/test_vcz.py @@ -8,9 +8,8 @@ import xarray.testing as xt import zarr -from bio2zarr import core, vcf2zarr -from bio2zarr.vcf2zarr import icf as icf_mod -from bio2zarr.vcf2zarr import vcz as vcz_mod +from bio2zarr import core, vcz +from bio2zarr import icf as icf_mod from bio2zarr.zarr_utils import zarr_v3 @@ -22,7 +21,7 @@ def vcf_file(): @pytest.fixture(scope="module") def icf_path(vcf_file, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.exploded" - vcf2zarr.explode(out, [vcf_file]) + icf_mod.explode(out, [vcf_file]) return out @@ -30,14 +29,15 @@ def icf_path(vcf_file, tmp_path_factory): def schema_path(icf_path, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.schema.json" with open(out, "w") as f: - vcf2zarr.mkschema(icf_path, f) + icf_mod.mkschema(icf_path, f) return out @pytest.fixture(scope="module") def schema(schema_path): with open(schema_path) as f: - return vcf2zarr.VcfZarrSchema.fromjson(f.read()) + a = vcz.VcfZarrSchema.fromjson(f.read()) + return a @pytest.fixture(scope="module") @@ -46,15 +46,15 @@ def local_alleles_schema(icf_path, tmp_path_factory): # be much easier. 
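The schema fixtures above capture the other API change in this diff: VcfZarrSchema.generate(icf) becomes a generate_schema() method on the source store, while parsing and serialisation stay on vcz.VcfZarrSchema. A short sketch, assuming an existing ICF at "sample.icf":

from bio2zarr import icf as icf_mod
from bio2zarr import vcz

store = icf_mod.IntermediateColumnarFormat("sample.icf")
schema = store.generate_schema(variants_chunk_size=1000, samples_chunk_size=10_000)

# Schemas still round-trip through JSON unchanged.
assert vcz.VcfZarrSchema.fromjson(schema.asjson()) == schema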
out = tmp_path_factory.mktemp("data") / "example.schema.json" with open(out, "w") as f: - vcf2zarr.mkschema(icf_path, f, local_alleles=True) + icf_mod.mkschema(icf_path, f, local_alleles=True) with open(out) as f: - return vcf2zarr.VcfZarrSchema.fromjson(f.read()) + return vcz.VcfZarrSchema.fromjson(f.read()) @pytest.fixture(scope="module") def zarr_path(icf_path, tmp_path_factory): out = tmp_path_factory.mktemp("data") / "example.zarr" - vcf2zarr.encode(icf_path, out) + icf_mod.encode(icf_path, out) return out @@ -73,13 +73,13 @@ class TestEncodeMaxMemory: ], ) def test_parser(self, arg, expected): - assert vcz_mod.parse_max_memory(arg) == expected + assert core.parse_max_memory(arg) == expected @pytest.mark.parametrize("max_memory", [-1, 0, 1, "100 bytes"]) def test_not_enough_memory(self, tmp_path, icf_path, max_memory): zarr_path = tmp_path / "zarr" with pytest.raises(ValueError, match="Insufficient memory"): - vcf2zarr.encode(icf_path, zarr_path, max_memory=max_memory) + icf_mod.encode(icf_path, zarr_path, max_memory=max_memory) @pytest.mark.parametrize("max_memory", ["315KiB", "500KiB"]) def test_not_enough_memory_for_two( @@ -87,7 +87,7 @@ def test_not_enough_memory_for_two( ): other_zarr_path = tmp_path / "zarr" with caplog.at_level("WARNING"): - vcf2zarr.encode( + icf_mod.encode( icf_path, other_zarr_path, max_memory=max_memory, @@ -107,7 +107,7 @@ def test_zarr_schema_mismatch(self, schema, version): d = schema.asdict() d["format_version"] = version with pytest.raises(ValueError, match="Zarr schema format version mismatch"): - vcf2zarr.VcfZarrSchema.fromdict(d) + vcz.VcfZarrSchema.fromdict(d) @pytest.mark.parametrize("version", ["0.0", "1.0", "xxxxx", 0.1]) def test_exploded_metadata_mismatch(self, tmpdir, icf_path, version): @@ -123,12 +123,12 @@ def test_exploded_metadata_mismatch(self, tmpdir, icf_path, version): @pytest.mark.parametrize("version", ["0.0", "1.0", "xxxxx", 0.1]) def test_encode_metadata_mismatch(self, tmpdir, icf_path, version): zarr_path = tmpdir / "zarr" - vcf2zarr.encode_init(icf_path, zarr_path, 1) + icf_mod.encode_init(icf_path, zarr_path, 1) with open(zarr_path / "wip" / "metadata.json") as f: d = json.load(f) d["format_version"] = version with pytest.raises(ValueError, match="VcfZarrWriter format version mismatch"): - vcz_mod.VcfZarrWriterMetadata.fromdict(d) + vcz.VcfZarrWriterMetadata.fromdict(d) @pytest.mark.skipif( @@ -138,14 +138,14 @@ class TestEncodeDimensionSeparator: @pytest.mark.parametrize("dimension_separator", [None, "/"]) def test_directories(self, tmp_path, icf_path, dimension_separator): zarr_path = tmp_path / "zarr" - vcf2zarr.encode(icf_path, zarr_path, dimension_separator=dimension_separator) + icf_mod.encode(icf_path, zarr_path, dimension_separator=dimension_separator) # print(zarr_path) chunk_file = zarr_path / "call_genotype" / "0" / "0" / "0" assert chunk_file.exists() def test_files(self, tmp_path, icf_path): zarr_path = tmp_path / "zarr" - vcf2zarr.encode(icf_path, zarr_path, dimension_separator=".") + icf_mod.encode(icf_path, zarr_path, dimension_separator=".") chunk_file = zarr_path / "call_genotype" / "0.0.0" assert chunk_file.exists() @@ -153,9 +153,7 @@ def test_files(self, tmp_path, icf_path): def test_bad_value(self, tmp_path, icf_path, dimension_separator): zarr_path = tmp_path / "zarr" with pytest.raises(ValueError, match="dimension_separator must be either"): - vcf2zarr.encode( - icf_path, zarr_path, dimension_separator=dimension_separator - ) + icf_mod.encode(icf_path, zarr_path, 
                 dimension_separator=dimension_separator)


 class TestSchemaChunkSize:
@@ -168,9 +166,8 @@ class TestSchemaChunkSize:
         ],
     )
     def test_chunk_sizes(self, icf_path, samples_chunk_size, variants_chunk_size):
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(
-            icf,
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = icf.generate_schema(
             variants_chunk_size=variants_chunk_size,
             samples_chunk_size=samples_chunk_size,
         )
@@ -187,42 +184,42 @@ def test_chunk_sizes(self, icf_path, samples_chunk_size, variants_chunk_size):
         assert found > 0

     def test_default_chunk_size(self, icf_path):
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(icf)
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = icf.generate_schema()
         assert schema.samples_chunk_size == 10_000
         assert schema.variants_chunk_size == 1000


 class TestSchemaJsonRoundTrip:
     def assert_json_round_trip(self, schema):
-        schema2 = vcf2zarr.VcfZarrSchema.fromjson(schema.asjson())
+        schema2 = vcz.VcfZarrSchema.fromjson(schema.asjson())
         assert schema == schema2

     def test_generated_no_changes(self, icf_path):
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        self.assert_json_round_trip(vcf2zarr.VcfZarrSchema.generate(icf))
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        self.assert_json_round_trip(icf.generate_schema())

     def test_generated_no_fields(self, icf_path):
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(icf)
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = icf.generate_schema()
         schema.fields.clear()
         self.assert_json_round_trip(schema)

     def test_generated_no_samples(self, icf_path):
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(icf)
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = icf.generate_schema()
         schema.samples.clear()
         self.assert_json_round_trip(schema)

     def test_generated_change_dtype(self, icf_path):
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(icf)
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = icf.generate_schema()
         schema.field_map()["variant_position"].dtype = "i8"
         self.assert_json_round_trip(schema)

     def test_generated_change_compressor(self, icf_path):
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(icf)
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = icf.generate_schema()
         schema.field_map()["variant_position"].compressor = {"cname": "FAKE"}
         self.assert_json_round_trip(schema)
@@ -233,8 +230,8 @@ class TestSchemaEncode:
     )
     def test_codec(self, tmp_path, icf_path, cname, clevel, shuffle):
         zarr_path = tmp_path / "zarr"
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(icf)
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = icf.generate_schema()
         for array_spec in schema.fields:
             array_spec.compressor["cname"] = cname
             array_spec.compressor["clevel"] = clevel
@@ -242,7 +239,7 @@ def test_codec(self, tmp_path, icf_path, cname, clevel, shuffle):
         schema_path = tmp_path / "schema"
         with open(schema_path, "w") as f:
             f.write(schema.asjson())
-        vcf2zarr.encode(icf_path, zarr_path, schema_path=schema_path)
+        icf_mod.encode(icf_path, zarr_path, schema_path=schema_path)
         root = zarr.open(zarr_path)
         for array_spec in schema.fields:
             a = root[array_spec.name]
@@ -253,26 +250,26 @@ def test_codec(self, tmp_path, icf_path, cname, clevel, shuffle):
     @pytest.mark.parametrize("dtype", ["i4", "i8"])
     def test_genotype_dtype(self, tmp_path, icf_path, dtype):
         zarr_path = tmp_path / "zarr"
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(icf)
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = icf.generate_schema()
         schema.field_map()["call_genotype"].dtype = dtype
         schema_path = tmp_path / "schema"
         with open(schema_path, "w") as f:
             f.write(schema.asjson())
-        vcf2zarr.encode(icf_path, zarr_path, schema_path=schema_path)
+        icf_mod.encode(icf_path, zarr_path, schema_path=schema_path)
         root = zarr.open(zarr_path)
         assert root["call_genotype"].dtype == dtype

     @pytest.mark.parametrize("dtype", ["i4", "i8"])
     def test_region_index_dtype(self, tmp_path, icf_path, dtype):
         zarr_path = tmp_path / "zarr"
-        icf = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(icf)
+        icf = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = icf.generate_schema()
         schema.field_map()["variant_position"].dtype = dtype
         schema_path = tmp_path / "schema"
         with open(schema_path, "w") as f:
             f.write(schema.asjson())
-        vcf2zarr.encode(icf_path, zarr_path, schema_path=schema_path)
+        icf_mod.encode(icf_path, zarr_path, schema_path=schema_path)
         root = zarr.open(zarr_path)
         assert root["variant_position"].dtype == dtype
         assert root["region_index"].dtype == dtype
@@ -305,10 +302,8 @@ def test_example_schema(self, schema, field, value):
         assert field.chunk_nbytes == value

     def test_chunk_size(self, icf_path, tmp_path):
-        store = vcf2zarr.IntermediateColumnarFormat(icf_path)
-        schema = vcf2zarr.VcfZarrSchema.generate(
-            store, samples_chunk_size=2, variants_chunk_size=3
-        )
+        store = icf_mod.IntermediateColumnarFormat(icf_path)
+        schema = store.generate_schema(samples_chunk_size=2, variants_chunk_size=3)
         fields = schema.field_map()
         assert fields["call_genotype"].chunk_nbytes == 3 * 2 * 2
         assert fields["variant_position"].chunk_nbytes == 3 * 4
@@ -318,7 +313,7 @@
 class TestValidateSchema:
     @pytest.mark.parametrize("size", [2**31, 2**31 + 1, 2**32])
     def test_chunk_too_large(self, schema, size):
-        schema = vcf2zarr.VcfZarrSchema.fromdict(schema.asdict())
+        schema = vcz.VcfZarrSchema.fromdict(schema.asdict())
         field = schema.field_map()["variant_H2"]
         field.shape = (size,)
         field.chunks = (size,)
@@ -327,7 +322,7 @@

     @pytest.mark.parametrize("size", [2**31 - 1, 2**30])
     def test_chunk_not_too_large(self, schema, size):
-        schema = vcf2zarr.VcfZarrSchema.fromdict(schema.asdict())
+        schema = vcz.VcfZarrSchema.fromdict(schema.asdict())
         field = schema.field_map()["variant_H2"]
         field.shape = (size,)
         field.chunks = (size,)
@@ -336,7 +331,7 @@

 class TestDefaultSchema:
     def test_format_version(self, schema):
-        assert schema.format_version == vcz_mod.ZARR_SCHEMA_FORMAT_VERSION
+        assert schema.format_version == vcz.ZARR_SCHEMA_FORMAT_VERSION

     def test_chunk_size(self, schema):
         assert schema.samples_chunk_size == 10000
@@ -550,7 +545,7 @@ class TestVcfZarrWriterExample:
     def test_init_paths(self, icf_path, tmp_path):
         zarr_path = tmp_path / "x.zarr"
         assert not zarr_path.exists()
-        summary = vcf2zarr.encode_init(icf_path, zarr_path, 7, variants_chunk_size=3)
+        summary = icf_mod.encode_init(icf_path, zarr_path, 7, variants_chunk_size=3)
         assert summary.num_partitions == 3
         assert zarr_path.exists()
         wip_path = zarr_path / "wip"
@@ -570,57 +565,57 @@ def test_init_paths(self, icf_path, tmp_path):

     def test_finalise_paths(self, icf_path, tmp_path):
         zarr_path = tmp_path / "x.zarr"
         assert not zarr_path.exists()
-        summary = vcf2zarr.encode_init(icf_path, zarr_path, 7, variants_chunk_size=3)
+        summary = icf_mod.encode_init(icf_path, zarr_path, 7, variants_chunk_size=3)
         wip_path = zarr_path / "wip"
         assert wip_path.exists()
         for j in range(summary.num_partitions):
-            vcf2zarr.encode_partition(zarr_path, j)
+            icf_mod.encode_partition(zarr_path, j)
             assert (wip_path / "partitions" / f"p{j}").exists()
-        vcf2zarr.encode_finalise(zarr_path)
+        icf_mod.encode_finalise(zarr_path)
         assert zarr_path.exists()
         assert not wip_path.exists()

     def test_finalise_no_partitions_fails(self, icf_path, tmp_path):
         zarr_path = tmp_path / "x.zarr"
-        vcf2zarr.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
+        icf_mod.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
         with pytest.raises(
             FileNotFoundError, match="Partitions not encoded: \\[0, 1, 2\\]"
         ):
-            vcf2zarr.encode_finalise(zarr_path)
+            icf_mod.encode_finalise(zarr_path)

     @pytest.mark.parametrize("partition", [0, 1, 2])
     def test_finalise_missing_partition_fails(self, icf_path, tmp_path, partition):
         zarr_path = tmp_path / "x.zarr"
-        vcf2zarr.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
+        icf_mod.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
         for j in range(3):
             if j != partition:
-                vcf2zarr.encode_partition(zarr_path, j)
+                icf_mod.encode_partition(zarr_path, j)
         with pytest.raises(
             FileNotFoundError, match=f"Partitions not encoded: \\[{partition}\\]"
         ):
-            vcf2zarr.encode_finalise(zarr_path)
+            icf_mod.encode_finalise(zarr_path)

     @pytest.mark.parametrize("partition", [0, 1, 2])
     def test_encode_partition(self, icf_path, tmp_path, partition):
         zarr_path = tmp_path / "x.zarr"
-        vcf2zarr.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
+        icf_mod.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
         partition_path = zarr_path / "wip" / "partitions" / f"p{partition}"
         assert not partition_path.exists()
-        vcf2zarr.encode_partition(zarr_path, partition)
+        icf_mod.encode_partition(zarr_path, partition)
         assert partition_path.exists()

     def test_double_encode_partition(self, icf_path, tmp_path, caplog):
         partition = 1
         zarr_path = tmp_path / "x.zarr"
-        vcf2zarr.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
+        icf_mod.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
         partition_path = zarr_path / "wip" / "partitions" / f"p{partition}"
         assert not partition_path.exists()
-        vcf2zarr.encode_partition(zarr_path, partition)
+        icf_mod.encode_partition(zarr_path, partition)
         assert partition_path.exists()
         size = core.du(partition_path)
         assert size > 0
         with caplog.at_level("WARNING"):
-            vcf2zarr.encode_partition(zarr_path, partition)
+            icf_mod.encode_partition(zarr_path, partition)
         assert "Removing existing partition at" in caplog.text
         assert partition_path.exists()
         assert core.du(partition_path) == size
@@ -628,9 +623,9 @@ def test_double_encode_partition(self, icf_path, tmp_path, caplog):
     @pytest.mark.parametrize("partition", [-1, 3, 100])
     def test_encode_partition_out_of_range(self, icf_path, tmp_path, partition):
         zarr_path = tmp_path / "x.zarr"
-        vcf2zarr.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
+        icf_mod.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3)
         with pytest.raises(ValueError, match="Partition index not in the valid range"):
-            vcf2zarr.encode_partition(zarr_path, partition)
+            icf_mod.encode_partition(zarr_path, partition)


 class TestClobberFixedFields:
@@ -677,7 +672,7 @@ def test_variant_fields(self, tmp_path, field):
         vcf_file = tmp_path / "test.vcf"
         self.generate_vcf(vcf_file, info_field=field)
         with pytest.raises(ValueError, match=f"INFO field name.*{field}"):
-            vcf2zarr.explode(tmp_path / "x.icf", [tmp_path / "test.vcf.gz"])
+            icf_mod.explode(tmp_path / "x.icf", [tmp_path / "test.vcf.gz"])

     @pytest.mark.parametrize(
         "field",
@@ -691,7 +686,7 @@ def test_call_fields(self, tmp_path, field):
         vcf_file = tmp_path / "test.vcf"
         self.generate_vcf(vcf_file, format_field=field)
         with pytest.raises(ValueError, match=f"FORMAT field name.*{field}"):
-            vcf2zarr.explode(tmp_path / "x.icf", [tmp_path / "test.vcf.gz"])
+            icf_mod.explode(tmp_path / "x.icf", [tmp_path / "test.vcf.gz"])


 class TestBadSchemaChanges:
@@ -718,12 +713,12 @@ def test_removed_samples(self, tmp_path, schema, icf_path, samples):
         with open(schema_path, "w") as f:
             json.dump(d, f)
         with pytest.raises(ValueError, match="Subsetting or reordering samples"):
-            vcf2zarr.encode(icf_path, tmp_path / "z", schema_path=schema_path)
+            icf_mod.encode(icf_path, tmp_path / "z", schema_path=schema_path)


 class TestInspect:
     def test_icf(self, icf_path):
-        df = pd.DataFrame(vcz_mod.inspect(icf_path))
+        df = pd.DataFrame(icf_mod.inspect(icf_path))
         assert sorted(list(df)) == sorted(
             [
                 "name",
@@ -765,7 +760,7 @@ def test_icf(self, icf_path):
         )

     def test_vcz(self, zarr_path):
-        df = pd.DataFrame(vcz_mod.inspect(zarr_path))
+        df = pd.DataFrame(icf_mod.inspect(zarr_path))
         cols = [
             "name",
             "dtype",
@@ -814,9 +809,9 @@ def test_vcz(self, zarr_path):
     @pytest.mark.parametrize("bad_path", ["/NO_WAY", "TTTTTT"])
     def test_no_such_path(self, bad_path):
         with pytest.raises(ValueError, match=f"Path not found: {bad_path}"):
-            vcz_mod.inspect(bad_path)
+            icf_mod.inspect(bad_path)

     @pytest.mark.parametrize("path", ["./", "tests/data/vcf/sample.vcf.gz"])
     def test_unknown_format(self, path):
         with pytest.raises(ValueError, match="not in ICF or VCF Zarr format"):
-            vcz_mod.inspect(path)
+            icf_mod.inspect(path)
diff --git a/validation.py b/validation.py
index 4397a9ac..d3a56005 100644
--- a/validation.py
+++ b/validation.py
@@ -6,7 +6,7 @@

 import click

-from bio2zarr import vcf2zarr
+from bio2zarr import icf, vcz_verification

 # TODO add support here for split vcfs. Perhaps simplest to take a
 # directory provided as input as indicating this, and then having
@@ -44,7 +44,7 @@ def cli(vcfs, worker_processes, force):
         if force and exploded.exists():
             shutil.rmtree(exploded)
         if not exploded.exists():
-            vcf2zarr.explode(
+            icf.explode(
                 exploded,
                 files,
                 worker_processes=worker_processes,
@@ -53,13 +53,13 @@
         spec = tmp_path / (f.name + ".schema")
         if force or not spec.exists():
             with open(spec, "w") as specfile:
-                vcf2zarr.mkschema(exploded, specfile)
+                icf.mkschema(exploded, specfile)

         zarr = tmp_path / (f.name + ".vcz")
         if force and zarr.exists():
             shutil.rmtree(zarr)
         if not zarr.exists():
-            vcf2zarr.encode(
+            icf.encode(
                 exploded,
                 zarr,
                 spec,
@@ -67,7 +67,7 @@
             show_progress=True,
         )

-    vcf2zarr.verify(source_file, zarr, show_progress=True)
+    vcz_verification.verify(source_file, zarr, show_progress=True)


 if __name__ == "__main__":
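
Usage sketch for reviewers: the snippet below chains the renamed entry points exactly as the tests in this patch exercise them (icf.explode, IntermediateColumnarFormat.generate_schema, icf.encode, icf.inspect). The out.icf / out.schema.json / out.vcz names are hypothetical placeholders; only the module path and call signatures are taken from the diff itself.

from bio2zarr import icf as icf_mod

# Convert one or more VCFs to the intermediate columnar format (ICF).
# tests/data/vcf/sample.vcf.gz is the fixture referenced elsewhere in this patch.
icf_mod.explode("out.icf", ["tests/data/vcf/sample.vcf.gz"])

# Schema generation now hangs off the ICF store itself, replacing the old
# vcf2zarr.VcfZarrSchema.generate(icf, ...) call.
store = icf_mod.IntermediateColumnarFormat("out.icf")
schema = store.generate_schema(variants_chunk_size=1000, samples_chunk_size=10_000)
with open("out.schema.json", "w") as f:
    f.write(schema.asjson())

# Encode the ICF to VCF Zarr, honouring any edits made to the schema JSON.
icf_mod.encode("out.icf", "out.vcz", schema_path="out.schema.json")

# inspect() accepts either an ICF or a VCF Zarr path.
print(icf_mod.inspect("out.vcz"))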