Lightweight batch processing utilities for the ndev-kit ecosystem.
nbatch provides a foundation for batch processing operations. It's designed to work seamlessly with napari plugins but has no napari or Qt dependencies.
- @batch decorator - Transform single-item functions into batch-capable functions
- BatchContext - Track progress through batch operations
- BatchRunner - Orchestrate batch operations with threading, progress callbacks, and cancellation
- discover_files() - Flexible file discovery with natural sorting (like file explorers)
- batch_logger - Scoped logging for batch operations with headers/footers
- Minimal dependencies - Only requires natsort for natural file ordering
- Optional napari integration - Uses napari's threading when available, falls back to standard threads
pip install nbatch

For development:
pip install -e . --group dev

The @batch decorator transforms a function that processes a single item into one that handles both single items and batches:
from pathlib import Path
from nbatch import batch
@batch
def process_image(path: Path) -> str:
# Your processing logic here
return path.stem.upper()
# Single item - returns result directly
result = process_image(Path("image.tif"))
# Returns: "IMAGE"
# List of items - returns generator
results = process_image([Path("a.tif"), Path("b.tif")])
list(results)
# Returns: ["A", "B"]
# Directory - discovers files and returns generator
results = process_image(Path("/data/images"))
# Processes all files in directory

Use with_context=True to get progress information:
@batch(with_context=True)
def process_image(path: Path) -> str:
return path.stem
for result, ctx in process_image(files):
print(f"{ctx.progress:.0%} complete: {result}")
# 10% complete: image1
# 20% complete: image2
# ...

The BatchContext provides:
- ctx.index - Zero-based index of current item
- ctx.total - Total number of items
- ctx.item - The current item being processed
- ctx.progress - Progress as fraction (0.0 to 1.0)
- ctx.is_first / ctx.is_last - Boolean flags
Control how errors are handled with on_error:
# 'raise' (default) - Re-raise exceptions immediately
@batch(on_error='raise')
def strict_process(path): ...
# 'continue' - Log error and yield None for failed items
@batch(on_error='continue')
def lenient_process(path): ...
# Results: ["good", None, "ok"]
# 'skip' - Log error and skip failed items entirely
@batch(on_error='skip')
def skip_errors(path): ...
# Results: ["good", "ok"]Control which files are processed:
# Custom glob patterns
@batch(patterns='*.tif')
def process_tiffs(path): ...
# Multiple patterns
@batch(patterns=['*.tif', '*.tiff', '*.png'])
def process_images(path): ...
# Non-recursive (top-level only)
@batch(recursive=False)
def process_top_level(path): ...

Or use discover_files() directly:
from nbatch import discover_files
# From directory with patterns
files = discover_files("/data/images", patterns=["*.tif", "*.png"])
# From explicit list
files = discover_files([path1, path2, path3])

Use batch_logger for structured logging. By default, it outputs to the console (stderr). Optionally log to a file:
from nbatch import batch, batch_logger
@batch(with_context=True)
def process(path):
return path.stem
# Console only (default)
with batch_logger() as log:
for result, ctx in process(files):
log(ctx, f"Processed: {result}")
# With file logging (appends by default)
with batch_logger(log_file="output/process.log", header={"Files": 100}) as log:
for result, ctx in process(files):
log(ctx, f"Processed: {result}")
# Or use log.info(), log.warning(), log.error()
# File only (no console output)
with batch_logger(log_file="output/quiet.log", console=False) as log:
for result, ctx in process(files):
log(ctx, f"Processed: {result}")Log file output:
============================================================
Batch processing started at 2025-01-29 10:30:00
------------------------------------------------------------
Files: 100
============================================================
2025-01-29 10:30:01 - INFO - [1/100] image1.tif - Processed: image1
2025-01-29 10:30:02 - INFO - [2/100] image2.tif - Processed: image2
...
============================================================
Batch processing completed at 2025-01-29 10:35:00
============================================================
BatchRunner provides clean orchestration for widgets with threading, progress callbacks, and cancellation:
from nbatch import batch, BatchRunner
# Define your processing function (pure, testable)
@batch(on_error='continue')
def process_image(path, model, output_dir):
result = model.predict(load_image(path))
save_result(result, output_dir / path.name)
return result
# In your widget class
class MyWidget:
def __init__(self, viewer):
self._viewer = viewer
# Create runner once - reusable for all batches
self.runner = BatchRunner(
on_start=self._on_batch_start,
on_item_complete=self._on_item_complete,
on_complete=self._on_batch_complete,
on_error=self._on_item_error,
on_cancel=self._on_cancelled,
)
self._run_button.clicked.connect(self.run_batch)
self._cancel_button.clicked.connect(self.runner.cancel)
def _on_batch_start(self, total):
"""Called when batch starts with total item count."""
self._progress_bar.setValue(0)
self._progress_bar.setMaximum(total)
def _on_item_complete(self, result, ctx):
"""Called after each item completes."""
self._progress_bar.setValue(ctx.index + 1)
# Optionally add result to viewer
if result is not None:
self._viewer.add_image(result, name=f"Result {ctx.index}")
def _on_batch_complete(self):
errors = self.runner.error_count
if errors > 0:
self._progress_bar.label = f"Done with {errors} errors"
else:
self._progress_bar.label = "Complete!"
def _on_item_error(self, ctx, exception):
self._progress_bar.label = f"Error on {ctx.item.name}"
def _on_cancelled(self):
self._progress_bar.label = "Cancelled"
def run_batch(self):
"""Triggered by 'Run' button - just one line!"""
self.runner.run(
process_image,
self.files,
model=self.model,
output_dir=self.output_dir,
log_file=self.output_dir / "batch.log",
        )

For more control, use napari's @thread_worker with the @batch decorator:
from napari.qt.threading import thread_worker
from nbatch import batch, batch_logger
@batch(with_context=True, on_error='continue')
def process_image(path, model, output_dir):
# Your processing logic
result = model.predict(load_image(path))
save_result(result, output_dir / path.name)
return result
# In your widget
def run_batch(self):
@thread_worker
def _run():
with batch_logger(log_file=self.output_dir / 'log.txt') as log:
for result, ctx in process_image(
self.input_dir,
model=self.model,
output_dir=self.output_dir,
):
log(ctx, f"Processed: {ctx.item.name}")
yield ctx # Enables progress updates
worker = _run()
worker.yielded.connect(
lambda ctx: self.progress_bar.setValue(int(ctx.progress * 100))
)
    worker.start()

@batch(
on_error: Literal['raise', 'continue', 'skip'] = 'raise',
with_context: bool = False,
patterns: str | Sequence[str] = '*',
recursive: bool = False,
)@dataclass(frozen=True)
class BatchContext:
index: int # Zero-based index
total: int # Total items
item: Any # Current item
@property
def progress(self) -> float: ... # (index + 1) / total
@property
def is_first(self) -> bool: ... # index == 0
@property
    def is_last(self) -> bool: ...  # index == total - 1

def discover_files(
source: str | Path | Iterable[str | Path],
patterns: str | Sequence[str] = '*',
recursive: bool = False,
) -> list[Path]: ...

@contextmanager
def batch_logger(
log_file: str | Path | None = None, # Optional file path
header: Mapping[str, object] | None = None, # Metadata to write at start
level: int = logging.INFO,
console: bool = True, # Output to stderr
file_mode: Literal['w', 'a'] = 'a', # Append by default
) -> Generator[BatchLogger, None, None]: ...

class BatchRunner:
def __init__(
self,
on_start: Callable[[int], None] | None = None,
on_item_complete: Callable[[Any, BatchContext], None] | None = None,
on_complete: Callable[[], None] | None = None,
on_error: Callable[[BatchContext, Exception], None] | None = None,
on_cancel: Callable[[], None] | None = None,
): ...
def run(
self,
func: Callable,
items: Any,
*args,
threaded: bool = True,
log_file: str | Path | None = None,
log_header: Mapping[str, object] | None = None,
patterns: str | Sequence[str] = '*',
recursive: bool = False,
**kwargs, # Passed to func!
) -> None: ...
def cancel(self) -> None: ...
@property
def is_running(self) -> bool: ...
@property
def was_cancelled(self) -> bool: ...
@property
    def error_count(self) -> int: ...  # Errors in current/last batch

Contributions are welcome! Please ensure tests pass before submitting a pull request:
pytest --cov=src/nbatch

Distributed under the terms of the BSD-3 license.
nbatch is part of the ndev-kit ecosystem for no-code bioimage analysis in napari.