A comprehensive Twitter-style recommendation system implementing advanced algorithms for simulating viral propagation on social media. Built with a dual-tower architecture, exploration-exploitation balancing, and a full A/B testing framework.
```bash
git clone https://github.com/Social-Arena/Recommendation.git
cd Recommendation

# Create and activate virtual environment
python -m venv venv
source venv/bin/activate

# Initialize submodules (Agent and Feed libraries)
git submodule update --init --recursive

# Install the package and all dependencies
pip install -e .
pip install -e external/Feed
pip install -e external/Agent
```

```python
from recommendation import CentralizedRecommendationSystem, BalancedStrategy
import feed
from agent import Agent

# Initialize recommendation system
rec_system = CentralizedRecommendationSystem(
    strategy=BalancedStrategy(explore_ratio=0.2)
)

# Create agents
agent = Agent(agent_id="001", username="alice", bio="Tech enthusiast")
rec_system.add_agent("001", {"interests": ["tech", "AI"]})

# Create and ingest content
new_feed = feed.Feed(
    id=feed.generate_feed_id(),
    text="Hello Social Arena! #AI #Python",
    author_id="001",
    feed_type="post"
)
rec_system.ingest_feed(new_feed)

# Get personalized recommendations
recommendations = rec_system.fetch("001", {"max_feeds": 10})
print(f"Showing {len(recommendations['feeds'])} personalized feeds")
```

- 7-Stage Pipeline: Complete replication of Twitter's recommendation system
  - Candidate Generation (In-Network + Out-of-Network)
  - Light Ranking (Fast scoring of candidates)
  - Heavy Ranking (Multi-task deep learning model)
  - Exploration Engine (ε-greedy, UCB, Thompson Sampling)
  - Diversity Injection (Content and author diversity)
  - Safety Filtering (Content moderation)
  - Real-time Serving (Low-latency delivery)
- Dual-Tower Model: Separate user and content embedding towers
- Multi-Task Learning: Simultaneous optimization for engagement, satisfaction, and safety
- Real-Time Inference: Sub-100ms response times
- Scalable Design: Handles millions of candidates efficiently
- A/B Testing: Hot-swappable recommendation strategies
- Multi-Armed Bandits: Dynamic strategy selection
- Performance Monitoring: Real-time metrics and alerts
- Strategy Comparison: Side-by-side algorithm evaluation
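The dual-tower idea above can be sketched in a few lines: two independent networks map user features and content features into a shared embedding space, and relevance is a dot product between the two embeddings. The sketch below is a hypothetical, minimal illustration (the tower weights, feature layout, and helper names are invented for the example), not the system's actual model code:

```python
import math

def dot(u, v):
    return sum(a * b for a, b in zip(u, v))

def l2_normalize(v):
    norm = math.sqrt(sum(x * x for x in v)) or 1.0
    return [x / norm for x in v]

def linear_tower(features, weights):
    """One linear layer standing in for a deep tower: out[j] = sum_i f[i] * W[i][j]."""
    dim = len(weights[0])
    return l2_normalize([dot(features, [row[j] for row in weights]) for j in range(dim)])

# Toy 3-feature inputs mapped to 2-d embeddings; weights are made up for illustration.
user_weights = [[1.0, 0.0], [0.0, 1.0], [0.5, 0.5]]
content_weights = [[0.9, 0.1], [0.1, 0.9], [0.4, 0.6]]

user_emb = linear_tower([1.0, 0.0, 1.0], user_weights)   # user tower
post_a = linear_tower([1.0, 0.0, 0.5], content_weights)  # content tower
post_b = linear_tower([0.0, 1.0, 0.0], content_weights)

# Because the towers are independent, content embeddings can be precomputed
# offline, and retrieval reduces to a nearest-neighbour search over dot products.
scores = {"post_a": dot(user_emb, post_a), "post_b": dot(user_emb, post_b)}
best = max(scores, key=scores.get)
```

The separation is what makes the design scalable: only the user tower runs at request time, while millions of content embeddings are indexed ahead of time.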
```
Recommendation/
├── recommendation/        # Core recommendation package
│   ├── __init__.py        # Package exports
│   ├── base.py            # Base classes and protocols
│   ├── system.py          # Main recommendation system
│   ├── strategies.py      # Ranking strategies
│   └── example.py         # Usage example
├── external/              # External dependencies
│   ├── Agent/             # AI agent framework
│   └── Feed/              # Twitter data structures
├── utils/                 # Logging and utilities
│   ├── logger.py          # Centralized logging
│   ├── decorators.py      # Performance tracking
│   ├── log_analyzer.py    # Log analysis tools
│   └── trace_request.py   # Request tracing
└── trace/                 # Runtime logs
    └── logs/              # Component-specific logs
```
```python
# 1. Candidate Generation
candidates = candidate_generator.generate(
    user_id="user_123",
    in_network_size=1000,
    out_network_size=500
)

# 2. Light Ranking
light_scores = light_ranker.score(candidates, user_features)

# 3. Heavy Ranking
heavy_scores = heavy_ranker.score(
    top_candidates=light_scores[:100],
    user_embedding=user_tower(user_features),
    content_embeddings=content_tower(candidate_features)
)

# 4. Exploration
explored_scores = exploration_engine.apply(
    scores=heavy_scores,
    strategy="epsilon_greedy",
    epsilon=0.1
)

# 5. Diversity Injection
diverse_results = diversity_injector.inject(
    scored_candidates=explored_scores,
    diversity_weight=0.3
)

# 6. Safety Filtering
safe_results = safety_filter.filter(diverse_results)

# 7. Serving
recommendations = serving_engine.format_response(safe_results)
```

CRITICAL: All debugging uses file-based logging - NO console output.
```
trace/logs/
├── candidate/     # Candidate generation logs
├── ranking/       # Ranking system logs
├── exploration/   # Exploration engine logs
├── diversity/     # Diversity injection logs
├── serving/       # Real-time serving logs
├── ab_test/       # A/B testing logs
├── feedback/      # User feedback logs
├── errors/        # All error logs
└── performance/   # Performance metrics
```
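A routing layer that produces this layout can be built with the standard library alone. The sketch below is a hypothetical stand-in (the repo's actual `get_logger` in `utils/logger.py` may differ); it illustrates the key point that every component writes to its own file under `trace/logs/` and nothing reaches the console:

```python
import logging
from pathlib import Path

LOG_ROOT = Path("trace/logs")

def get_file_logger(name: str, component: str) -> logging.Logger:
    """Return a logger that writes only to trace/logs/<component>/<name>.log."""
    log_dir = LOG_ROOT / component
    log_dir.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger(f"{component}.{name}")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # never bubble up to the root (console) handler

    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.FileHandler(log_dir / f"{name}.log")
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s [%(name)s] %(message)s"
        ))
        logger.addHandler(handler)
    return logger

ranker_log = get_file_logger("HeavyRanker", component="ranking")
ranker_log.info("Model inference completed")  # lands in trace/logs/ranking/HeavyRanker.log
```

Setting `propagate = False` is what enforces the "no console output" rule even when other libraries configure a root handler.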
```bash
# Test logging system
python test_logging.py

# Trace specific request
python utils/trace_request.py req_12345

# Analyze errors
python utils/log_analyzer.py errors --component ranking

# Performance analysis
python utils/log_analyzer.py performance --timeframe 1h
```

```python
from utils import get_logger, log_performance, LogContext

logger = get_logger("HeavyRanker", component="ranking")

@log_performance()
def rank_candidates(candidates, user_features):
    with LogContext(request_id="req_123"):
        logger.info(f"Ranking {len(candidates)} candidates")
        # Ranking logic here
        logger.debug("Model inference completed")
        return ranked_results
```

- Engagement Rate: Likes, retweets, replies per recommendation
- Click-Through Rate: Content consumption metrics
- Dwell Time: Time spent viewing recommended content
- Diversity Score: Content and author diversity metrics
- Exploration Rate: Novel content discovery percentage
- Safety Score: Content moderation effectiveness
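As a rough illustration of how the first few metrics above might be computed from raw interaction logs (the record fields and function names here are invented for the example; the repo's real schema may differ):

```python
# Each record: which feed was shown, what the user did, and for how long (seconds).
impressions = [
    {"feed_id": "f1", "author": "alice", "actions": {"like"},            "dwell_s": 12.0},
    {"feed_id": "f2", "author": "bob",   "actions": set(),               "dwell_s": 1.5},
    {"feed_id": "f3", "author": "alice", "actions": {"retweet", "like"}, "dwell_s": 30.0},
    {"feed_id": "f4", "author": "carol", "actions": {"click"},           "dwell_s": 8.0},
]

ENGAGEMENT_ACTIONS = {"like", "retweet", "reply"}

def engagement_rate(logs):
    """Share of impressions that received at least one engagement action."""
    engaged = sum(1 for r in logs if r["actions"] & ENGAGEMENT_ACTIONS)
    return engaged / len(logs)

def click_through_rate(logs):
    return sum(1 for r in logs if "click" in r["actions"]) / len(logs)

def mean_dwell_time(logs):
    return sum(r["dwell_s"] for r in logs) / len(logs)

def author_diversity(logs):
    """Unique authors per impression: 1.0 means every feed had a distinct author."""
    return len({r["author"] for r in logs}) / len(logs)

metrics = {
    "engagement_rate": engagement_rate(impressions),    # 2 of 4 engaged -> 0.5
    "ctr": click_through_rate(impressions),             # 1 of 4 clicked -> 0.25
    "dwell_time": mean_dwell_time(impressions),         # 51.5 / 4 = 12.875
    "author_diversity": author_diversity(impressions),  # 3 authors / 4 feeds -> 0.75
}
```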
```python
# Example A/B test comparison
{
    "epsilon_greedy_0.1": {
        "engagement_rate": 0.045,
        "diversity_score": 0.73,
        "exploration_rate": 0.12
    },
    "thompson_sampling": {
        "engagement_rate": 0.048,
        "diversity_score": 0.71,
        "exploration_rate": 0.15
    }
}
```

```python
from recommendation_engine import RecommendationEngine

engine = RecommendationEngine(
    model_path="models/twitter_v2.pkl",
    config_path="config/production.yaml"
)

recommendations = engine.get_recommendations(
    user_id="user_123",
    num_recommendations=20,
    strategy="thompson_sampling"
)
```

```python
from strategies import CustomExplorationStrategy

class MyStrategy(CustomExplorationStrategy):
    def apply(self, scores, context):
        # Your custom exploration logic
        return modified_scores

engine.register_strategy("my_strategy", MyStrategy())
```

```python
from datetime import datetime

# Collect user feedback for online learning
engine.record_feedback(
    user_id="user_123",
    content_id="tweet_456",
    action="like",
    timestamp=datetime.now()
)

# Update model with feedback
engine.update_from_feedback(batch_size=1000)
```

```yaml
# config/production.yaml
model:
  user_tower_dim: 256
  content_tower_dim: 256
  hidden_layers: [512, 256, 128]
  dropout_rate: 0.3

exploration:
  strategy: "thompson_sampling"
  exploration_rate: 0.1
  update_frequency: 3600

diversity:
  content_diversity_weight: 0.3
  author_diversity_weight: 0.2
  temporal_diversity_weight: 0.1

safety:
  toxicity_threshold: 0.8
  misinformation_threshold: 0.7
  spam_threshold: 0.9
```

```python
# Production serving configuration
SERVING_CONFIG = {
    "batch_size": 32,
    "max_latency_ms": 100,
    "cache_ttl_seconds": 300,
    "fallback_strategy": "popular_content",
    "monitoring_enabled": True
}
```

```python
from models.base_ranker import BaseRanker

class MyCustomRanker(BaseRanker):
    def score(self, candidates, user_features, context):
        # Your custom scoring logic
        return scores

# Register the new ranker
engine.register_ranker("my_ranker", MyCustomRanker())
```

```python
from features import FeatureExtractor

class MyFeatureExtractor(FeatureExtractor):
    def extract(self, content, user, context):
        # Extract custom features
        return feature_vector

engine.add_feature_extractor(MyFeatureExtractor())
```

- Logging: Use the centralized logging system, no print statements
- Performance: Add `@log_performance` decorators to key functions
- Testing: Include unit tests for all new components
- Documentation: Document all public APIs
- Create the component in the appropriate directory
- Add comprehensive logging with the component name
- Include performance monitoring
- Add unit tests in `tests/`
- Update configuration files
- Document in this README
- Check `trace/logs/errors/` for error logs
- Use `trace_request.py` to follow the request flow
- Analyze performance with `log_analyzer.py`
- Never debug with console prints - use logs only
- Twitter's Recommendation Algorithm
- Deep Learning for Recommender Systems
- Multi-Armed Bandits in Recommendation
- Content Diversity in Social Media
- Follow the logging and performance tracking guidelines
- Add comprehensive tests for new features
- Update documentation for API changes
- Ensure all logs are helpful for debugging
- Test A/B framework with new strategies
See the LICENSE file for details.
Part of the Social Arena ecosystem - Building next-generation social media simulation and recommendation systems.