A high-performance, distributed document processing system written in Go that leverages OpenAI's language models for intelligent text extraction and analysis. The system implements a robust pipeline architecture with load balancing, health monitoring, and graceful error handling.
- Distributed Processing: Concurrent document processing with configurable worker pools
- Load Balanced LLM Integration: Multi-server OpenAI client with automatic failover and health checks
- Advanced Search: Integration with MeiliSearch for efficient document querying
- Persistent Storage: MongoDB for document and pipeline management
- Queue Management: Redis-based work queue for batch processing
- Graceful Shutdown: Managed service shutdown with timeout controls
- Robust Logging: Colored, structured logging with different severity levels
- Health Monitoring: Automatic health checks and server status tracking
- Resource Management: Semaphore-based concurrency control
- FileService: Main orchestrator managing document processing
  - Handles concurrent document processing
  - Manages extraction pipelines
  - Controls resource allocation
- LoadBalancedOpenAIClient: Distributed LLM processing
  - Multiple server support
  - Automatic health checks
  - Round-robin load balancing
  - Failover handling
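The round-robin selection with failover might look like the following sketch. The type and method names are assumptions; the real client's API may differ:

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobin cycles through servers, skipping any the health checker
// has marked unhealthy. After one full pass with no healthy server,
// it reports failure.
type roundRobin struct {
	mu      sync.Mutex
	servers []string
	healthy map[string]bool
	next    int
}

func (r *roundRobin) pick() (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for i := 0; i < len(r.servers); i++ {
		s := r.servers[r.next%len(r.servers)]
		r.next++
		if r.healthy[s] {
			return s, true
		}
	}
	return "", false // all servers down
}

func main() {
	rr := &roundRobin{
		servers: []string{"llm-1", "llm-2", "llm-3"},
		healthy: map[string]bool{"llm-1": true, "llm-2": false, "llm-3": true},
	}
	a, _ := rr.pick()
	b, _ := rr.pick() // llm-2 is unhealthy, so failover skips it
	fmt.Println(a, b) // llm-1 llm-3
}
```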
- Worker: Background processing engine
  - Continuous batch processing
  - Graceful shutdown support
  - Error recovery
- Repositories:
  - FileRepository: Document management
  - PipelineRepository: Pipeline configuration
  - SearchService: Document search operations
require (
"github.com/go-redis/redis/v8"
"github.com/joho/godotenv"
"github.com/meilisearch/meilisearch-go"
"github.com/sashabaranov/go-openai"
"go.mongodb.org/mongo-driver/mongo"
)
type Config struct {
MaxConcurrentDocuments int
MaxConcurrentExtractions int
ExtractionTimeout time.Duration
ShutdownTimeout time.Duration
}
Required environment variables:
- NEBUIA_LLM_KEY: OpenAI API key
- MONGO_URI: MongoDB connection string
- MONGO_DB: MongoDB database name
- MEILI_HOST: MeiliSearch host
- MEILI_KEY: MeiliSearch API key
- REDIS_HOST: Redis host
- REDIS_PORT: Redis port
- REDIS_PASSWORD: Redis password
- Documents enter the system via Redis queue
- Worker picks up batches for processing
- Documents are processed concurrently within configured limits
- Each document goes through multiple extraction pipelines
- Results are stored in MongoDB
- Search index is updated via MeiliSearch
graph TD
A[Document Batch] --> B[Worker]
B --> C{Concurrent Processing}
C --> D[Pipeline 1]
C --> E[Pipeline 2]
C --> F[Pipeline n]
D --> G[LLM Extraction]
E --> G
F --> G
G --> H[Result Aggregation]
H --> I[MongoDB Storage]
func main() {
    // Initialize repositories and clients (MongoDB, Redis, MeiliSearch)
    // before wiring them into the service
    fileService := NewFileService(fileRepo, pipelineRepo, searchClient)
    worker := NewWorker(fileService)

    // Start processing
    worker.Start()
}
Pipelines are configured in MongoDB with the following structure:
{
"name": "pipeline_name",
"key": "extraction_key",
"value": "extraction_value",
"match": "search_pattern",
"schema": "extraction_schema",
"use_llama": false
}
- Configurable concurrency limits for document processing
- Semaphore-based extraction control
- Timeout management for LLM requests
- Health monitoring with automatic server exclusion
- Resource cleanup during shutdown
The system implements comprehensive error handling:
- Pipeline processing errors
- Database connection issues
- LLM extraction failures
- Search query errors
- Queue management errors
Each error is logged with appropriate severity and context.
The system continuously monitors:
- LLM server health
- Database connectivity
- Redis queue status
- Worker state
- Pipeline performance
Structured logging with severity levels:
- INFO: General operational information
- WARNING: Potential issues
- ERROR: Processing failures
- DEBUG: Detailed operational data
- SUCCESS: Completion notifications
- CRITICAL: System-level issues
The system tracks:
- Document processing time
- Pipeline execution duration
- Extraction success rates
- Server health status
- Queue length
- Concurrent operations
- Go 1.19+
- MongoDB
- Redis
- MeiliSearch
- OpenAI API access
- Clone the repository
- Copy `.env.example` to `.env`
- Configure environment variables
- Run `go mod download`
- Start the service with `go run main.go`
- Fork the repository
- Create a feature branch
- Commit changes
- Push to the branch
- Create a Pull Request