diff --git a/.env.example b/.env.example
index 370d98b0..db9a1e64 100644
--- a/.env.example
+++ b/.env.example
@@ -26,3 +26,7 @@ SLACK_CHANNEL=
 WRGL_CLIENT_ID=
 WRGL_CLIENT_SECRET=
 IPNO_API_KEY=
+
+# OpenAI settings
+OPENAI_API_KEY=
+OPENAI_MISCONDUCT_CONFIDENCE_THRESHOLD=0.7
diff --git a/ipno/config/settings/base.py b/ipno/config/settings/base.py
index 833e6559..4cfee610 100644
--- a/ipno/config/settings/base.py
+++ b/ipno/config/settings/base.py
@@ -303,4 +303,4 @@ CSV_DATA_PATH = "./ipno/csv_data"
-IPNO_API_KEY = env.str("IPNO_API_KEY")
+IPNO_API_KEY = env.str("IPNO_API_KEY", None)
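The service introduced below reads `settings.OPENAI_API_KEY` and `settings.OPENAI_MISCONDUCT_CONFIDENCE_THRESHOLD`, which this hunk does not define. A minimal sketch of the corresponding additions to `ipno/config/settings/base.py`, assuming the same django-environ-style `env` helper used for `IPNO_API_KEY` above (these lines are not part of the original changeset):

```python
# Sketch only: settings assumed by ProcessLLMAnalysis, mirroring .env.example.
# `env` is the helper already defined at the top of base.py.
OPENAI_API_KEY = env.str("OPENAI_API_KEY", None)
OPENAI_MISCONDUCT_CONFIDENCE_THRESHOLD = env.float(
    "OPENAI_MISCONDUCT_CONFIDENCE_THRESHOLD", 0.7
)
```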
diff --git a/ipno/news_articles/management/commands/process_llm_batches.py b/ipno/news_articles/management/commands/process_llm_batches.py
new file mode 100644
index 00000000..1a58bc90
--- /dev/null
+++ b/ipno/news_articles/management/commands/process_llm_batches.py
@@ -0,0 +1,17 @@
+from django.core.management import BaseCommand
+from news_articles.services.process_llm_analysis import ProcessLLMAnalysis
+
+
+class Command(BaseCommand):
+    help = "Process completed LLM analysis batches"
+
+    def handle(self, *args, **options):
+        processor = ProcessLLMAnalysis()
+
+        self.stdout.write("Starting to process completed batches...")
+        batches = processor.client.batches.list(limit=100)
+        completed_batches = [b for b in batches.data if b.status == "completed" and b.output_file_id]
+
+        self.stdout.write(f"Found {len(completed_batches)} completed batches to process")
+        processor.process_completed_batches()
+        self.stdout.write("Finished processing completed batches")
diff --git a/ipno/news_articles/management/commands/run_llm_analysis.py b/ipno/news_articles/management/commands/run_llm_analysis.py
new file mode 100644
index 00000000..55d26b6e
--- /dev/null
+++ b/ipno/news_articles/management/commands/run_llm_analysis.py
@@ -0,0 +1,11 @@
+from django.core.management import BaseCommand
+
+from news_articles.services.process_llm_analysis import ProcessLLMAnalysis
+
+
+class Command(BaseCommand):
+    help = "Process unanalyzed news articles with OpenAI LLM to detect police misconduct content"
+
+    def handle(self, *args, **options):
+        processor = ProcessLLMAnalysis()
+        # ProcessLLMAnalysis exposes submit and collect steps rather than a
+        # single process_unanalyzed_articles() method; run both so this command
+        # submits new articles and picks up any batches that already finished.
+        processor.submit_unanalyzed_articles()
+        processor.process_completed_batches()
diff --git a/ipno/news_articles/management/commands/submit_llm_analysis.py b/ipno/news_articles/management/commands/submit_llm_analysis.py
new file mode 100644
index 00000000..18c4bb91
--- /dev/null
+++ b/ipno/news_articles/management/commands/submit_llm_analysis.py
@@ -0,0 +1,13 @@
+from django.core.management import BaseCommand
+from news_articles.services.process_llm_analysis import ProcessLLMAnalysis
+
+
+class Command(BaseCommand):
+    help = "Submit unprocessed news articles for batch LLM analysis"
+
+    def handle(self, *args, **options):
+        processor = ProcessLLMAnalysis()
+        submitted_batches = processor.submit_unanalyzed_articles()
+        self.stdout.write(f"Submitted {len(submitted_batches)} articles for processing")
+        for batch_id, article_id in submitted_batches:
+            self.stdout.write(f"Article {article_id} submitted as batch {batch_id}")
diff --git a/ipno/news_articles/models/news_article.py b/ipno/news_articles/models/news_article.py
index bf2c49aa..2b584a14 100644
--- a/ipno/news_articles/models/news_article.py
+++ b/ipno/news_articles/models/news_article.py
@@ -11,9 +11,12 @@ class NewsArticle(TimeStampsModel):
     published_date = models.DateField()
     author = models.CharField(max_length=255, blank=True, null=True)
     url = models.CharField(max_length=255, blank=True, null=True)
-    is_hidden = models.BooleanField(default=False)
+    is_hidden = models.BooleanField(default=True)  # Default to hidden until LLM processed
+    hide_reason = models.CharField(max_length=50, null=True, blank=True)
     is_processed = models.BooleanField(default=False)
+    is_llm_processed = models.BooleanField(default=False)
+    llm_analysis_result = models.JSONField(null=True, blank=True)
     source = models.ForeignKey(
         "news_articles.NewsArticleSource",
         null=True,
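The model changes above alter the `is_hidden` default and add three columns, but the changeset includes no migration. A sketch of what `python manage.py makemigrations news_articles` would be expected to generate; the dependency name below is a placeholder, not taken from the repository:

```python
from django.db import migrations, models


class Migration(migrations.Migration):
    # Placeholder dependency: replace with the app's latest migration.
    dependencies = [
        ("news_articles", "XXXX_previous_migration"),
    ]

    operations = [
        migrations.AlterField(
            model_name="newsarticle",
            name="is_hidden",
            field=models.BooleanField(default=True),
        ),
        migrations.AddField(
            model_name="newsarticle",
            name="hide_reason",
            field=models.CharField(max_length=50, null=True, blank=True),
        ),
        migrations.AddField(
            model_name="newsarticle",
            name="is_llm_processed",
            field=models.BooleanField(default=False),
        ),
        migrations.AddField(
            model_name="newsarticle",
            name="llm_analysis_result",
            field=models.JSONField(null=True, blank=True),
        ),
    ]
```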
diff --git a/ipno/news_articles/services/process_llm_analysis.py b/ipno/news_articles/services/process_llm_analysis.py
new file mode 100644
index 00000000..0aa5376d
--- /dev/null
+++ b/ipno/news_articles/services/process_llm_analysis.py
@@ -0,0 +1,177 @@
+import json
+import os
+
+from django.conf import settings
+from django.utils import timezone
+from openai import OpenAI
+from pydantic import BaseModel, Field
+
+from news_articles.models import NewsArticle
+
+
+class MisconductAnalysis(BaseModel):
+    contains_misconduct: bool = Field(description="Indicates if police misconduct is mentioned in the article")
+    confidence_score: float = Field(ge=0, le=1, description="Confidence score between 0 and 1")
+    explanation: str = Field(description="Brief explanation of the decision")
+
+    @classmethod
+    def model_json_schema(cls, *args, **kwargs) -> dict:
+        """Print and return the JSON schema that will be used by OpenAI."""
+        schema = super().model_json_schema(*args, **kwargs)
+        print("\nGenerated OpenAI Schema:")
+        print("------------------------")
+        print("type:", schema.get("type"))
+        print("properties:", schema.get("properties"))
+        print("required:", schema.get("required"))
+        print("additionalProperties:", schema.get("additionalProperties", False))
+        print("------------------------\n")
+        return schema
+
+
+class ProcessLLMAnalysis:
+    def __init__(self):
+        self.client = OpenAI(api_key=settings.OPENAI_API_KEY)
+
+    def _create_single_request(self, article: NewsArticle) -> str:
+        """Create a JSONL file for a single article batch request."""
+        request = {
+            "custom_id": str(article.id),
+            "method": "POST",
+            "url": "/v1/chat/completions",
+            "body": {
+                "model": "gpt-4o-mini",
+                "messages": [
+                    {"role": "system", "content": "You are a helpful assistant analyzing news articles for police misconduct content."},
+                    {"role": "user", "content": f"Analyze the following news article text and determine if it contains any reference to police misconduct.\n\nArticle text:\n{article.content}"},
+                ],
+                "temperature": 0,
+                "response_format": {
+                    "type": "json_schema",
+                    # The json_schema response format expects a named wrapper
+                    # around the schema itself.
+                    "json_schema": {
+                        "name": "misconduct_analysis",
+                        "schema": MisconductAnalysis.model_json_schema(),
+                    },
+                },
+            },
+        }
+
+        filename = f"batch_request_{article.id}_{timezone.now().strftime('%Y%m%d_%H%M%S')}.jsonl"
+        with open(filename, 'w') as f:
+            f.write(json.dumps(request))
+
+        return filename
+
+    def _process_batch_results(self, batch_id: str):
+        """Process results from a completed batch."""
+        batch = self.client.batches.retrieve(batch_id)
+
+        if batch.status == "completed" and batch.output_file_id:
+            # Download and process results
+            output = self.client.files.retrieve_content(batch.output_file_id)
+
+            for line in output.splitlines():
+                result = json.loads(line)
+                article_id = result['custom_id']
+                response = result.get('response')
+
+                try:
+                    article = NewsArticle.objects.get(id=article_id)
+
+                    # Batch-level errors are reported alongside the response, not inside it
+                    if result.get('error') or not response:
+                        print(f"Error processing article {article_id}: {result.get('error')}")
+                        article.hide_reason = "LLM processing error"
+                        article.save()
+                        continue
+
+                    # The structured reply arrives as a JSON string; parse it before use
+                    analysis_result = json.loads(response['body']['choices'][0]['message']['content'])
+                    article.llm_analysis_result = analysis_result
+                    article.is_llm_processed = True
+
+                    # Unhide article if misconduct confidence is above threshold
+                    if analysis_result['confidence_score'] >= settings.OPENAI_MISCONDUCT_CONFIDENCE_THRESHOLD:
+                        article.is_hidden = False
+                    else:
+                        article.hide_reason = "Below confidence threshold"
+
+                    article.save()
+
+                except NewsArticle.DoesNotExist:
+                    print(f"Article {article_id} not found")
+                except Exception as e:
+                    print(f"Error processing result for article {article_id}: {str(e)}")
+
+    def _cleanup_file(self, filename: str):
+        """Clean up temporary files."""
+        try:
+            os.remove(filename)
+        except Exception as e:
+            print(f"Error cleaning up file {filename}: {str(e)}")
+
+    def submit_unanalyzed_articles(self):
+        """Submit each unanalyzed article as a single-item batch."""
+        articles = NewsArticle.objects.filter(is_llm_processed=False)
+        submitted_batch_ids = []
+
+        for article in articles:
+            batch_file = None
+            try:
+                # Create and upload batch file for single article
+                batch_file = self._create_single_request(article)
+                with open(batch_file, "rb") as f:
+                    file = self.client.files.create(
+                        file=f,
+                        purpose="batch"
+                    )
+
+                # Create batch processing job
+                batch = self.client.batches.create(
+                    input_file_id=file.id,
+                    endpoint="/v1/chat/completions",
+                    completion_window="24h"
+                )
+
+                submitted_batch_ids.append((batch.id, article.id))
+                self._cleanup_file(batch_file)
+
+            except Exception as e:
+                print(f"Error submitting article {article.id}: {str(e)}")
+                if batch_file:
+                    self._cleanup_file(batch_file)
+
+        return submitted_batch_ids
+
+    def process_completed_batches(self):
+        """Process results from completed single-item batches."""
+        batches = self.client.batches.list(limit=100)
+
+        for batch in batches.data:
+            if batch.status == "completed" and batch.output_file_id:
+                try:
+                    output = self.client.files.retrieve_content(batch.output_file_id)
+                    result = json.loads(output.strip())  # Single item, no need to iterate
+
+                    article_id = result['custom_id']
+                    response = result.get('response')
+
+                    try:
+                        article = NewsArticle.objects.get(id=article_id)
+
+                        # Batch-level errors are reported alongside the response, not inside it
+                        if result.get('error') or not response:
+                            print(f"Error processing article {article_id}: {result.get('error')}")
+                            article.hide_reason = "LLM processing error"
+                            article.save()
+                            continue
+
+                        # The structured reply arrives as a JSON string; parse it before use
+                        analysis_result = json.loads(response['body']['choices'][0]['message']['content'])
+                        article.llm_analysis_result = analysis_result
+                        article.is_llm_processed = True
+
+                        # Unhide article if misconduct confidence is above threshold
+                        if analysis_result['confidence_score'] >= settings.OPENAI_MISCONDUCT_CONFIDENCE_THRESHOLD:
+                            article.is_hidden = False
+                        else:
+                            article.hide_reason = "Below confidence threshold"
+
+                        article.save()
+
+                    except NewsArticle.DoesNotExist:
+                        print(f"Article {article_id} not found")
+                    except Exception as e:
+                        print(f"Error processing result for article {article_id}: {str(e)}")
+
+                except Exception as e:
+                    print(f"Error processing batch {batch.id}: {str(e)}")
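For reference, the parsing in `_process_batch_results` and `process_completed_batches` assumes each line of a completed batch's output file looks roughly like the sketch below. The values are illustrative only; the code reads just `custom_id`, `error`, and `response.body.choices[0].message.content`:

```python
# Illustrative shape of one JSONL line in a batch output file (made-up values).
example_output_line = {
    "id": "batch_req_example",
    "custom_id": "42",  # NewsArticle pk written by _create_single_request
    "response": {
        "status_code": 200,
        "body": {
            "choices": [
                {
                    "message": {
                        # JSON string matching the MisconductAnalysis schema
                        "content": '{"contains_misconduct": true, "confidence_score": 0.91, "explanation": "Officer accused of excessive force."}'
                    }
                }
            ]
        },
    },
    # Populated instead of a usable response when the request itself failed.
    "error": None,
}
```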
analysis batches", + "command": "process_llm_batches", + "task_type": DAILY_TASK, # Process results once per day + }, { "task_name": "Run news articles' officers matching", "command": "run_news_articles_officers_matching", diff --git a/requirements/base.txt b/requirements/base.txt index 55fa400c..8332ed61 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -40,4 +40,6 @@ celery[redis]==5.2.7 django-celery-results==2.4.0 cryptography==38.0.4 django-db-geventpool==4.0.1 -slack_sdk==3.19.5 \ No newline at end of file +slack_sdk==3.19.5 +openai==1.3.0 +pydantic>=2.0.0