Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/data-processing/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# NOTE(review): `re` is not referenced anywhere in the visible lines of this script.
import re
# Wildcard import: runs social_fetcher's module-level code and binds its public
# names in this namespace. NOTE(review): none of those names are used in the
# visible lines -- confirm whether the import is wanted only for side effects.
from src.ingestion.social_fetcher import *

# Printed unconditionally once the import above has succeeded.
print("Patching complete.")
26 changes: 17 additions & 9 deletions apps/data-processing/src/ingestion/news_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
"""
import hashlib
import json
import logging
import re
import unicodedata
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Set
from pathlib import Path
import logging
from typing import Dict, List, Optional, Set

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,6 +43,12 @@ def __init__(self, deduplication_window_days: int = 7, storage_path: str = "./da

logger.info(f"Initialized NewsDeduplicator with window of {deduplication_window_days} days")

def _clean_text(self, text: str) -> str:
normalized = unicodedata.normalize("NFKD", text or "")
normalized = normalized.encode("ascii", "ignore").decode("ascii")
normalized = re.sub(r"\s+", " ", normalized).strip().lower()
return normalized

def _normalize_article(self, article: Dict) -> str:
"""
Normalize article content for consistent hashing
Expand All @@ -52,16 +60,16 @@ def _normalize_article(self, article: Dict) -> str:
Normalized string representation of the article
"""
# Extract and normalize key fields
title = (article.get('title') or '').strip().lower()
content = (article.get('content') or '').strip().lower()
url = (article.get('url') or '').strip().lower()
title = self._clean_text(article.get("title") or "")
content = self._clean_text(article.get("content") or "")
url = self._clean_text(article.get("url") or "")

# Create a canonical representation
canonical_data = {
'title': title,
'content': content,
'url': url,
'source': (article.get('source') or '').strip().lower(),
"title": title,
"content": content,
"url": url,
"source": self._clean_text(article.get("source") or ""),
}

# Convert to JSON string for consistent hashing
Expand Down
169 changes: 160 additions & 9 deletions apps/data-processing/src/ingestion/news_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,31 @@
Fetches data from external APIs and standardizes the format.
"""

import os
import json
import os
import re
import time
from typing import List, Dict, Optional
import unicodedata
from dataclasses import dataclass, asdict
from .news_deduplicator import NewsDeduplicator
from datetime import datetime
from typing import Any, Dict, List, Optional
from .news_deduplicator import NewsDeduplicator
import requests
from requests.exceptions import RequestException, Timeout

# langdetect is an optional dependency: when it cannot be imported the module
# still loads, exposes a stand-in exception type, and flags the feature as off.
try:
    from langdetect import DetectorFactory, LangDetectException, detect
except ImportError:
    LANGDETECT_AVAILABLE = False

    class LangDetectException(Exception):
        """Fallback exception when langdetect is unavailable."""
else:
    # Fixed seed so repeated detection of the same text gives the same answer.
    DetectorFactory.seed = 0
    LANGDETECT_AVAILABLE = True

# Endpoint used when no TRANSLATION_API_URL override is supplied.
DEFAULT_TRANSLATION_API_URL = "https://libretranslate.de/translate"


@dataclass
class NewsArticle:
Expand All @@ -28,6 +43,8 @@ class NewsArticle:
categories: List[str]
sentiment_score: Optional[float] = None # To be filled by sentiment engine
tags: Optional[List[str]] = None
language: str = "en"
translated: bool = False

def to_dict(self) -> Dict:
"""Convert to dictionary with serialized datetime"""
Expand Down Expand Up @@ -110,6 +127,126 @@ def _handle_api_error(self, response: requests.Response, api_name: str) -> None:
else:
response.raise_for_status()

def _normalize_text(self, text: Optional[str]) -> str:
"""Normalize text for analytics ingestion."""
if not text:
return ""
normalized = unicodedata.normalize("NFKC", text)
normalized = re.sub(r"\s+", " ", normalized).strip()
return normalized

def _normalize_language_code(self, language: str) -> str:
"""Normalize language codes like en_US to en."""
if not language or not isinstance(language, str):
return "unknown"
normalized = language.strip().lower().replace("_", "-")
return normalized.split("-")[0]

def _detect_script_language(self, text: str) -> Optional[str]:
if re.search(r"[\u4e00-\u9fff]", text):
return "zh"
if re.search(r"[\u3040-\u30ff]", text):
return "ja"
if re.search(r"[\uac00-\ud7af]", text):
return "ko"
if re.search(r"[\u0400-\u04ff]", text):
return "ru"
if re.search(r"[\u0600-\u06ff]", text):
return "ar"
return None

def _detect_language(self, text: str, hint: Optional[str] = None) -> str:
    """Detect the source language for an article.

    Resolution order:
      1. *hint*, when it normalizes to a usable language code;
      2. ``langdetect`` on *text*, when the library is importable;
      3. a script-based guess from the characters in *text*;
      4. ``"en"`` as the final default.

    Args:
        text: Article text used for detection; may be empty.
        hint: Optional language/locale code supplied by the upstream API.

    Returns:
        A lower-case primary language code.
    """
    if hint:
        hinted = self._normalize_language_code(hint)
        # An unusable hint (non-string, blank) must not short-circuit real
        # detection, otherwise the article is labelled "unknown"/"" and
        # sent for translation with a bogus source language.
        if hinted and hinted != "unknown":
            return hinted

    if text and LANGDETECT_AVAILABLE:
        try:
            return self._normalize_language_code(detect(text))
        except LangDetectException:
            # Detection failed for this text; fall through to the script
            # heuristic below.
            pass

    script_language = self._detect_script_language(text or "")
    if script_language:
        return script_language

    return "en"

def _translate_text(self, text: str, source_lang: str) -> str:
"""Translate non-English text to English using a configurable endpoint."""
if not text or source_lang == "en":
return text

translation_url = (
os.getenv("TRANSLATION_API_URL", "").strip() or DEFAULT_TRANSLATION_API_URL
)
translation_key = os.getenv("TRANSLATION_API_KEY", "").strip()

if not translation_url:
return text

payload = {
"q": text,
"source": source_lang,
"target": "en",
"format": "text",
}
headers = {"Content-Type": "application/json"}
if translation_key:
headers["Authorization"] = f"Bearer {translation_key}"

try:
response = self.session.post(
translation_url,
json=payload,
headers=headers,
timeout=APIConfig.TIMEOUT,
)
if response.status_code != 200:
logger.warning(
"Translation API returned %s for source_lang=%s",
response.status_code,
source_lang,
)
return text

data = response.json()
return data.get("translatedText") or data.get("translation") or text
except RequestException as exc:
logger.warning("Translation request failed: %s", exc)
return text
except ValueError:
logger.warning("Translation API returned invalid JSON")
return text

def _prepare_article_fields(
    self,
    title: str,
    content: Optional[str],
    summary: Optional[str],
    lang_hint: Optional[str] = None,
) -> Dict[str, Any]:
    """Detect language, translate to English if needed, and normalize text.

    The language is detected from the title and content (the summary is not
    used for detection). When it is not ``"en"``, title, content and summary
    are each passed through the translator, then all three are normalized.

    Args:
        title: Article headline; ``None`` is treated as "".
        content: Article body; ``None`` is treated as "".
        summary: Short description; ``None`` is treated as "".
        lang_hint: Optional language code supplied by the upstream API.

    Returns:
        A dict with keys ``title``, ``content``, ``summary`` (normalized
        strings), ``language`` (detected source language) and ``translated``
        (``True`` only if at least one field was actually changed by
        translation).
    """
    originals = {
        "title": title or "",
        "content": content or "",
        "summary": summary or "",
    }

    source_language = self._detect_language(
        f"{originals['title']} {originals['content']}", hint=lang_hint
    )

    fields = dict(originals)
    if source_language != "en":
        fields = {
            name: self._translate_text(value, source_language)
            for name, value in originals.items()
        }

    # The translator returns its input unchanged on failure, so the flag is
    # derived from an actual difference rather than from the language alone.
    translated = any(fields[name] != originals[name] for name in originals)

    return {
        "title": self._normalize_text(fields["title"]),
        "content": self._normalize_text(fields["content"]),
        "summary": self._normalize_text(fields["summary"]),
        "language": source_language,
        "translated": translated,
    }

def _fetch_cryptocompare(self, limit: int) -> List[NewsArticle]:
"""Fetch news from CryptoCompare API"""
articles = []
Expand All @@ -118,7 +255,6 @@ def _fetch_cryptocompare(self, limit: int) -> List[NewsArticle]:
self._respect_rate_limit()

params = {
"lang": "EN",
"categories": "BTC,ETH,BLOCKCHAIN",
"excludeCategories": "Sponsored",
}
Expand All @@ -145,11 +281,17 @@ def _fetch_cryptocompare(self, limit: int) -> List[NewsArticle]:
# Parse articles
for item in data.get("Data", [])[:limit]:
try:
article = NewsArticle(
id=f"cc_{item['id']}",
parsed_fields = self._prepare_article_fields(
title=item.get("title", ""),
content=item.get("body", ""),
summary=item.get("short_description", ""),
lang_hint=item.get("lang") or item.get("language"),
)
article = NewsArticle(
id=f"cc_{item['id']}",
title=parsed_fields["title"],
content=parsed_fields["content"],
summary=parsed_fields["summary"],
source=item.get("source", "Unknown"),
url=item.get("url", ""),
published_at=datetime.fromtimestamp(
Expand All @@ -163,6 +305,8 @@ def _fetch_cryptocompare(self, limit: int) -> List[NewsArticle]:
tags=(
item.get("tags", "").split("|") if item.get("tags") else []
),
language=parsed_fields["language"],
translated=parsed_fields["translated"],
)

# Avoid duplicates
Expand Down Expand Up @@ -194,7 +338,6 @@ def _fetch_newsapi(self, limit: int) -> List[NewsArticle]:

params = {
"q": "cryptocurrency OR blockchain OR bitcoin OR ethereum",
"language": "en",
"sortBy": "publishedAt",
"pageSize": min(limit, 100), # NewsAPI max is 100
"from": from_date.strftime("%Y-%m-%d"),
Expand All @@ -218,18 +361,26 @@ def _fetch_newsapi(self, limit: int) -> List[NewsArticle]:
item["publishedAt"].replace("Z", "+00:00")
)

article = NewsArticle(
id=f"na_{hash(item['url']) & 0xFFFFFFFF}",
parsed_fields = self._prepare_article_fields(
title=item.get("title", ""),
content=item.get("content", ""),
summary=item.get("description", ""),
lang_hint=item.get("language") or item.get("lang"),
)
article = NewsArticle(
id=f"na_{hash(item['url']) & 0xFFFFFFFF}",
title=parsed_fields["title"],
content=parsed_fields["content"],
summary=parsed_fields["summary"],
source=item.get("source", {}).get("name", "Unknown"),
url=item.get("url", ""),
published_at=published_at,
categories=[
"crypto",
"blockchain",
], # NewsAPI doesn't provide categories
language=parsed_fields["language"],
translated=parsed_fields["translated"],
)

# Avoid duplicates
Expand Down
Loading
Loading