diff --git a/examples/cookbook/unstructured/.env.example b/examples/cookbook/unstructured/.env.example new file mode 100644 index 00000000..de44a697 --- /dev/null +++ b/examples/cookbook/unstructured/.env.example @@ -0,0 +1,3 @@ +MOSS_PROJECT_ID=your-project-id +MOSS_PROJECT_KEY=your-project-key +MOSS_INDEX_NAME=unstructured-docs diff --git a/examples/cookbook/unstructured/README.md b/examples/cookbook/unstructured/README.md new file mode 100644 index 00000000..dc403f9e --- /dev/null +++ b/examples/cookbook/unstructured/README.md @@ -0,0 +1,71 @@ +# Unstructured + Moss Cookbook + +Parse PDFs, Word docs, HTML, images, and other file formats with [Unstructured](https://unstructured.io/) and index the extracted content into [Moss](https://moss.dev) for semantic search. + +This cookbook shows a focused ingestion pipeline: + +1. Partition raw files with Unstructured +2. Chunk extracted elements +3. Preserve source metadata on every chunk +4. Upsert chunks into a Moss index with stable document IDs +5. Query the loaded Moss index + +## Setup + +```bash +cd examples/cookbook/unstructured +uv sync +cp .env.example .env +``` + +Fill in `.env` with your Moss credentials: + +```bash +MOSS_PROJECT_ID=your-project-id +MOSS_PROJECT_KEY=your-project-key +MOSS_INDEX_NAME=unstructured-docs +``` + +## Usage + +Index the sample documents: + +```bash +uv run python ingest.py --input-dir sample_docs +``` + +Index your own folder: + +```bash +uv run python ingest.py --input-dir /path/to/files --index-name company-docs +``` + +Ask a query after ingestion: + +```bash +uv run python ingest.py \ + --input-dir sample_docs \ + --query "What does the onboarding policy say about access?" +``` + +## What Gets Stored + +Each Moss document is one Unstructured chunk: + +```python +DocumentInfo( + id="docs/handbook.pdf::chunk-0003", + text="Extracted chunk text...", + metadata={ + "source_path": "docs/handbook.pdf", + "filename": "handbook.pdf", + "filetype": ".pdf", + "chunk_index": "3", + "category": "CompositeElement", + "page_number": "4", + "text_hash": "a1b2c3d4e5f6", + }, +) +``` + +Chunk IDs are deterministic from the relative file path and chunk index. Rerunning the same ingestion uses `MutationOptions(upsert=True)`, so chunks are updated in place. diff --git a/examples/cookbook/unstructured/ingest.py b/examples/cookbook/unstructured/ingest.py new file mode 100644 index 00000000..a5007a96 --- /dev/null +++ b/examples/cookbook/unstructured/ingest.py @@ -0,0 +1,227 @@ +import argparse +import asyncio +import hashlib +import json +import os +from pathlib import Path +from typing import Any + +from dotenv import load_dotenv +from moss import DocumentInfo, MossClient, MutationOptions, QueryOptions +from unstructured.chunking.title import chunk_by_title +from unstructured.partition.auto import partition + + +DEFAULT_MAX_CHARACTERS = 1_500 +DEFAULT_NEW_AFTER_N_CHARS = 1_200 +DEFAULT_COMBINE_UNDER_N_CHARS = 300 + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Parse files with Unstructured and index chunks into Moss." + ) + parser.add_argument( + "--input-dir", + default="sample_docs", + help="Directory containing PDFs, Word docs, HTML, images, or other files.", + ) + parser.add_argument( + "--index-name", + default=os.getenv("MOSS_INDEX_NAME", "unstructured-docs"), + help="Moss index name.", + ) + parser.add_argument( + "--query", + help="Optional query to run after ingestion.", + ) + parser.add_argument( + "--top-k", + type=int, + default=5, + help="Number of Moss search results to return for --query.", + ) + parser.add_argument( + "--batch-size", + type=int, + default=100, + help="Number of chunks to upsert per Moss request.", + ) + return parser.parse_args() + + +def _stringify_metadata_value(value: Any) -> str: + if value is None: + return "" + if isinstance(value, (str, int, float, bool)): + return str(value) + return json.dumps(value, sort_keys=True, default=str) + + +def _element_metadata(element: Any) -> dict[str, str]: + metadata = getattr(element, "metadata", None) + if hasattr(metadata, "to_dict"): + raw = dict(metadata.to_dict()) + elif isinstance(metadata, dict): + raw = dict(metadata) + else: + raw = {} + + raw.pop("orig_elements", None) + return { + str(key): _stringify_metadata_value(value) + for key, value in raw.items() + if value is not None + } + + +def _text_hash(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:12] + + +def _stable_doc_id(relative_path: str, chunk_index: int) -> str: + return f"{relative_path}::chunk-{chunk_index:04d}" + + +def _iter_files(input_dir: Path) -> list[Path]: + return sorted(path for path in input_dir.rglob("*") if path.is_file()) + + +def file_to_documents(path: Path, input_dir: Path) -> list[DocumentInfo]: + relative_path = path.relative_to(input_dir).as_posix() + elements = partition(filename=str(path)) + chunks = chunk_by_title( + elements, + max_characters=DEFAULT_MAX_CHARACTERS, + new_after_n_chars=DEFAULT_NEW_AFTER_N_CHARS, + combine_text_under_n_chars=DEFAULT_COMBINE_UNDER_N_CHARS, + ) + + docs: list[DocumentInfo] = [] + for chunk_index, chunk in enumerate(chunks): + text = str(chunk).strip() + if not text: + continue + + metadata = _element_metadata(chunk) + metadata.update( + { + "source_path": relative_path, + "filename": path.name, + "filetype": path.suffix.lower(), + "chunk_index": str(chunk_index), + "category": str(getattr(chunk, "category", chunk.__class__.__name__)), + "text_hash": _text_hash(text), + } + ) + + element_id = getattr(chunk, "id", None) + if element_id: + metadata["element_id"] = str(element_id) + + docs.append( + DocumentInfo( + id=_stable_doc_id(relative_path, chunk_index), + text=text, + metadata=metadata, + ) + ) + + return docs + + +def parse_documents(input_dir: Path) -> list[DocumentInfo]: + docs: list[DocumentInfo] = [] + for path in _iter_files(input_dir): + file_docs = file_to_documents(path, input_dir) + docs.extend(file_docs) + print(f"Parsed {path.relative_to(input_dir)} -> {len(file_docs)} chunks") + return docs + + +def _batches(docs: list[DocumentInfo], batch_size: int) -> list[list[DocumentInfo]]: + return [docs[i : i + batch_size] for i in range(0, len(docs), batch_size)] + + +async def upsert_documents( + client: MossClient, + index_name: str, + docs: list[DocumentInfo], + batch_size: int, +) -> None: + if not docs: + print("No document chunks to index.") + return + + batches = _batches(docs, batch_size) + upsert_options = MutationOptions(upsert=True) + + try: + await client.create_index(index_name, batches[0]) + print(f"Created index '{index_name}' with {len(batches[0])} chunks") + start = 1 + except RuntimeError as exc: + if "already exists" not in str(exc).lower(): + raise + print(f"Index '{index_name}' already exists; upserting chunks") + start = 0 + + for batch_number, batch in enumerate(batches[start:], start=start + 1): + await client.add_docs(index_name, batch, upsert_options) + print(f"Upserted batch {batch_number}/{len(batches)} ({len(batch)} chunks)") + + +async def query_index( + client: MossClient, + index_name: str, + query: str, + top_k: int, +) -> None: + await client.load_index(index_name) + results = await client.query( + index_name, + query, + QueryOptions(top_k=top_k, alpha=0.7), + ) + + print(f"\nQuery: {query}") + if not results.docs: + print("No results found.") + return + + for i, doc in enumerate(results.docs, 1): + metadata = doc.metadata or {} + source = metadata.get("source_path", "?") + page = metadata.get("page_number") + location = f"{source} page {page}" if page else source + print(f"\n{i}. {location} (score={doc.score:.3f})") + print(doc.text[:500]) + + +async def main() -> None: + load_dotenv() + args = parse_args() + if args.batch_size < 1: + raise RuntimeError("--batch-size must be at least 1.") + + project_id = os.getenv("MOSS_PROJECT_ID") + project_key = os.getenv("MOSS_PROJECT_KEY") + if not project_id or not project_key: + raise RuntimeError("Set MOSS_PROJECT_ID and MOSS_PROJECT_KEY.") + + input_dir = Path(args.input_dir).expanduser().resolve() + if not input_dir.is_dir(): + raise RuntimeError(f"Input directory not found: {input_dir}") + + docs = parse_documents(input_dir) + print(f"\nPrepared {len(docs)} chunks from {input_dir}") + + client = MossClient(project_id, project_key) + await upsert_documents(client, args.index_name, docs, args.batch_size) + + if args.query: + await query_index(client, args.index_name, args.query, args.top_k) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/cookbook/unstructured/pyproject.toml b/examples/cookbook/unstructured/pyproject.toml new file mode 100644 index 00000000..4b05965f --- /dev/null +++ b/examples/cookbook/unstructured/pyproject.toml @@ -0,0 +1,30 @@ +[project] +name = "unstructured-moss" +version = "0.1.0" +description = "Parse files with Unstructured and index them into Moss" +readme = "README.md" +requires-python = ">=3.11,<3.14" +license = { text = "BSD-2-Clause" } +authors = [ + { name = "InferEdge Inc.", email = "contact@moss.dev" } +] +dependencies = [ + "moss>=1.0.0", + "python-dotenv>=1.0", + "unstructured[all-docs]>=0.18", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["ingest.py"] + +[tool.hatch.build.targets.sdist] +include = [ + "README.md", + "ingest.py", + ".env.example", + "sample_docs/*", +] diff --git a/examples/cookbook/unstructured/sample_docs/onboarding.html b/examples/cookbook/unstructured/sample_docs/onboarding.html new file mode 100644 index 00000000..f05138b0 --- /dev/null +++ b/examples/cookbook/unstructured/sample_docs/onboarding.html @@ -0,0 +1,11 @@ + + +
+New hires receive repository access after completing security training.
+Manager approval is required before production credentials are issued.
+ + diff --git a/examples/cookbook/unstructured/sample_docs/release-notes.txt b/examples/cookbook/unstructured/sample_docs/release-notes.txt new file mode 100644 index 00000000..470b6994 --- /dev/null +++ b/examples/cookbook/unstructured/sample_docs/release-notes.txt @@ -0,0 +1,4 @@ +Moss release notes + +Semantic search indexes can be updated incrementally by upserting documents with stable IDs. +Metadata helps filter or explain where each retrieved chunk came from. diff --git a/examples/cookbook/unstructured/test_integration.py b/examples/cookbook/unstructured/test_integration.py new file mode 100644 index 00000000..645b2370 --- /dev/null +++ b/examples/cookbook/unstructured/test_integration.py @@ -0,0 +1,206 @@ +import asyncio +import contextlib +import importlib +import io +import sys +import types +import unittest +from dataclasses import dataclass +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import AsyncMock, patch + + +@dataclass +class FakeDocumentInfo: + id: str + text: str + metadata: dict[str, str] | None = None + + +class FakeMutationOptions: + def __init__(self, upsert: bool = False): + self.upsert = upsert + + +class FakeQueryOptions: + def __init__(self, **kwargs): + self.kwargs = kwargs + + +STUBBED_MODULES = [ + "dotenv", + "moss", + "unstructured", + "unstructured.chunking", + "unstructured.chunking.title", + "unstructured.partition", + "unstructured.partition.auto", +] + + +def _install_dependency_stubs() -> dict[str, types.ModuleType | None]: + previous_modules = {name: sys.modules.get(name) for name in STUBBED_MODULES} + + dotenv = types.ModuleType("dotenv") + dotenv.load_dotenv = lambda: None + + moss = types.ModuleType("moss") + moss.DocumentInfo = FakeDocumentInfo + moss.MossClient = object + moss.MutationOptions = FakeMutationOptions + moss.QueryOptions = FakeQueryOptions + + unstructured = types.ModuleType("unstructured") + chunking = types.ModuleType("unstructured.chunking") + title = types.ModuleType("unstructured.chunking.title") + title.chunk_by_title = lambda *args, **kwargs: [] + partition = types.ModuleType("unstructured.partition") + auto = types.ModuleType("unstructured.partition.auto") + auto.partition = lambda *args, **kwargs: [] + + sys.modules["dotenv"] = dotenv + sys.modules["moss"] = moss + sys.modules["unstructured"] = unstructured + sys.modules["unstructured.chunking"] = chunking + sys.modules["unstructured.chunking.title"] = title + sys.modules["unstructured.partition"] = partition + sys.modules["unstructured.partition.auto"] = auto + return previous_modules + + +def _restore_modules(previous_modules: dict[str, types.ModuleType | None]) -> None: + for name, module in previous_modules.items(): + if module is None: + sys.modules.pop(name, None) + else: + sys.modules[name] = module + + +_PREVIOUS_MODULES = _install_dependency_stubs() + +ingest = importlib.import_module("ingest") + +_restore_modules(_PREVIOUS_MODULES) + + +class FakeMetadata: + def to_dict(self): + return { + "page_number": 4, + "languages": ["eng"], + "orig_elements": ["too large for Moss metadata"], + "empty": None, + } + + +class FakeElement: + category = "CompositeElement" + + def __init__(self, text: str, element_id: str | None = None): + self.text = text + self.id = element_id + self.metadata = FakeMetadata() + + def __str__(self): + return self.text + + +class TestUnstructuredIngestion(unittest.TestCase): + def test_file_to_documents_preserves_metadata_and_stable_ids(self): + with TemporaryDirectory() as temp_dir: + input_dir = Path(temp_dir) + source = input_dir / "policies" / "onboarding.html" + source.parent.mkdir() + source.write_text("