diff --git a/apps/data-processing/patch.py b/apps/data-processing/patch.py new file mode 100644 index 00000000..028b7c45 --- /dev/null +++ b/apps/data-processing/patch.py @@ -0,0 +1,4 @@ +import re +from src.ingestion.social_fetcher import * + +print("Patching complete.") diff --git a/apps/data-processing/src/ingestion/news_deduplicator.py b/apps/data-processing/src/ingestion/news_deduplicator.py index a94eed1d..0d924092 100644 --- a/apps/data-processing/src/ingestion/news_deduplicator.py +++ b/apps/data-processing/src/ingestion/news_deduplicator.py @@ -3,10 +3,12 @@ """ import hashlib import json +import logging +import re +import unicodedata from datetime import datetime, timedelta, timezone -from typing import List, Dict, Optional, Set from pathlib import Path -import logging +from typing import Dict, List, Optional, Set logger = logging.getLogger(__name__) @@ -41,6 +43,12 @@ def __init__(self, deduplication_window_days: int = 7, storage_path: str = "./da logger.info(f"Initialized NewsDeduplicator with window of {deduplication_window_days} days") + def _clean_text(self, text: str) -> str: + normalized = unicodedata.normalize("NFKD", text or "") + normalized = normalized.encode("ascii", "ignore").decode("ascii") + normalized = re.sub(r"\s+", " ", normalized).strip().lower() + return normalized + def _normalize_article(self, article: Dict) -> str: """ Normalize article content for consistent hashing @@ -52,16 +60,16 @@ def _normalize_article(self, article: Dict) -> str: Normalized string representation of the article """ # Extract and normalize key fields - title = (article.get('title') or '').strip().lower() - content = (article.get('content') or '').strip().lower() - url = (article.get('url') or '').strip().lower() + title = self._clean_text(article.get("title") or "") + content = self._clean_text(article.get("content") or "") + url = self._clean_text(article.get("url") or "") # Create a canonical representation canonical_data = { - 'title': title, - 'content': content, - 'url': url, - 'source': (article.get('source') or '').strip().lower(), + "title": title, + "content": content, + "url": url, + "source": self._clean_text(article.get("source") or ""), } # Convert to JSON string for consistent hashing diff --git a/apps/data-processing/src/ingestion/news_fetcher.py b/apps/data-processing/src/ingestion/news_fetcher.py index 0d3721bd..0deb9305 100644 --- a/apps/data-processing/src/ingestion/news_fetcher.py +++ b/apps/data-processing/src/ingestion/news_fetcher.py @@ -3,16 +3,31 @@ Fetches data from external APIs and standardizes the format. """ -import os import json +import os +import re import time -from typing import List, Dict, Optional +import unicodedata from dataclasses import dataclass, asdict -from .news_deduplicator import NewsDeduplicator from datetime import datetime +from typing import Any, Dict, List, Optional +from .news_deduplicator import NewsDeduplicator import requests from requests.exceptions import RequestException, Timeout +try: + from langdetect import DetectorFactory, LangDetectException, detect + + DetectorFactory.seed = 0 + LANGDETECT_AVAILABLE = True +except ImportError: + LANGDETECT_AVAILABLE = False + + class LangDetectException(Exception): + """Fallback exception when langdetect is unavailable.""" + +DEFAULT_TRANSLATION_API_URL = "https://libretranslate.de/translate" + @dataclass class NewsArticle: @@ -28,6 +43,8 @@ class NewsArticle: categories: List[str] sentiment_score: Optional[float] = None # To be filled by sentiment engine tags: Optional[List[str]] = None + language: str = "en" + translated: bool = False def to_dict(self) -> Dict: """Convert to dictionary with serialized datetime""" @@ -110,6 +127,126 @@ def _handle_api_error(self, response: requests.Response, api_name: str) -> None: else: response.raise_for_status() + def _normalize_text(self, text: Optional[str]) -> str: + """Normalize text for analytics ingestion.""" + if not text: + return "" + normalized = unicodedata.normalize("NFKC", text) + normalized = re.sub(r"\s+", " ", normalized).strip() + return normalized + + def _normalize_language_code(self, language: str) -> str: + """Normalize language codes like en_US to en.""" + if not language or not isinstance(language, str): + return "unknown" + normalized = language.strip().lower().replace("_", "-") + return normalized.split("-")[0] + + def _detect_script_language(self, text: str) -> Optional[str]: + if re.search(r"[\u4e00-\u9fff]", text): + return "zh" + if re.search(r"[\u3040-\u30ff]", text): + return "ja" + if re.search(r"[\uac00-\ud7af]", text): + return "ko" + if re.search(r"[\u0400-\u04ff]", text): + return "ru" + if re.search(r"[\u0600-\u06ff]", text): + return "ar" + return None + + def _detect_language(self, text: str, hint: Optional[str] = None) -> str: + """Detect the source language for an article.""" + if hint: + return self._normalize_language_code(hint) + + if text and LANGDETECT_AVAILABLE: + try: + detected = detect(text) + return self._normalize_language_code(detected) + except LangDetectException: + pass + + script_language = self._detect_script_language(text or "") + if script_language: + return script_language + + return "en" + + def _translate_text(self, text: str, source_lang: str) -> str: + """Translate non-English text to English using a configurable endpoint.""" + if not text or source_lang == "en": + return text + + translation_url = ( + os.getenv("TRANSLATION_API_URL", "").strip() or DEFAULT_TRANSLATION_API_URL + ) + translation_key = os.getenv("TRANSLATION_API_KEY", "").strip() + + if not translation_url: + return text + + payload = { + "q": text, + "source": source_lang, + "target": "en", + "format": "text", + } + headers = {"Content-Type": "application/json"} + if translation_key: + headers["Authorization"] = f"Bearer {translation_key}" + + try: + response = self.session.post( + translation_url, + json=payload, + headers=headers, + timeout=APIConfig.TIMEOUT, + ) + if response.status_code != 200: + logger.warning( + "Translation API returned %s for source_lang=%s", + response.status_code, + source_lang, + ) + return text + + data = response.json() + return data.get("translatedText") or data.get("translation") or text + except RequestException as exc: + logger.warning("Translation request failed: %s", exc) + return text + except ValueError: + logger.warning("Translation API returned invalid JSON") + return text + + def _prepare_article_fields( + self, + title: str, + content: Optional[str], + summary: Optional[str], + lang_hint: Optional[str] = None, + ) -> Dict[str, Any]: + title = title or "" + content = content or "" + summary = summary or "" + + source_language = self._detect_language(f"{title} {content}", hint=lang_hint) + translated = False + if source_language != "en": + title = self._translate_text(title, source_language) + content = self._translate_text(content, source_language) + summary = self._translate_text(summary, source_language) + translated = True + + return { + "title": self._normalize_text(title), + "content": self._normalize_text(content), + "summary": self._normalize_text(summary), + "language": source_language, + "translated": translated, + } + def _fetch_cryptocompare(self, limit: int) -> List[NewsArticle]: """Fetch news from CryptoCompare API""" articles = [] @@ -118,7 +255,6 @@ def _fetch_cryptocompare(self, limit: int) -> List[NewsArticle]: self._respect_rate_limit() params = { - "lang": "EN", "categories": "BTC,ETH,BLOCKCHAIN", "excludeCategories": "Sponsored", } @@ -145,11 +281,17 @@ def _fetch_cryptocompare(self, limit: int) -> List[NewsArticle]: # Parse articles for item in data.get("Data", [])[:limit]: try: - article = NewsArticle( - id=f"cc_{item['id']}", + parsed_fields = self._prepare_article_fields( title=item.get("title", ""), content=item.get("body", ""), summary=item.get("short_description", ""), + lang_hint=item.get("lang") or item.get("language"), + ) + article = NewsArticle( + id=f"cc_{item['id']}", + title=parsed_fields["title"], + content=parsed_fields["content"], + summary=parsed_fields["summary"], source=item.get("source", "Unknown"), url=item.get("url", ""), published_at=datetime.fromtimestamp( @@ -163,6 +305,8 @@ def _fetch_cryptocompare(self, limit: int) -> List[NewsArticle]: tags=( item.get("tags", "").split("|") if item.get("tags") else [] ), + language=parsed_fields["language"], + translated=parsed_fields["translated"], ) # Avoid duplicates @@ -194,7 +338,6 @@ def _fetch_newsapi(self, limit: int) -> List[NewsArticle]: params = { "q": "cryptocurrency OR blockchain OR bitcoin OR ethereum", - "language": "en", "sortBy": "publishedAt", "pageSize": min(limit, 100), # NewsAPI max is 100 "from": from_date.strftime("%Y-%m-%d"), @@ -218,11 +361,17 @@ def _fetch_newsapi(self, limit: int) -> List[NewsArticle]: item["publishedAt"].replace("Z", "+00:00") ) - article = NewsArticle( - id=f"na_{hash(item['url']) & 0xFFFFFFFF}", + parsed_fields = self._prepare_article_fields( title=item.get("title", ""), content=item.get("content", ""), summary=item.get("description", ""), + lang_hint=item.get("language") or item.get("lang"), + ) + article = NewsArticle( + id=f"na_{hash(item['url']) & 0xFFFFFFFF}", + title=parsed_fields["title"], + content=parsed_fields["content"], + summary=parsed_fields["summary"], source=item.get("source", {}).get("name", "Unknown"), url=item.get("url", ""), published_at=published_at, @@ -230,6 +379,8 @@ def _fetch_newsapi(self, limit: int) -> List[NewsArticle]: "crypto", "blockchain", ], # NewsAPI doesn't provide categories + language=parsed_fields["language"], + translated=parsed_fields["translated"], ) # Avoid duplicates diff --git a/apps/data-processing/src/ingestion/social_fetcher.py b/apps/data-processing/src/ingestion/social_fetcher.py index 9d68e91d..26a77b96 100644 --- a/apps/data-processing/src/ingestion/social_fetcher.py +++ b/apps/data-processing/src/ingestion/social_fetcher.py @@ -9,6 +9,7 @@ import os import re import time +import unicodedata from dataclasses import asdict, dataclass from datetime import datetime, timezone from enum import Enum @@ -17,8 +18,114 @@ import requests from requests.exceptions import RequestException +try: + from langdetect import DetectorFactory, LangDetectException, detect + + DetectorFactory.seed = 0 + LANGDETECT_AVAILABLE = True +except ImportError: + LANGDETECT_AVAILABLE = False + + class LangDetectException(Exception): + """Fallback exception when langdetect is unavailable.""" + +DEFAULT_TRANSLATION_API_URL = "https://libretranslate.de/translate" + logger = logging.getLogger(__name__) +def _normalize_text(text: Optional[str]) -> str: + """Normalize text for analytics ingestion.""" + if not text: + return "" + normalized = unicodedata.normalize("NFKC", text) + normalized = re.sub(r"\s+", " ", normalized).strip() + return normalized + +def _normalize_language_code(language: str) -> str: + """Normalize language codes like en_US to en.""" + if not language or not isinstance(language, str): + return "unknown" + normalized = language.strip().lower().replace("_", "-") + return normalized.split("-")[0] + +def _detect_script_language(text: str) -> Optional[str]: + if re.search(r"[\u4e00-\u9fff]", text): + return "zh" + if re.search(r"[\u3040-\u30ff]", text): + return "ja" + if re.search(r"[\uac00-\ud7af]", text): + return "ko" + if re.search(r"[\u0400-\u04ff]", text): + return "ru" + if re.search(r"[\u0600-\u06ff]", text): + return "ar" + return None + +def _detect_language(text: str, hint: Optional[str] = None) -> str: + """Detect the source language for a post.""" + if hint: + return _normalize_language_code(hint) + + if text and LANGDETECT_AVAILABLE: + try: + detected = detect(text) + return _normalize_language_code(detected) + except LangDetectException: + pass + + script_language = _detect_script_language(text or "") + if script_language: + return script_language + + return "en" + +def _translate_text(text: str, source_lang: str, session: requests.Session) -> str: + """Translate non-English text to English using a configurable endpoint.""" + if not text or source_lang == "en": + return text + + translation_url = ( + os.getenv("TRANSLATION_API_URL", "").strip() or DEFAULT_TRANSLATION_API_URL + ) + translation_key = os.getenv("TRANSLATION_API_KEY", "").strip() + + if not translation_url: + return text + + payload = { + "q": text, + "source": source_lang, + "target": "en", + "format": "text", + } + headers = {"Content-Type": "application/json"} + if translation_key: + headers["Authorization"] = f"Bearer {translation_key}" + + try: + response = session.post( + translation_url, + json=payload, + headers=headers, + timeout=10, + ) + if response.status_code != 200: + logger.warning( + "Translation API returned %s for source_lang=%s", + response.status_code, + source_lang, + ) + return text + + data = response.json() + return data.get("translatedText") or data.get("translation") or text + except RequestException as exc: + logger.warning("Translation request failed: %s", exc) + return text + except ValueError: + logger.warning("Translation API returned invalid JSON") + return text + class SocialPlatform(Enum): """Supported social media platforms""" @@ -47,6 +154,9 @@ class SocialPost: # Platform-specific metadata hashtags: Optional[List[str]] = None subreddit: Optional[str] = None + # Translation + language: str = "en" + translated: bool = False # Tracking fetched_at: datetime = None @@ -62,6 +172,8 @@ def to_dict(self) -> Dict: data["posted_at"] = self.posted_at.isoformat() data["fetched_at"] = self.fetched_at.isoformat() if self.fetched_at else None data["platform"] = self.platform + data["language"] = self.language + data["translated"] = self.translated return data def to_news_article_format(self) -> Dict: @@ -81,6 +193,8 @@ def to_news_article_format(self) -> Dict: "tags": self.hashtags or [], "platform": self.platform, "author": self.author, + "language": self.language, + "translated": self.translated, "engagement": { "likes": self.likes, "comments": self.comments, @@ -235,14 +349,18 @@ def fetch_hashtag( posts = [] - # Normalize hashtag + # Normalize hashtag and allow non-English posts query = hashtag if hashtag.startswith("#") else f"#{hashtag}" - query = f"{query} -is:retweet lang:en" # Exclude retweets, English only + twitter_lang = os.getenv("TWITTER_SEARCH_LANG", "").strip() + if twitter_lang: + query = f"{query} -is:retweet lang:{twitter_lang}" + else: + query = f"{query} -is:retweet" params = { "query": query, "max_results": min(limit, 100), # Twitter max is 100 per request - "tweet.fields": "created_at,public_metrics,entities,author_id", + "tweet.fields": "created_at,public_metrics,entities,author_id,lang", "expansions": "author_id", "user.fields": "username,name", } @@ -284,10 +402,22 @@ def fetch_hashtag( entities = tweet.get("entities", {}) hashtags = [f"#{tag['tag']}" for tag in entities.get("hashtags", [])] + raw_content = tweet.get("text", "") + lang_hint = tweet.get("lang") + source_language = _detect_language(raw_content, hint=lang_hint) + translated = False + + content_to_use = raw_content + if source_language != "en": + content_to_use = _translate_text(raw_content, source_language, self.session) + translated = True + + content_to_use = _normalize_text(content_to_use) + post = SocialPost( id=tweet["id"], platform=SocialPlatform.TWITTER.value, - content=tweet.get("text", ""), + content=content_to_use, author=user.get("username", "unknown"), posted_at=datetime.fromisoformat(tweet["created_at"].replace("Z", "+00:00")), url=f"https://twitter.com/user/status/{tweet['id']}", @@ -295,6 +425,8 @@ def fetch_hashtag( comments=metrics.get("reply_count", 0), shares=metrics.get("retweet_count", 0), hashtags=hashtags, + language=source_language, + translated=translated, ) posts.append(post) @@ -402,11 +534,22 @@ def fetch_subreddit( # Parse posts for child in data.get("data", {}).get("children", [])[:limit]: post_data = child.get("data", {}) + + raw_content = post_data.get("selftext", "") or post_data.get("title", "") + source_language = _detect_language(raw_content) + translated = False + + content_to_use = raw_content + if source_language != "en": + content_to_use = _translate_text(raw_content, source_language, self.session) + translated = True + + content_to_use = _normalize_text(content_to_use) post = SocialPost( id=post_data.get("id", ""), platform=SocialPlatform.REDDIT.value, - content=post_data.get("selftext", "") or post_data.get("title", ""), + content=content_to_use, author=post_data.get("author", "[deleted]"), posted_at=datetime.fromtimestamp(post_data.get("created_utc", time.time()), tz=timezone.utc), url=f"https://reddit.com{post_data.get('permalink', '')}", @@ -415,6 +558,8 @@ def fetch_subreddit( shares=post_data.get("num_crossposts", 0), subreddit=post_data.get("subreddit", subreddit), hashtags=self._extract_hashtags(post_data), + language=source_language, + translated=translated, ) posts.append(post) @@ -472,10 +617,21 @@ def fetch_search( for child in data.get("data", {}).get("children", [])[:limit]: post_data = child.get("data", {}) + raw_content = post_data.get("selftext", "") or post_data.get("title", "") + source_language = _detect_language(raw_content) + translated = False + + content_to_use = raw_content + if source_language != "en": + content_to_use = _translate_text(raw_content, source_language, self.session) + translated = True + + content_to_use = _normalize_text(content_to_use) + post = SocialPost( id=post_data.get("id", ""), platform=SocialPlatform.REDDIT.value, - content=post_data.get("selftext", "") or post_data.get("title", ""), + content=content_to_use, author=post_data.get("author", "[deleted]"), posted_at=datetime.fromtimestamp(post_data.get("created_utc", time.time()), tz=timezone.utc), url=f"https://reddit.com{post_data.get('permalink', '')}", @@ -483,6 +639,8 @@ def fetch_search( comments=post_data.get("num_comments", 0), shares=post_data.get("num_crossposts", 0), subreddit=post_data.get("subreddit", ""), + language=source_language, + translated=translated, ) posts.append(post) @@ -657,6 +815,8 @@ def fetch_as_articles( shares=p.get("shares", 0), hashtags=p.get("hashtags", []), subreddit=p.get("subreddit"), + language=p.get("language", "en"), + translated=p.get("translated", False), ).to_news_article_format() for p in posts ] diff --git a/apps/data-processing/src/main.py b/apps/data-processing/src/main.py index 8b3949a9..419e225e 100644 --- a/apps/data-processing/src/main.py +++ b/apps/data-processing/src/main.py @@ -108,10 +108,14 @@ def run_data_pipeline(): sentiment_analyzer = SentimentAnalyzer() if news_articles: - article_texts = [ - (a.get("title", "") + " " + a.get("summary", "")).strip() - for a in news_articles - ] + article_texts = [] + for a in news_articles: + title = a.get("title", "") or "" + summary = a.get("summary", "") or "" + content = a.get("content", "") or "" + article_texts.append( + " ".join([title, summary, content]).strip() + ) sentiment_results = sentiment_analyzer.analyze_batch_parallel(article_texts) summary = sentiment_analyzer.get_sentiment_summary(sentiment_results) avg_sentiment = summary["average_compound_score"] diff --git a/apps/data-processing/src/sentiment.py b/apps/data-processing/src/sentiment.py index f1b490bd..668e79c1 100644 --- a/apps/data-processing/src/sentiment.py +++ b/apps/data-processing/src/sentiment.py @@ -2,8 +2,10 @@ Sentiment analyzer module - analyzes sentiment of news articles """ -import os import logging +import os +import re +import unicodedata from typing import List, Dict, Any, Optional, Tuple from concurrent.futures import ProcessPoolExecutor from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer @@ -12,6 +14,137 @@ # Import keyword extractor for asset filtering from src.analytics.keywords import KeywordExtractor +try: + from langdetect import DetectorFactory, LangDetectException, detect + DetectorFactory.seed = 0 + LANGDETECT_AVAILABLE = True +except ImportError: + LANGDETECT_AVAILABLE = False + + class LangDetectException(Exception): + """Fallback exception when langdetect is unavailable.""" + +_DEFAULT_TRANSLATION_MODEL = "Helsinki-NLP/opus-mt-mul-en" + + +def _normalize_text(text: str) -> str: + normalized = unicodedata.normalize("NFKC", text or "") + normalized = re.sub(r"\s+", " ", normalized).strip() + return normalized + + +def _normalize_language_code(language: str) -> str: + if not language or not isinstance(language, str): + return "unknown" + normalized = language.strip().lower().replace("_", "-") + return normalized.split("-")[0] + + +def _detect_script_language(text: str) -> Optional[str]: + if re.search(r"[\u4e00-\u9fff]", text): + return "zh" + if re.search(r"[\u3040-\u30ff]", text): + return "ja" + if re.search(r"[\uac00-\ud7af]", text): + return "ko" + if re.search(r"[\u0400-\u04ff]", text): + return "ru" + if re.search(r"[\u0600-\u06ff]", text): + return "ar" + return None + + +def _detect_language(text: str, hint: Optional[str] = None) -> str: + if hint: + return _normalize_language_code(hint) + + script_language = _detect_script_language(text or "") + if script_language: + return script_language + + if LANGDETECT_AVAILABLE and text: + try: + detected = detect(text) + return _normalize_language_code(detected) + except LangDetectException: + pass + + return "en" + + +class TranslationService: + """Translate non-English text to English before sentiment analysis.""" + + def __init__(self): + self._translation_disabled = os.environ.get( + "TRANSLATION_DISABLE_MODEL", "" + ).strip().lower() in ("1", "true", "yes", "on") + self._model_name = os.environ.get( + "TRANSLATION_MODEL_NAME", _DEFAULT_TRANSLATION_MODEL + ).strip() or _DEFAULT_TRANSLATION_MODEL + self._tokenizer = None + self._model = None + self._load_failed = False + + def _load(self) -> bool: + if self._translation_disabled or self._load_failed: + return False + if self._model is not None and self._tokenizer is not None: + return True + + try: + from transformers import AutoModelForSeq2SeqLM, AutoTokenizer + + self._tokenizer = AutoTokenizer.from_pretrained(self._model_name) + self._model = AutoModelForSeq2SeqLM.from_pretrained(self._model_name) + self._model.eval() + return True + except Exception as exc: + logging.getLogger(__name__).warning( + "Translation model unavailable or failed to load: %s", + exc, + ) + self._load_failed = True + return False + + def translate(self, text: str) -> Optional[str]: + if not text or self._translation_disabled: + return None + if not self._load(): + return None + + try: + inputs = self._tokenizer( + text, + return_tensors="pt", + truncation=True, + max_length=512, + padding=True, + ) + outputs = self._model.generate(**inputs, max_length=512, num_beams=2) + return self._tokenizer.batch_decode(outputs, skip_special_tokens=True)[0] + except Exception as exc: + logging.getLogger(__name__).warning( + "Translation inference failed: %s", exc + ) + self._load_failed = True + return None + + +_translation_service = TranslationService() + + +def _translate_if_needed(text: str, language_hint: Optional[str] = None) -> tuple[str, str, bool]: + source_language = _detect_language(text, hint=language_hint) + if source_language == "en": + return text, source_language, False + + translated = _translation_service.translate(text) + if translated: + return translated, source_language, True + + return text, source_language, False + logger = logging.getLogger(__name__) # Minimum batch size to justify spawning worker processes. @@ -27,8 +160,12 @@ def _analyze_in_worker(args: Tuple[str, Optional[str]]) -> dict: """ text, asset_filter = args + cleaned_text = _normalize_text(text) + translated_text, language, translated = _translate_if_needed(cleaned_text) + text_to_analyze = translated_text if translated_text else cleaned_text + extractor = KeywordExtractor() - asset_codes = extractor.extract_tickers_only(text) + asset_codes = extractor.extract_tickers_only(text_to_analyze) if asset_filter: asset_filter = asset_filter.upper() @@ -41,10 +178,12 @@ def _analyze_in_worker(args: Tuple[str, Optional[str]]) -> dict: "neutral": 1.0, "sentiment_label": "neutral", "asset_codes": [], + "language": language, + "translated": translated, } analyzer = SentimentIntensityAnalyzer() - scores = analyzer.polarity_scores(text) + scores = analyzer.polarity_scores(text_to_analyze) compound = scores["compound"] if compound >= 0.05: @@ -62,6 +201,8 @@ def _analyze_in_worker(args: Tuple[str, Optional[str]]) -> dict: "neutral": scores["neu"], "sentiment_label": label, "asset_codes": asset_codes, + "language": language, + "translated": translated, } @@ -76,6 +217,8 @@ class SentimentResult: neutral: float # 0 to 1 sentiment_label: str # 'positive', 'negative', 'neutral' asset_codes: List[str] = None # List of asset codes mentioned in text + language: str = "unknown" + translated: bool = False def __post_init__(self): if self.asset_codes is None: @@ -90,6 +233,8 @@ def to_dict(self) -> Dict[str, Any]: "neutral": self.neutral, "sentiment_label": self.sentiment_label, "asset_codes": self.asset_codes, + "language": self.language, + "translated": self.translated, } @@ -123,9 +268,13 @@ def analyze(self, text: str, asset_filter: Optional[str] = None) -> SentimentRes Returns: SentimentResult object """ - # Extract asset codes from text - asset_codes = self.keyword_extractor.extract_tickers_only(text) - + cleaned_text = _normalize_text(text or "") + translated_text, language, translated = _translate_if_needed(cleaned_text) + text_to_analyze = translated_text if translated_text else cleaned_text + + # Extract asset codes from text after translation/normalization + asset_codes = self.keyword_extractor.extract_tickers_only(text_to_analyze) + # If asset_filter is specified, check if text mentions that asset if asset_filter: asset_filter = asset_filter.upper() @@ -139,15 +288,17 @@ def analyze(self, text: str, asset_filter: Optional[str] = None) -> SentimentRes neutral=1.0, sentiment_label="neutral", asset_codes=[], + language=language, + translated=translated, ) - + cache_key = f"{text}:{asset_filter}" if asset_filter else text if self.cache: cached = self.cache.get(cache_key) if cached: return SentimentResult(**cached) - scores = self.analyzer.polarity_scores(text) + scores = self.analyzer.polarity_scores(text_to_analyze) compound = scores["compound"] if compound >= 0.05: label = "positive" @@ -164,6 +315,8 @@ def analyze(self, text: str, asset_filter: Optional[str] = None) -> SentimentRes neutral=scores["neu"], sentiment_label=label, asset_codes=asset_codes, + language=language, + translated=translated, ) if self.cache: diff --git a/apps/data-processing/src/validators.py b/apps/data-processing/src/validators.py index bfa7cd8d..24830b2e 100644 --- a/apps/data-processing/src/validators.py +++ b/apps/data-processing/src/validators.py @@ -17,10 +17,12 @@ class NewsArticle(BaseModel): id: str title: str - content: str + content: Optional[str] = None + summary: Optional[str] = None published_at: str # ISO8601 string source: Optional[str] url: Optional[str] + language: Optional[str] = None @validator("published_at") def validate_published_at(cls, v): diff --git a/apps/data-processing/tests/test_cache.py b/apps/data-processing/tests/test_cache.py index 03a4b3a3..53c81fab 100644 --- a/apps/data-processing/tests/test_cache.py +++ b/apps/data-processing/tests/test_cache.py @@ -94,6 +94,13 @@ def test_sentiment_analysis_with_caching(self): self.assertEqual(result1.compound_score, result2.compound_score) self.assertEqual(result1.sentiment_label, result2.sentiment_label) + def test_non_english_text_is_detected_and_analyzed(self): + """Test that non-English text is detected and analyzed.""" + result = self.analyzer.analyze("Bitcoin sube con fuerte rally en el mercado") + + self.assertEqual(result.language, "es") + self.assertIn(result.sentiment_label, {"positive", "negative", "neutral"}) + def test_different_texts_not_cached_together(self): r1 = self.analyzer.analyze("This is a positive news article.") r2 = self.analyzer.analyze("This is a negative news article.") diff --git a/apps/data-processing/tests/test_multilingual_sentiment.py b/apps/data-processing/tests/test_multilingual_sentiment.py new file mode 100644 index 00000000..3684335d --- /dev/null +++ b/apps/data-processing/tests/test_multilingual_sentiment.py @@ -0,0 +1,257 @@ +""" +Unit tests for multilingual sentiment analysis and text normalization. +Tests sentiment analysis on non-English articles with language detection, +translation, and normalization capabilities. +""" + +import unittest +from unittest.mock import Mock, patch +import os + +from src.sentiment import ( + SentimentAnalyzer, + SentimentResult, + _normalize_text, + _normalize_language_code, + _detect_language, + _detect_script_language, + TranslationService, +) + + +class TestTextNormalization(unittest.TestCase): + """Test text normalization for multilingual content.""" + + def test_normalize_text_removes_extra_whitespace(self): + """Test that normalization removes extra whitespace.""" + text = " Bitcoin news today \n\n extra spaces " + normalized = _normalize_text(text) + self.assertEqual(normalized, "Bitcoin news today extra spaces") + + def test_normalize_text_handles_unicode_normalization(self): + """Test that Unicode is properly normalized.""" + text = "Cripto\u00a0noticias" # Contains non-breaking space + normalized = _normalize_text(text) + self.assertNotIn("\u00a0", normalized) + self.assertIn("Cripto", normalized) + self.assertIn("noticias", normalized) + + def test_normalize_language_code_extracts_base_language(self): + """Test language code normalization.""" + self.assertEqual(_normalize_language_code("en_US"), "en") + self.assertEqual(_normalize_language_code("en-US"), "en") + self.assertEqual(_normalize_language_code("es"), "es") + self.assertEqual(_normalize_language_code("ES"), "es") + self.assertEqual(_normalize_language_code("pt_BR"), "pt") + + def test_normalize_language_code_handles_invalid(self): + """Test language code normalization with invalid input.""" + self.assertEqual(_normalize_language_code(None), "unknown") + self.assertEqual(_normalize_language_code(""), "unknown") + self.assertEqual(_normalize_language_code(" "), "unknown") + + +class TestScriptLanguageDetection(unittest.TestCase): + """Test script-based language detection for non-Latin scripts.""" + + def test_detect_chinese(self): + """Test detection of Chinese script.""" + text = "这是一个测试" # Chinese + result = _detect_script_language(text) + self.assertEqual(result, "zh") + + def test_detect_japanese(self): + """Test detection of Japanese script.""" + text = "これはテストです" # Japanese hiragana + result = _detect_script_language(text) + self.assertEqual(result, "ja") + + def test_detect_korean(self): + """Test detection of Korean script.""" + text = "이것은 테스트입니다" # Korean + result = _detect_script_language(text) + self.assertEqual(result, "ko") + + def test_detect_russian(self): + """Test detection of Russian script.""" + text = "Это тестовый текст" # Russian + result = _detect_script_language(text) + self.assertEqual(result, "ru") + + def test_detect_arabic(self): + """Test detection of Arabic script.""" + text = "هذا نص تجريبي" # Arabic + result = _detect_script_language(text) + self.assertEqual(result, "ar") + + def test_detect_latin_script_returns_none(self): + """Test that Latin script returns None (requires detection).""" + text = "This is English text" + result = _detect_script_language(text) + self.assertIsNone(result) + + +class TestLanguageDetection(unittest.TestCase): + """Test language detection from text content.""" + + def test_detect_language_with_hint(self): + """Test language detection uses provided hint.""" + text = "Some text" + result = _detect_language(text, hint="es") + self.assertEqual(result, "es") + + def test_detect_language_from_script(self): + """Test language detection from script.""" + text = "这是中文" # Chinese + result = _detect_language(text) + self.assertEqual(result, "zh") + + def test_detect_language_defaults_to_english(self): + """Test that unknown content defaults to English.""" + text = "abc xyz" + result = _detect_language(text) + self.assertEqual(result, "en") + + +class TestMultilingualSentimentAnalysis(unittest.TestCase): + """Test sentiment analysis on multilingual content.""" + + def setUp(self): + self.analyzer = SentimentAnalyzer() + + def test_english_sentiment_analysis(self): + """Test English sentiment analysis.""" + result = self.analyzer.analyze("Bitcoin is soaring to the moon!") + self.assertEqual(result.language, "en") + self.assertGreater(result.compound_score, 0) + + def test_spanish_sentiment_analysis(self): + """Test Spanish sentiment analysis.""" + result = self.analyzer.analyze("Bitcoin sube con fuerte rally") + self.assertEqual(result.language, "es") + self.assertGreater(result.compound_score, 0) + + def test_portuguese_sentiment_analysis(self): + """Test Portuguese sentiment analysis.""" + result = self.analyzer.analyze("Bitcoin sobe em alta no mercado") + self.assertEqual(result.language, "pt") + self.assertGreater(result.compound_score, 0) + + def test_sentiment_result_includes_language_metadata(self): + """Test that sentiment results include language and translation info.""" + result = self.analyzer.analyze("Bitcoin is rising") + self.assertIsInstance(result, SentimentResult) + self.assertIsNotNone(result.language) + self.assertIsNotNone(result.translated) + self.assertFalse(result.translated) # English should not be translated + + def test_analyze_batch_with_mixed_languages(self): + """Test batch analysis with mixed language content.""" + texts = [ + "Bitcoin is crashing", # English + "Bitcoin sube rápidamente", # Spanish + "Bitcoin está caindo", # Portuguese + ] + results = self.analyzer.analyze_batch(texts) + self.assertEqual(len(results), 3) + languages = [r.language for r in results] + self.assertIn("en", languages) + self.assertIn("es", languages) + self.assertIn("pt", languages) + + def test_sentiment_result_to_dict(self): + """Test that SentimentResult properly serializes to dict.""" + result = self.analyzer.analyze("Bitcoin rises") + result_dict = result.to_dict() + self.assertIn("language", result_dict) + self.assertIn("translated", result_dict) + self.assertIn("compound_score", result_dict) + self.assertIn("sentiment_label", result_dict) + + +class TestTranslationService(unittest.TestCase): + """Test the TranslationService for text translation.""" + + def setUp(self): + self.service = TranslationService() + + def test_translation_disabled_by_default(self): + """Test that translation is optional and doesn't crash if model unavailable.""" + # Should not raise, just return None + result = self.service.translate("Hola mundo") + # Result can be None if model not available + self.assertTrue(result is None or isinstance(result, str)) + + def test_translation_service_environment_variable(self): + """Test translation service respects environment variables.""" + # Test that TRANSLATION_DISABLE_MODEL works + original = os.environ.get("TRANSLATION_DISABLE_MODEL") + try: + os.environ["TRANSLATION_DISABLE_MODEL"] = "true" + service = TranslationService() + result = service.translate("test") + self.assertIsNone(result) + finally: + if original: + os.environ["TRANSLATION_DISABLE_MODEL"] = original + else: + os.environ.pop("TRANSLATION_DISABLE_MODEL", None) + + def test_empty_text_returns_none(self): + """Test translation of empty text.""" + result = self.service.translate("") + self.assertIsNone(result) + + def test_translation_disabled_returns_none(self): + """Test that disabled translation service returns None.""" + service = TranslationService() + service._translation_disabled = True + result = service.translate("Test text") + self.assertIsNone(result) + + +class TestMultilingualNormalizationPipeline(unittest.TestCase): + """Integration tests for the full multilingual normalization pipeline.""" + + def setUp(self): + self.analyzer = SentimentAnalyzer() + + def test_pipeline_normalizes_spanish_article(self): + """Test full pipeline on Spanish content.""" + spanish_text = " Bitcoin está en aumento hoy! " + result = self.analyzer.analyze(spanish_text) + self.assertEqual(result.language, "es") + self.assertIsNotNone(result.compound_score) + self.assertIsInstance(result.text, str) + + def test_pipeline_handles_mixed_scripts(self): + """Test pipeline with text containing mixed scripts.""" + mixed_text = "Bitcoin 比特币 is rising" + result = self.analyzer.analyze(mixed_text) + self.assertIsNotNone(result.language) + self.assertIsNotNone(result.compound_score) + + def test_pipeline_preserves_asset_detection_after_translation(self): + """Test that asset codes are still detected after translation.""" + text = "Bitcoin y Ethereum suben juntos" + result = self.analyzer.analyze(text) + # Asset extraction happens on normalized text + self.assertIsNotNone(result.asset_codes) + + def test_batch_parallel_with_multilingual_content(self): + """Test parallel batch analysis with multilingual content.""" + texts = [ + "Bitcoin is amazing" * 100, # English, long + "Bitcoin es increíble" * 100, # Spanish, long + "Bitcoin é incrível" * 100, # Portuguese, long + ] + results = self.analyzer.analyze_batch_parallel(texts) + self.assertEqual(len(results), 3) + for result in results: + self.assertIsNotNone(result.language) + self.assertIsNotNone(result.compound_score) + self.assertIsNotNone(result.translated) + + +if __name__ == "__main__": + unittest.main() diff --git a/apps/data-processing/tests/test_news_fetcher.py b/apps/data-processing/tests/test_news_fetcher.py index cb216263..f86c11e2 100644 --- a/apps/data-processing/tests/test_news_fetcher.py +++ b/apps/data-processing/tests/test_news_fetcher.py @@ -111,6 +111,80 @@ def test_fetch_newsapi_success(self, mock_get): fetcher.close() + @patch("src.ingestion.news_fetcher.requests.Session.get") + def test_fetch_newsapi_default_all_languages(self, mock_get): + """Test NewsAPI fetch does not restrict to English by default.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = self.mock_newsapi_response + mock_get.return_value = mock_response + + fetcher = NewsFetcher(use_cryptocompare=False) + fetcher._fetch_newsapi(limit=5) + + self.assertEqual(mock_get.call_count, 1) + params = mock_get.call_args[1]["params"] + self.assertNotIn("language", params) + + fetcher.close() + + @patch("src.ingestion.news_fetcher.requests.Session.post") + @patch("src.ingestion.news_fetcher.requests.Session.get") + def test_translate_non_english_article(self, mock_get, mock_post): + """Test translation of non-English news content before analytics.""" + # Mock translation endpoint response + mock_translate_response = Mock() + mock_translate_response.status_code = 200 + mock_translate_response.json.return_value = { + "translatedText": "Bitcoin reaches new all-time high today..." + } + mock_post.return_value = mock_translate_response + + # Mock CryptoCompare response with a Spanish language hint + response = Mock() + response.status_code = 200 + response.json.return_value = { + "Type": 100, + "Data": [ + { + "id": "12345", + "title": "Bitcoin alcanza un nuevo máximo histórico", + "body": "Bitcoin alcanzó un nuevo máximo histórico hoy...", + "short_description": "BTC alcanza ATH", + "source": "CryptoNews", + "url": "https://example.com/btc-ath-es", + "published_on": 1672531200, + "categories": "BTC|Market", + "tags": "Bitcoin|Precio", + "lang": "es", + } + ], + } + mock_get.return_value = response + + os.environ["TRANSLATION_API_URL"] = "https://example.com/translate" + fetcher = NewsFetcher(use_newsapi=False) + + articles = fetcher._fetch_cryptocompare(limit=5) + self.assertEqual(len(articles), 1) + self.assertEqual(articles[0].language, "es") + self.assertTrue(articles[0].translated) + self.assertEqual(articles[0].title, "Bitcoin reaches new all-time high today...") + + fetcher.close() + + def test_normalize_text_handles_unicode_and_whitespace(self): + """Test that normalization cleans unicode and whitespace.""" + from src.ingestion.news_fetcher import NewsFetcher + + os.environ["TRANSLATION_API_URL"] = "https://example.com/translate" + fetcher = NewsFetcher(use_cryptocompare=False, use_newsapi=False) + + normalized = fetcher._normalize_text(" Cripto\u00a0noticias \n hoy ") + self.assertEqual(normalized, "Cripto noticias hoy") + + fetcher.close() + # @patch('src.ingestion.news_fetcher.requests.Session.get') # def test_fetch_latest_combined(self, mock_get): # """Test combined fetch from both APIs"""