Commit 2b49fbd

feat: add observer pattern, streaming runner, writers, and AI docs
Major features:
- Observer pattern (ObservableMixin, ValidationEvent, ValidationEventType, ValidationObserver)
- Streaming ValidationRunner for memory-efficient large file processing with parallel support
- Output writers (CSVFailedWriter, JSONLinesFailedWriter, AuditReportWriter)
- Rich console observers for progress display (SimpleProgressObserver, RichDashboardObserver)
- AI agent documentation (llms.txt, AGENTS.md) for LLM discoverability
- Nested model audit_log_recursive() for hierarchical ValidationBase models
- ValidatorPipelineBuilder fluent interface

Breaking changes: None

Bump version to 0.3.0a1
1 parent 519f111 commit 2b49fbd

File tree

17 files changed (+4861 −8 lines)


AGENTS.md

Lines changed: 518 additions & 0 deletions
Large diffs are not rendered by default.

llms.txt

Lines changed: 328 additions & 0 deletions
@@ -0,0 +1,328 @@
# abstract-validation-base

> Production-ready validation and process tracking for data transformation pipelines.

## Overview

Build robust ETL pipelines with full audit trails, composable validators, streaming batch processing, and seamless Pydantic integration. Tracks cleaning operations and errors with timestamps for DataFrame export.

## Installation

```bash
pip install abstract-validation-base
# or
uv add abstract-validation-base
```

---

## Core Classes

### ValidationBase
Pydantic base model with automatic process logging and observer support.

```python
from abstract_validation_base import ValidationBase

class Contact(ValidationBase):
    name: str
    email: str

contact = Contact(name="john", email="[email protected]")
contact.add_cleaning_process("name", "john", "John", "Title case")
contact.add_error("email", "Domain not allowed", contact.email)

print(contact.has_errors)   # True
print(contact.audit_log())  # list[dict] for DataFrame
```

**Key methods:**
- `add_error(field, message, value?, context?, raise_exception?)` - Log error
- `add_cleaning_process(field, original, new, reason, operation_type?)` - Log transformation
- `audit_log(source?)` - Export entries as list[dict]
- `audit_log_recursive(source?)` - Include nested ValidationBase models
- `has_errors`, `has_cleaning`, `error_count`, `cleaning_count` - Properties
- `add_observer(observer)`, `remove_observer(observer)`, `notify(event)` - Observer pattern

### BaseValidator[T]
Abstract base for creating type-safe validators.

```python
from abstract_validation_base import BaseValidator, ValidationResult

class EmailValidator(BaseValidator[Contact]):
    @property
    def name(self) -> str:
        return "email_validator"

    def validate(self, item: Contact) -> ValidationResult:
        result = ValidationResult(is_valid=True)
        if "@" not in item.email:
            result.add_error("email", "Invalid format", item.email)
        return result
```

**Required:**
- `name` property (str) - Validator identifier
- `validate(item: T) -> ValidationResult` - Validation logic

### CompositeValidator[T]
Combine multiple validators into a pipeline.

```python
from abstract_validation_base import CompositeValidator

pipeline = CompositeValidator[Contact](
    validators=[EmailValidator(), PhoneValidator()],  # PhoneValidator defined like EmailValidator
    name="contact_pipeline",
    fail_fast=False,  # Run all validators (default)
)
result = pipeline.validate(contact)
```

**Methods:**
- `validate(item)` - Run all validators, merge results
- `add_validator(validator)` - Add dynamically
- `remove_validator(name)` - Remove by name
- `has_validator(name)`, `get_validator(name)` - Query validators

### ValidatorPipelineBuilder[T]
Fluent builder for validation pipelines.

```python
from abstract_validation_base import ValidatorPipelineBuilder

pipeline = (
    ValidatorPipelineBuilder[Contact]("pipeline")
    .add(EmailValidator())
    .add(PhoneValidator())
    .fail_fast()
    .build()
)
```

### ValidationResult
Container for validation status and errors.

```python
from abstract_validation_base import ValidationResult

result = ValidationResult(is_valid=True)
result.add_error("field", "message", "value")  # Sets is_valid=False
result.merge(other_result)                     # Combine results
```

### ValidatedRecord
SQLModel integration - define once, get validation + DB model.

```python
from abstract_validation_base import ValidatedRecord

class CustomerRecord(ValidatedRecord, table_name="customers"):
    email: str
    name: str

customer = CustomerRecord(email="[email protected]", name="Test")
customer.add_error("email", "Invalid domain")

db_record = customer.to_db()            # SQLModel instance
CustomerDB = CustomerRecord.db_model()  # SQLModel class
```

---

## Streaming Runner (Large Files)

### ValidationRunner[T]
Memory-efficient batch validation using iterators.

```python
import csv

from abstract_validation_base import ValidationRunner

with open("large_file.csv") as f:
    reader = csv.DictReader(f)
    runner = ValidationRunner(
        data=reader,
        model_class=Contact,
        validators=pipeline,   # Optional custom validators
        total_hint=1_000_000,  # For progress percentage
    )

    # Streaming - results are not stored in memory
    for result in runner.run():
        if result.is_valid:
            save(result.model)
        else:
            log_errors(result.error_summary)

# Stats available after iteration
print(runner.stats.success_rate)
print(runner.audit_report())
```

**Convenience methods:**
- `run_collect_valid()` - Yield only valid models
- `run_collect_failed()` - Yield only failed RowResults
- `run_batch_valid(batch_size=100)` - Yield batches for bulk insert
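The chunking behavior `run_batch_valid` describes (yield lists of up to `batch_size` items from a stream, so bulk inserts never hold the whole file) is the standard lazy-batching idiom. A library-independent sketch with stdlib tools only; the `batched` helper below is illustrative, not part of the package:

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], batch_size: int = 100) -> Iterator[list[T]]:
    """Yield lists of up to batch_size items from any iterable.

    Illustrative sketch of the chunking run_batch_valid performs;
    not the library's actual implementation.
    """
    it = iter(items)
    # islice pulls at most batch_size items per pass; stops on an empty chunk
    while batch := list(islice(it, batch_size)):
        yield batch

# Bulk-insert pattern: consume a stream in fixed-size chunks
chunks = [len(c) for c in batched(range(250), batch_size=100)]
print(chunks)  # [100, 100, 50]
```

Because `islice` only advances the underlying iterator on demand, this composes with any lazy source (e.g. `csv.DictReader`) without materializing it.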

**Parallel processing:**
```python
for result in runner.run(workers=4, chunk_size=10000):
    process(result)
```

### RowResult[T]
Result of validating a single row.

- `is_valid` - Passed all validation
- `model` - Validated model (if valid)
- `raw_data` - Original dict
- `error_summary` - List of (field, message) tuples

### RunnerStats
Statistics tracked during validation.

- `total_rows`, `valid_rows`, `error_rows`
- `success_rate` - Percentage (0-100)
- `top_errors(n=10)` - Most common errors
- `duration_ms` - Processing time
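As a rough mental model, the two containers above can be pictured as plain dataclasses. The sketch below is illustrative only (hence the `Sketch` suffixes); it mirrors the fields listed, not the library's actual source:

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Any

@dataclass
class RowResultSketch:
    """Illustrative shape of a RowResult (not library code)."""
    is_valid: bool
    raw_data: dict[str, Any]
    model: Any = None
    error_summary: list[tuple[str, str]] = field(default_factory=list)

@dataclass
class RunnerStatsSketch:
    """Illustrative stats tracker mirroring the fields above."""
    total_rows: int = 0
    valid_rows: int = 0
    error_rows: int = 0
    errors: Counter = field(default_factory=Counter)

    @property
    def success_rate(self) -> float:
        # Percentage in the 0-100 range, guarding against division by zero
        return 100.0 * self.valid_rows / self.total_rows if self.total_rows else 0.0

    def top_errors(self, n: int = 10) -> list[tuple[Any, int]]:
        return self.errors.most_common(n)

stats = RunnerStatsSketch(
    total_rows=4, valid_rows=3, error_rows=1,
    errors=Counter({("email", "Invalid format"): 1}),
)
print(stats.success_rate)  # 75.0
```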

---

## Observer Pattern

### ValidationEventType
Events emitted during validation:

- `ERROR_ADDED` - Error logged on model
- `CLEANING_ADDED` - Cleaning operation logged
- `VALIDATION_STARTED` - Validation begins
- `VALIDATION_COMPLETED` - Validation ends
- `ROW_PROCESSED` - Single row processed (runner)
- `BATCH_STARTED`, `BATCH_COMPLETED` - Batch events

### ValidationObserver Protocol

```python
from abstract_validation_base import ValidationObserver, ValidationEvent

class LoggingObserver:
    def on_event(self, event: ValidationEvent) -> None:
        print(f"{event.event_type.name}: {event.data}")

model.add_observer(LoggingObserver())
runner.add_observer(LoggingObserver())
```
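Under the hood this is classic observer dispatch: the observable keeps a list of observers and calls `on_event` on each. A self-contained sketch with illustrative stand-in names (not the library's internals):

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Protocol

class EventType(Enum):
    # Illustrative stand-in for ValidationEventType
    ERROR_ADDED = auto()
    CLEANING_ADDED = auto()

@dataclass
class Event:
    event_type: EventType
    data: dict[str, Any]

class Observer(Protocol):
    def on_event(self, event: Event) -> None: ...

@dataclass
class Observable:
    """Minimal ObservableMixin-style dispatcher (sketch, not library code)."""
    _observers: list[Observer] = field(default_factory=list)

    def add_observer(self, observer: Observer) -> None:
        self._observers.append(observer)

    def notify(self, event: Event) -> None:
        # Fan the event out to every registered observer
        for obs in self._observers:
            obs.on_event(event)

class Collector:
    def __init__(self) -> None:
        self.seen: list[EventType] = []

    def on_event(self, event: Event) -> None:
        self.seen.append(event.event_type)

obs = Collector()
source = Observable()
source.add_observer(obs)
source.notify(Event(EventType.ERROR_ADDED, {"field": "email"}))
print(obs.seen[0].name)  # ERROR_ADDED
```

Because the observer side is a structural `Protocol`, any object with a matching `on_event` method qualifies - no base class needed.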

### Rich Observers (requires `pip install rich`)

```python
from abstract_validation_base import SimpleProgressObserver, RichDashboardObserver
from rich.progress import Progress

# Simple progress bar
with Progress() as progress:
    observer = SimpleProgressObserver(progress)
    runner.add_observer(observer)
    for result in runner.run():
        process(result)

# Full dashboard with live error table
with RichDashboardObserver() as observer:
    runner.add_observer(observer)
    for result in runner.run():
        process(result)
```

---

## Output Writers

### CSVFailedWriter
Export failed records to CSV.

```python
from abstract_validation_base import CSVFailedWriter

writer = CSVFailedWriter("failed_records.csv")
count = writer.write_all(runner.run_collect_failed())
```

### JSONLinesFailedWriter
Export failed records to JSON Lines.

```python
from abstract_validation_base import JSONLinesFailedWriter

writer = JSONLinesFailedWriter("failed.jsonl")
count = writer.write_all(runner.run_collect_failed())
```

### AuditReportWriter
Export validation summary reports.

```python
from abstract_validation_base import AuditReportWriter

writer = AuditReportWriter("audit.json")  # or "audit.csv"
writer.write(runner.audit_report())
```

---

## Key Patterns

### Basic Validation Pipeline
```python
from abstract_validation_base import (
    ValidationBase,
    BaseValidator,
    CompositeValidator,
    ValidationResult,
)

class MyModel(ValidationBase):
    field: str

class MyValidator(BaseValidator[MyModel]):
    @property
    def name(self) -> str:
        return "my_validator"

    def validate(self, item: MyModel) -> ValidationResult:
        result = ValidationResult(is_valid=True)
        if not item.field:
            result.add_error("field", "Required")
        return result

pipeline = CompositeValidator[MyModel](validators=[MyValidator()])
result = pipeline.validate(MyModel(field="test"))
```

### ETL with Audit Trail
```python
import csv
import pandas as pd
from abstract_validation_base import ValidationRunner

with open("input.csv") as f:
    runner = ValidationRunner(csv.DictReader(f), MyModel, validators=pipeline)

    valid_records = []
    for result in runner.run():
        if result.is_valid and result.model:
            valid_records.append(result.model)

# Export audit
audit_df = pd.DataFrame(runner.audit_report()["top_errors"])
```

---

## Important Constraints

1. **Validators return ValidationResult, never raise exceptions** for validation failures
2. **Use `result.add_error()`**, which automatically sets `is_valid=False`
3. **Don't modify items inside `validate()`** - validators should be side-effect free
4. **ValidationRunner accepts iterators** - don't materialize large files into lists
5. **`process_log` is excluded from serialization** - use `audit_log()` to export
6. **Type parameter T must match** between validator and model being validated
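
Constraint 4 boils down to keeping the pipeline lazy: pass the iterator through untouched, and only one row lives in memory at a time. A library-independent sketch of the idiom (sample data and helper names are illustrative):

```python
import csv
import io
from typing import Iterator, TextIO

def rows(file_obj: TextIO) -> Iterator[dict[str, str]]:
    """Lazily yield parsed rows; nothing is buffered up front."""
    yield from csv.DictReader(file_obj)

# Good: hand the lazy iterator straight to the consumer.
# Bad:  list(rows(...)) would materialize every row before validation starts.
data = io.StringIO("email\na@b.c\nbad\n")
stream = rows(data)

first = next(stream)
print(first)  # {'email': 'a@b.c'}
```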

pyproject.toml

Lines changed: 2 additions & 1 deletion
@@ -1,6 +1,6 @@
 [project]
 name = "abstract-validation-base"
-version = "0.2.0a1"
+version = "0.3.0a1"
 description = "Validation and process tracking for data transformation pipelines"
 readme = "README.md"
 license = { text = "MIT" }
@@ -21,6 +21,7 @@ classifiers = [
 requires-python = ">=3.12.8"
 dependencies = [
     "pydantic>=2.12.5",
+    "rich>=14.2.0",
     "sqlmodel>=0.0.27",
 ]
