3 changes: 3 additions & 0 deletions examples/cookbook/unstructured/.env.example
@@ -0,0 +1,3 @@
MOSS_PROJECT_ID=your-project-id
MOSS_PROJECT_KEY=your-project-key
MOSS_INDEX_NAME=unstructured-docs
71 changes: 71 additions & 0 deletions examples/cookbook/unstructured/README.md
@@ -0,0 +1,71 @@
# Unstructured + Moss Cookbook

Parse PDFs, Word docs, HTML, images, and other file formats with [Unstructured](https://unstructured.io/) and index the extracted content into [Moss](https://moss.dev) for semantic search.

This cookbook shows a focused ingestion pipeline (a minimal code sketch follows the list):

1. Partition raw files with Unstructured
2. Chunk extracted elements
3. Preserve source metadata on every chunk
4. Upsert chunks into a Moss index with stable document IDs
5. Query the loaded Moss index
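
Here is a minimal sketch of steps 1 through 4, using the same calls `ingest.py` below relies on. It assumes the index already exists; the full script creates it on first run and adds batching, richer metadata, and querying.

```python
import asyncio

from moss import DocumentInfo, MossClient, MutationOptions
from unstructured.chunking.title import chunk_by_title
from unstructured.partition.auto import partition


async def main() -> None:
    # Steps 1-2: partition a file, then chunk the extracted elements.
    elements = partition(filename="sample_docs/onboarding.html")
    chunks = chunk_by_title(elements)

    # Steps 3-4: stable IDs plus upsert make re-runs idempotent.
    docs = [
        DocumentInfo(
            id=f"onboarding.html::chunk-{i:04d}",
            text=str(chunk).strip(),
            metadata={"source_path": "onboarding.html"},
        )
        for i, chunk in enumerate(chunks)
        if str(chunk).strip()
    ]

    client = MossClient("your-project-id", "your-project-key")
    # Assumes the index already exists; ingest.py creates it on first run.
    await client.add_docs("unstructured-docs", docs, MutationOptions(upsert=True))


asyncio.run(main())
```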

## Setup

```bash
cd examples/cookbook/unstructured
uv sync
cp .env.example .env
```

Fill in `.env` with your Moss credentials:

```bash
MOSS_PROJECT_ID=your-project-id
MOSS_PROJECT_KEY=your-project-key
MOSS_INDEX_NAME=unstructured-docs
```

## Usage

Index the sample documents:

```bash
uv run python ingest.py --input-dir sample_docs
```

Index your own folder:

```bash
uv run python ingest.py --input-dir /path/to/files --index-name company-docs
```

Run a query after ingestion:

```bash
uv run python ingest.py \
--input-dir sample_docs \
--query "What does the onboarding policy say about access?"
```

## What Gets Stored

Each Moss document is one Unstructured chunk:

```python
DocumentInfo(
id="docs/handbook.pdf::chunk-0003",
text="Extracted chunk text...",
metadata={
"source_path": "docs/handbook.pdf",
"filename": "handbook.pdf",
"filetype": ".pdf",
"chunk_index": "3",
"category": "CompositeElement",
"page_number": "4",
"text_hash": "a1b2c3d4e5f6",
},
)
```

Chunk IDs are deterministic from the relative file path and chunk index. Rerunning the same ingestion uses `MutationOptions(upsert=True)`, so chunks are updated in place.
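
For reference, the ID helper mirrors `_stable_doc_id` in `ingest.py`:

```python
def stable_doc_id(relative_path: str, chunk_index: int) -> str:
    # Deterministic: the same file and chunk position always yield the same ID,
    # so upserts replace existing chunks instead of duplicating them.
    return f"{relative_path}::chunk-{chunk_index:04d}"


stable_doc_id("docs/handbook.pdf", 3)  # -> "docs/handbook.pdf::chunk-0003"
```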
227 changes: 227 additions & 0 deletions examples/cookbook/unstructured/ingest.py
🚩 Other cookbook examples include test files; this one does not

The langchain cookbook has test_integration.py, haystack has test_live.py, and crewai has test_live.py. This PR's unstructured cookbook has no test file. The CONTRIBUTING.md states "If you've added code that should be tested, add tests." For a cookbook/example script this is somewhat subjective — the dspy cookbook also lacks a test file — but it is a deviation from the majority pattern. Consider adding at least a minimal integration/smoke test for consistency.
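
A minimal offline smoke test along those lines might look like the sketch below. The file name and assertions are suggestions rather than part of this PR, and it assumes `DocumentInfo` exposes `id` and `text` as attributes:

```python
# test_smoke.py (suggested): parse the bundled sample docs offline;
# no Moss credentials or network access are needed for partitioning.
from pathlib import Path

from ingest import parse_documents


def test_parse_sample_docs() -> None:
    docs = parse_documents(Path("sample_docs"))
    assert docs, "expected at least one chunk from sample_docs"
    assert all(doc.id and doc.text for doc in docs)
```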

@@ -0,0 +1,227 @@
import argparse
import asyncio
import hashlib
import json
import os
from pathlib import Path
from typing import Any

from dotenv import load_dotenv
from moss import DocumentInfo, MossClient, MutationOptions, QueryOptions
from unstructured.chunking.title import chunk_by_title
from unstructured.partition.auto import partition


DEFAULT_MAX_CHARACTERS = 1_500
DEFAULT_NEW_AFTER_N_CHARS = 1_200
DEFAULT_COMBINE_UNDER_N_CHARS = 300


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Parse files with Unstructured and index chunks into Moss."
)
parser.add_argument(
"--input-dir",
default="sample_docs",
help="Directory containing PDFs, Word docs, HTML, images, or other files.",
)
parser.add_argument(
"--index-name",
default=os.getenv("MOSS_INDEX_NAME", "unstructured-docs"),
help="Moss index name.",
)
parser.add_argument(
"--query",
help="Optional query to run after ingestion.",
)
parser.add_argument(
"--top-k",
type=int,
default=5,
help="Number of Moss search results to return for --query.",
)
parser.add_argument(
"--batch-size",
type=int,
default=100,
help="Number of chunks to upsert per Moss request.",
)
return parser.parse_args()


def _stringify_metadata_value(value: Any) -> str:
if value is None:
return ""
if isinstance(value, (str, int, float, bool)):
return str(value)
return json.dumps(value, sort_keys=True, default=str)


def _element_metadata(element: Any) -> dict[str, str]:
metadata = getattr(element, "metadata", None)
if hasattr(metadata, "to_dict"):
raw = dict(metadata.to_dict())
elif isinstance(metadata, dict):
raw = dict(metadata)
else:
raw = {}

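    # Chunking attaches the source elements under "orig_elements"; drop that
    # payload so the metadata stays small and string-serializable.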
raw.pop("orig_elements", None)
return {
str(key): _stringify_metadata_value(value)
for key, value in raw.items()
if value is not None
}


def _text_hash(text: str) -> str:
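    # Truncated SHA-256 as a short content fingerprint for change tracking.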
return hashlib.sha256(text.encode("utf-8")).hexdigest()[:12]


def _stable_doc_id(relative_path: str, chunk_index: int) -> str:
return f"{relative_path}::chunk-{chunk_index:04d}"


def _iter_files(input_dir: Path) -> list[Path]:
return sorted(path for path in input_dir.rglob("*") if path.is_file())


def file_to_documents(path: Path, input_dir: Path) -> list[DocumentInfo]:
relative_path = path.relative_to(input_dir).as_posix()
elements = partition(filename=str(path))
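    # Title-aware chunking: max_characters is a hard cap, new_after_n_chars is
    # a soft split point, and combine_text_under_n_chars merges short sections.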
chunks = chunk_by_title(
elements,
max_characters=DEFAULT_MAX_CHARACTERS,
new_after_n_chars=DEFAULT_NEW_AFTER_N_CHARS,
combine_text_under_n_chars=DEFAULT_COMBINE_UNDER_N_CHARS,
)

docs: list[DocumentInfo] = []
for chunk_index, chunk in enumerate(chunks):
text = str(chunk).strip()
if not text:
continue

metadata = _element_metadata(chunk)
metadata.update(
{
"source_path": relative_path,
"filename": path.name,
"filetype": path.suffix.lower(),
"chunk_index": str(chunk_index),
"category": str(getattr(chunk, "category", chunk.__class__.__name__)),
"text_hash": _text_hash(text),
}
)

element_id = getattr(chunk, "id", None)
if element_id:
metadata["element_id"] = str(element_id)

docs.append(
DocumentInfo(
id=_stable_doc_id(relative_path, chunk_index),
text=text,
metadata=metadata,
)
)

return docs


def parse_documents(input_dir: Path) -> list[DocumentInfo]:
docs: list[DocumentInfo] = []
for path in _iter_files(input_dir):
file_docs = file_to_documents(path, input_dir)
docs.extend(file_docs)
print(f"Parsed {path.relative_to(input_dir)} -> {len(file_docs)} chunks")
return docs


def _batches(docs: list[DocumentInfo], batch_size: int) -> list[list[DocumentInfo]]:
return [docs[i : i + batch_size] for i in range(0, len(docs), batch_size)]


async def upsert_documents(
client: MossClient,
index_name: str,
docs: list[DocumentInfo],
batch_size: int,
) -> None:
if not docs:
print("No document chunks to index.")
return

batches = _batches(docs, batch_size)
upsert_options = MutationOptions(upsert=True)

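    # Create the index with the first batch; if it already exists, fall back
    # to upserting every batch so re-runs update chunks in place.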
try:
await client.create_index(index_name, batches[0])
print(f"Created index '{index_name}' with {len(batches[0])} chunks")
start = 1
except RuntimeError as exc:
if "already exists" not in str(exc).lower():
raise
print(f"Index '{index_name}' already exists; upserting chunks")
start = 0

for batch_number, batch in enumerate(batches[start:], start=start + 1):
await client.add_docs(index_name, batch, upsert_options)
print(f"Upserted batch {batch_number}/{len(batches)} ({len(batch)} chunks)")


async def query_index(
client: MossClient,
index_name: str,
query: str,
top_k: int,
) -> None:
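    # Load the index before querying it.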
await client.load_index(index_name)
results = await client.query(
index_name,
query,
QueryOptions(top_k=top_k, alpha=0.7),
)

print(f"\nQuery: {query}")
if not results.docs:
print("No results found.")
return

for i, doc in enumerate(results.docs, 1):
metadata = doc.metadata or {}
source = metadata.get("source_path", "?")
page = metadata.get("page_number")
location = f"{source} page {page}" if page else source
print(f"\n{i}. {location} (score={doc.score:.3f})")
print(doc.text[:500])


async def main() -> None:
load_dotenv()
args = parse_args()
if args.batch_size < 1:
raise RuntimeError("--batch-size must be at least 1.")

project_id = os.getenv("MOSS_PROJECT_ID")
project_key = os.getenv("MOSS_PROJECT_KEY")
if not project_id or not project_key:
raise RuntimeError("Set MOSS_PROJECT_ID and MOSS_PROJECT_KEY.")

input_dir = Path(args.input_dir).expanduser().resolve()
if not input_dir.is_dir():
raise RuntimeError(f"Input directory not found: {input_dir}")

docs = parse_documents(input_dir)
print(f"\nPrepared {len(docs)} chunks from {input_dir}")

client = MossClient(project_id, project_key)
await upsert_documents(client, args.index_name, docs, args.batch_size)

if args.query:
await query_index(client, args.index_name, args.query, args.top_k)


if __name__ == "__main__":
asyncio.run(main())
30 changes: 30 additions & 0 deletions examples/cookbook/unstructured/pyproject.toml
@@ -0,0 +1,30 @@
[project]
name = "unstructured-moss"
version = "0.1.0"
description = "Parse files with Unstructured and index them into Moss"
readme = "README.md"
requires-python = ">=3.11,<3.14"
license = { text = "BSD-2-Clause" }
authors = [
{ name = "InferEdge Inc.", email = "contact@moss.dev" }
]
dependencies = [
"moss>=1.0.0",
"python-dotenv>=1.0",
"unstructured[all-docs]>=0.18",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["ingest.py"]

[tool.hatch.build.targets.sdist]
include = [
"README.md",
"ingest.py",
".env.example",
"sample_docs/*",
]
11 changes: 11 additions & 0 deletions examples/cookbook/unstructured/sample_docs/onboarding.html
@@ -0,0 +1,11 @@
<!doctype html>
<html>
<head>
<title>Onboarding Policy</title>
</head>
<body>
<h1>Onboarding Policy</h1>
<p>New hires receive repository access after completing security training.</p>
<p>Manager approval is required before production credentials are issued.</p>
</body>
</html>
@@ -0,0 +1,4 @@
Moss release notes

Semantic search indexes can be updated incrementally by upserting documents with stable IDs.
Metadata helps filter or explain where each retrieved chunk came from.