diff --git a/.gitignore b/.gitignore index a4133fb..709a1c0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .DS_Store __pycache__/* +scrapers/telegram-scraper/config.yml +scrapers/telegram-scraper/*session \ No newline at end of file diff --git a/scrapers/telegram-scraper/README.md b/scrapers/telegram-scraper/README.md new file mode 100644 index 0000000..2317bc7 --- /dev/null +++ b/scrapers/telegram-scraper/README.md @@ -0,0 +1,185 @@ +# Telegram Scraper + +This Python script uses [Telethon](https://github.com/LonamiWebs/Telethon) to scrape messages, images, and videos from a public or private Telegram channel or group. It supports filtering by date range, forward/backward traversal, media downloads, and exports structured JSON data for analysis. + +--- + +## ๐Ÿ“ฆ Features + +- Download messages from a specific Telegram channel or group +- Optionally download images and/or videos +- Filter messages by start and end date +- Store grouped metadata (message ID, grouped ID, timestamp, content) +- Output three structured files: + - Message data + - Channel-wide metadata + - Metadata for the current scraping run + +--- + +## ๐Ÿ›  Requirements + +- Python 3.8+ +- [Telethon](https://docs.telethon.dev/en/stable/) +- PyYAML + +Install dependencies with: + +```bash +pip install -r requirements.txt +``` + +--- + +## ๐Ÿ“„ Configuration + +Create a file named `config.yml` in the same directory as the script with the following structure: + +```yaml +telegram: + api_id: YOUR_API_ID + api_hash: YOUR_API_HASH + session_name: my_session + +scraping: + target_group: https://t.me/your_channel_or_group + limit: 1000 + download_images: true + download_videos: true + start_date: 2024-01-01 + end_date: 2024-12-31 + scrape_forward: false + offset_id: null + offset_date: null +``` + +> โš ๏ธ Do **not** commit your real `config.yml`. Instead, commit the `config_template.yml` to share structure. 
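The scraper reads this file with PyYAML. Below is a minimal sketch (not part of the patch) of how the values are consumed, mirroring `load_config()` and `parse_date()` in `telegram_scraper.py`. Note that the date values should be quoted (as in `config_template.yml`): unquoted `YYYY-MM-DD` values are loaded by PyYAML as `datetime.date` objects, which `parse_date()`/`strptime()` cannot re-parse.

```python
# Minimal sketch of how the scraper consumes config.yml, mirroring
# load_config() and parse_date() in telegram_scraper.py.
import yaml
from datetime import datetime, timezone

with open("config.yml", "r") as f:
    config = yaml.safe_load(f)

api_id = config["telegram"]["api_id"]
target_group = config["scraping"]["target_group"]

# parse_date() calls strptime(), which needs a string. Quote dates in YAML
# ('2024-01-01'); unquoted dates come back from PyYAML as datetime.date objects.
raw = config["scraping"].get("start_date")
start_date = (
    datetime.strptime(str(raw), "%Y-%m-%d").replace(tzinfo=timezone.utc)
    if raw
    else None
)
print(api_id, target_group, start_date)
```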
+

---

## 🚀 Usage

```bash
python telegram_scraper.py
```

After running, the script will generate a folder like:

```
<channel_id>_<channel_name>/
├── images/                                             (if enabled)
├── videos/                                             (if enabled)
├── <channel_id>_<channel_name>.json                    # message data
├── <channel_id>_<channel_name>_run_metadata.json       # metadata for the current run
└── <channel_id>_<channel_name>_channel_metadata.json   # channel-wide metadata
```

---

## 🧠 How It Works

- `start_date` and `end_date` are optional filters
- `scrape_forward: false` (default): scrape backward, starting from the latest messages
- `scrape_forward: true`: scrape forward, starting from `offset_date` (if set) or from the oldest message
- Each message is stored in the JSON output with its `grouped_id` (when available), so media albums can be re-grouped during analysis

---

## 🧼 Example Output

### Content and message metadata

```json
{
  "1441684517": {
    "7200": {
      "grouped_id": 13954964386683269,
      "datetime": "2025-04-11T11:22:28+00:00",
      "content": "",
      "media_saved": [
        "image"
      ]
    },
    "7199": {
      "grouped_id": 13954964386683269,
      "datetime": "2025-04-11T11:22:28+00:00",
      "content": "",
      "media_saved": [
        "image", "video"
      ]
    },
    "7198": {
      "grouped_id": 13954964386683269,
      "datetime": "2025-04-11T11:22:28+00:00",
      "content": "",
      "media_saved": [
        "image"
      ]
    }
  }
}
```

### Current run metadata

```json
{
  "channel_name": "",
  "channel_id": 1441684517,
  "channel_url": "https://t.me/url",
  "first_message_datetime": "2025-04-10T10:19:20+00:00",
  "last_message_datetime": "2025-04-11T11:22:28+00:00",
  "message_count": 11,
  "image_count": 10,
  "video_count": 1,
  "config_used": {
    "target_group": "https://telegram.me/url",
    "limit": 1000,
    "start_date": "2025-04-10T00:00:00+00:00",
    "end_date": "2025-04-12T00:00:00+00:00",
    "scrape_forward": false,
    "offset_id": null,
    "offset_date": "2025-04-10T00:00:00+00:00",
    "download_images": true,
    "download_videos": true
  }
}
```

### Channel metadata

```json
{
  "channel_name": "name",
  "channel_id": 1441684517,
  "channel_url": "https://t.me/name",
  "total_message_count": 6691,
  "first_message_id": 1,
  "first_message_datetime": "2022-09-24T07:42:27+00:00",
  "last_message_id": 7220,
  "last_message_datetime": "2025-04-17T19:00:08+00:00"
}
```

---

## ✅ Tips

- `offset_date` is only applied when `scrape_forward: true`; it is ignored when scraping backward
- You can test scraping metadata without downloading media by setting:
  ```yaml
  download_images: false
  download_videos: false
  ```

---

## 🛡 Security

- Your `api_id` and `api_hash` are **sensitive**
- Use `.gitignore` to prevent `config.yml` from being committed:

```gitignore
scrapers/telegram-scraper/config.yml
scrapers/telegram-scraper/*.session
```
diff --git a/scrapers/telegram-scraper/config_template.yml b/scrapers/telegram-scraper/config_template.yml
new file mode 100644
index 0000000..c57ccc4
--- /dev/null
+++ b/scrapers/telegram-scraper/config_template.yml
@@ -0,0 +1,16 @@
+# Copy (or rename) this file to config.yml and fill in the required fields
+telegram:
+  api_id: 123456
+  api_hash: 'f123x123'
+  session_name: 'name of your session'
+
+scraping:
+  target_group: 'https://telegram.me/group_name'
+  limit: 1000
+  scrape_forward: false # Set to True to scrape from oldest to newest. 
The default order is from newest to oldest + download_images: true + download_videos: true + start_date: '2025-04-10' # Optional: format YYYY-MM-DD + end_date: '2025-04-12' # Optional: format YYYY-MM-DD, exclusive of last date + offset_id: null # Optional: Use specific message ID to start from + offset_date: '2025-04-10' # Optional: Overrides start_date as pagination hint diff --git a/scrapers/telegram-scraper/requirements.txt b/scrapers/telegram-scraper/requirements.txt new file mode 100644 index 0000000..b3008a1 --- /dev/null +++ b/scrapers/telegram-scraper/requirements.txt @@ -0,0 +1,5 @@ +pyaes==1.6.1 +pyasn1==0.6.1 +PyYAML==6.0.2 +rsa==4.9 +Telethon==1.39.0 diff --git a/scrapers/telegram-scraper/telegram_scraper.py b/scrapers/telegram-scraper/telegram_scraper.py new file mode 100644 index 0000000..9629653 --- /dev/null +++ b/scrapers/telegram-scraper/telegram_scraper.py @@ -0,0 +1,176 @@ +import os +import json +import yaml +from datetime import datetime, timezone +from telethon.sync import TelegramClient +from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument + + +def load_config(path='config.yml'): + with open(path, 'r') as file: + return yaml.safe_load(file) + +def parse_date(date_str): + return datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc) if date_str else None + +def generate_fallback_photo_id(message): + photo = getattr(message.media, 'photo', None) + if photo and hasattr(photo, 'id'): + return str(photo.id) + else: + return str(int(message.date.timestamp())) + + +if __name__ == "__main__": + config = load_config() + + # Telegram API credentials + api_id = config['telegram']['api_id'] + api_hash = config['telegram']['api_hash'] + session_name = config['telegram']['session_name'] + + # Scraping parameters + target_group = config['scraping']['target_group'] + limit = config['scraping'].get('limit', 1000) + download_images = config['scraping'].get('download_images', False) + download_videos = config['scraping'].get('download_videos', False) + start_date = parse_date(config['scraping'].get('start_date')) + end_date = parse_date(config['scraping'].get('end_date')) + reverse = config['scraping'].get('scrape_forward') + offset_id = config['scraping'].get('offset_id') + offset_date = parse_date(config['scraping'].get('offset_date')) or start_date + + if start_date and end_date and start_date > end_date: + raise ValueError("start_date cannot be after end_date.") + + with TelegramClient(session_name, api_id, api_hash) as client: + entity = client.get_entity(target_group) + group_id = entity.id + group_name = entity.username or entity.title or "unknown" + group_url = f"https://t.me/{entity.username}" if entity.username else None + + folder_name = f"{group_id}_{group_name}".replace(" ", "_") + os.makedirs(folder_name, exist_ok=True) + + images_folder = os.path.join(folder_name, 'images') + videos_folder = os.path.join(folder_name, 'videos') + if download_images: + os.makedirs(images_folder, exist_ok=True) + if download_videos: + os.makedirs(videos_folder, exist_ok=True) + + data = {str(group_id): {}} + first_msg_time = None + last_msg_time = None + image_count = 0 + video_count = 0 + message_count = 0 + + iter_args = {"entity": entity, "limit": limit, "reverse": reverse} + if offset_id is not None: + iter_args["offset_id"] = offset_id + elif offset_date is not None: + # Use offset_date only if reverse is True (forward) + if reverse: + iter_args["offset_date"] = offset_date + + + for message in client.iter_messages(**iter_args): + if not message: + 
continue + if start_date and message.date < start_date: + continue + if end_date and message.date > end_date: + continue + + msg_id = message.id + grouped_id = getattr(message, 'grouped_id', None) + msg_time = message.date + content = message.message + media_saved = [] + + if isinstance(message.media, MessageMediaPhoto) and download_images: + media_id = generate_fallback_photo_id(message) + filename = os.path.join(images_folder, f"{group_id}_{msg_id}_{media_id}_photo.jpg") + result = client.download_media(message, file=filename) + if result: + image_count += 1 + media_saved.append("image") + + elif isinstance(message.media, MessageMediaDocument) and message.media.document.mime_type.startswith("video/") and download_videos: + media_id = str(getattr(message.media.document, 'id', int(msg_time.timestamp()))) + filename = os.path.join(videos_folder, f"{group_id}_{msg_id}_{media_id}_video.mp4") + result = client.download_media(message, file=filename) + if result: + video_count += 1 + media_saved.append("video") + + if media_saved: + message_count += 1 + first_msg_time = min(first_msg_time or msg_time, msg_time) + last_msg_time = max(last_msg_time or msg_time, msg_time) + + data[str(group_id)][str(msg_id)] = { + "grouped_id": grouped_id, + "datetime": msg_time.isoformat(), + "content": content, + "media_saved": media_saved + } + + first_msg = client.iter_messages(entity, reverse=True).__next__() + last_msg = client.iter_messages(entity).__next__() + total_msg_count = client.get_messages(entity, limit=0).total + + channel_metadata = { + "channel_name": group_name, + "channel_id": group_id, + "channel_url": group_url, + "total_message_count": total_msg_count, + "first_message_id": first_msg.id, + "first_message_datetime": first_msg.date.isoformat(), + "last_message_id": last_msg.id, + "last_message_datetime": last_msg.date.isoformat() + } + + channel_meta_json = os.path.join(folder_name, f"{folder_name}_channel_metadata.json") + with open(channel_meta_json, 'w', encoding='utf-8') as f: + json.dump(channel_metadata, f, ensure_ascii=False, indent=4) + + message_json = os.path.join(folder_name, f"{folder_name}.json") + with open(message_json, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=4) + + run_meta = { + "channel_name": group_name, + "channel_id": group_id, + "channel_url": group_url, + "first_message_datetime": first_msg_time.isoformat() if first_msg_time else None, + "last_message_datetime": last_msg_time.isoformat() if last_msg_time else None, + "message_count": message_count, + "image_count": image_count, + "video_count": video_count, + "config_used": { + "target_group": target_group, + "limit": limit, + "start_date": start_date.isoformat() if start_date else None, + "end_date": end_date.isoformat() if end_date else None, + "scrape_forward": reverse, + "offset_id": offset_id, + "offset_date": offset_date.isoformat() if offset_date else None, + "download_images": download_images, + "download_videos": download_videos + } + } + + run_meta_json = os.path.join(folder_name, f"{folder_name}_run_metadata.json") + with open(run_meta_json, 'w', encoding='utf-8') as f: + json.dump(run_meta, f, ensure_ascii=False, indent=4) + + print(f"\nโœ… Scraping complete.") + print(f"- Channel metadata saved to: {channel_meta_json}") + print(f"- Messages saved to: {message_json}") + print(f"- Metadata for the current run saved to: {run_meta_json}") + if download_images: + print(f"- Images saved in: {images_folder}") + if download_videos: + print(f"- Videos saved in: {videos_folder}") \ 
No newline at end of file diff --git a/scrapers/twitter-scraper/README.md b/scrapers/twitter-scraper/README.md new file mode 100644 index 0000000..e745534 --- /dev/null +++ b/scrapers/twitter-scraper/README.md @@ -0,0 +1,86 @@ +# ๐Ÿฆ Twitter Scraper (Vercel-style Token + Selenium) + +This Python script scrapes tweet metadata and media (images, videos) from a public or authenticated Twitter/X profile using: + +- ๐Ÿง  **Vercel-style token workaround** to access embedded tweet data +- ๐Ÿ•ต๏ธ **Selenium** to scrape tweet IDs directly from a user's timeline + + + +"Vercel's react-tweet now has a bit of a workaround. They figured out that you can use the Twitter embed API to get data from any tweet. Usually, you'd need a special token to get any data but they reverse engineered the token and you can generate it yourself using the tweet id." + +[Discussion thread](https://github.com/JustAnotherArchivist/snscrape/issues/996) + +[Vercel workaround source](https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts) + +## ๐Ÿ“ฆ Features + +- Extracts tweet text, datetime, and media (images/videos) +- Saves media locally and metadata as structured JSON +- Supports headless login for scraping private/protected timelines +- Outputs organized into a per-user folder + +## ๐Ÿ“ Output Structure + +For a target user like `@BU_Tweets`, the script creates: + +``` +BU_Tweets/ +โ”œโ”€โ”€ tweets.json # Structured tweet data +โ”œโ”€โ”€ tweets_run_metadata.json # Run summary and config +โ”œโ”€โ”€ images/ # Downloaded images +โ””โ”€โ”€ videos/ # Downloaded videos +``` + +## ๐Ÿ”ง Configuration + +Create a `config.yml` file in the project directory: + +```yaml +auth: + username: your_x_username + password: your_x_password + +scraping: + username: BU_Tweets # Twitter/X username (no @ needed) + limit: 30 # Number of tweets to scrape + tweet_ids: null # Optional: provide list to skip Selenium +``` + +## ๐Ÿš€ Usage + +Install dependencies: + +```bash +pip install -r requirements.txt +``` + +Download and install [ChromeDriver](https://sites.google.com/chromium.org/driver/) compatible with your Chrome version. + +Then run the script: + +```bash +python twitter_scraper.py +``` + +## ๐Ÿ’ก Notes + +- Media presence is recorded as: + +```json +"media_saved": { + "image": true, + "video": false +} +``` + +- Random delays are added to mimic human scrolling and reduce rate-limiting risk. +- Works best for accounts that don't require heavy login-based protection. + +## โš ๏ธ Disclaimer + +This project is for educational purposes. Use responsibly and comply with Twitter/X's Terms of Service. + +--- + +Created with ๐Ÿ’ป by [Ta-Chi Lin] diff --git a/scrapers/twitter-scraper/config_template.yml b/scrapers/twitter-scraper/config_template.yml new file mode 100644 index 0000000..7e3fa19 --- /dev/null +++ b/scrapers/twitter-scraper/config_template.yml @@ -0,0 +1,15 @@ +# config_template.yml +auth: + username: your_x_username + password: your_x_password + +scraping: + # You can either provide a username to fetch tweets from... + username: "BU_Tweets" + + # ...or specify tweet IDs directly. + # If tweet_ids is provided, username is ignored. 
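+  # Example of the list form (the IDs below are placeholders, not real tweets):
+  # tweet_ids: ['1234567890123456789', '1234567890123456780']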
+ tweet_ids: null + + # Maximum number of tweets to fetch from the user (if username is used) + limit: 20 diff --git a/scrapers/twitter-scraper/twitter_scraper.py b/scrapers/twitter-scraper/twitter_scraper.py new file mode 100644 index 0000000..b6d795d --- /dev/null +++ b/scrapers/twitter-scraper/twitter_scraper.py @@ -0,0 +1,221 @@ +# twitter_scraper.py using Vercel-style token workaround + Selenium for tweet ID scraping +import os +import json +import yaml +import requests +from datetime import datetime, timezone +from urllib.parse import urlparse +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +import time +import re +import random + +# ---------------------------- +# Twitter Login with Selenium +# ---------------------------- +def login_to_twitter(driver, username, password): + driver.get("https://x.com/login") + wait = WebDriverWait(driver, 15) + + user_input = wait.until(EC.presence_of_element_located((By.NAME, "text"))) + time.sleep(random.uniform(1.5, 5.5)) + user_input.send_keys(username) + user_input.send_keys(Keys.RETURN) + + pwd_input = wait.until(EC.presence_of_element_located((By.NAME, "password"))) + time.sleep(random.uniform(1.5, 5.5)) + pwd_input.send_keys(password) + pwd_input.send_keys(Keys.RETURN) + + wait.until(EC.invisibility_of_element((By.NAME, "password"))) + +# ---------------------------- +# Configuration and Utilities +# ---------------------------- +def load_config(path='config.yml'): + with open(path, 'r') as file: + return yaml.safe_load(file) + +def parse_date(date_str): + return datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc) if date_str else None + +def base36(num): + chars = '0123456789abcdefghijklmnopqrstuvwxyz' + result = '' + while num > 0: + num, i = divmod(num, 36) + result = chars[i] + result + return result or '0' + +def get_token(tweet_id: str) -> str: + token = ((int(tweet_id) / 1e15) * 3.141592653589793) + return base36(int(token * 1e6)) + +# ---------------------------- +# Tweet Data Retrieval +# ---------------------------- +def fetch_tweet_data(tweet_id): + token = get_token(tweet_id) + url = f"https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}&token={token}" + response = requests.get(url) + if response.status_code == 200: + return response.json() + else: + print(f"[WARN] Failed to fetch tweet {tweet_id}: HTTP {response.status_code}") + return None + +def download_media(url, folder): + parsed = urlparse(url) + filename = os.path.basename(parsed.path) + path = os.path.join(folder, filename) + try: + r = requests.get(url) + if r.status_code == 200: + with open(path, 'wb') as f: + f.write(r.content) + return filename + except Exception as e: + print(f"[ERROR] Downloading media failed: {e}") + return None + +# ---------------------------- +# Tweet ID Scraper via Selenium +# ---------------------------- +def get_tweet_ids_from_profile(username, limit=50, login_credentials=None): + print(f"[INFO] Scraping tweet IDs from https://x.com/{username} ...") + tweet_ids = set() + options = Options() + options.add_argument('--headless') + options.add_argument('--disable-gpu') + options.add_argument('--no-sandbox') + options.add_argument("user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36") + driver 
= webdriver.Chrome(options=options) + + try: + if login_credentials: + login_to_twitter(driver, login_credentials['username'], login_credentials['password']) + time.sleep(random.uniform(2, 4)) + + url = f"https://x.com/{username}" + driver.get(url) + driver.save_screenshot("debug_screenshot.png") + time.sleep(random.uniform(2, 4)) + + last_height = driver.execute_script("return document.body.scrollHeight") + while len(tweet_ids) < limit: + driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") + time.sleep(random.uniform(2, 3)) + page_source = driver.page_source + matches = re.findall(r'/status/(\d+)', page_source) + tweet_ids.update(matches) + if len(tweet_ids) >= limit: + break + new_height = driver.execute_script("return document.body.scrollHeight") + if new_height == last_height: + break + last_height = new_height + finally: + driver.quit() + + tweet_ids = list(tweet_ids)[:limit] + print(f"[INFO] Collected tweet IDs: {tweet_ids}") + return tweet_ids + +# ---------------------------- +# Main Scraping Logic +# ---------------------------- +def scrape_tweets(config): + tweet_ids = config['scraping'].get('tweet_ids') + username = config['scraping'].get('username') + limit = config['scraping'].get('limit', 100) + login_credentials = config.get('auth') + + if username and not tweet_ids: + tweet_ids = get_tweet_ids_from_profile(username, limit=limit, login_credentials=login_credentials) + + folder_name = username or "tweet_scrape" + folder_name = folder_name.replace("@", "").strip().lower() + os.makedirs(folder_name, exist_ok=True) + + images_folder = os.path.join(folder_name, 'images') + videos_folder = os.path.join(folder_name, 'videos') + os.makedirs(images_folder, exist_ok=True) + os.makedirs(videos_folder, exist_ok=True) + + data = {"tweets": {}} + tweet_count = 0 + image_count = 0 + video_count = 0 + + for tweet_id in tweet_ids: + tweet_data = fetch_tweet_data(tweet_id) + if tweet_data is None: + continue + + tweet_time = parse_date(tweet_data.get('created_at', '')[:10]) + tweet_text = tweet_data.get('text', '') + has_image = False + has_video = False + + media_entities = tweet_data.get('photos') or [] + for media in media_entities: + media_url = media.get('url') + if not media_url: + continue + filename = download_media(media_url, images_folder) + if filename: + has_image = True + image_count += 1 + + video_url = tweet_data.get('video_url') + if video_url: + filename = download_media(video_url, videos_folder) + if filename: + has_video = True + video_count += 1 + + data["tweets"][str(tweet_id)] = { + "datetime": tweet_time.isoformat() if tweet_time else None, + "content": tweet_text, + "media_saved": { + "image": has_image, + "video": has_video + } + } + + tweet_count += 1 + + message_json = os.path.join(folder_name, "tweets.json") + with open(message_json, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=4) + + run_meta = { + "tweet_count": tweet_count, + "image_count": image_count, + "video_count": video_count, + "config_used": { + "username": username, + "tweet_ids": tweet_ids, + "limit": limit + } + } + + run_meta_json = os.path.join(folder_name, "tweets_run_metadata.json") + with open(run_meta_json, 'w', encoding='utf-8') as f: + json.dump(run_meta, f, ensure_ascii=False, indent=4) + + print("\nโœ… Tweet scrape complete.") + print(f"- Tweets saved to: {message_json}") + print(f"- Metadata saved to: {run_meta_json}") + print(f"- Images saved to: {images_folder}") + print(f"- Videos saved to: {videos_folder}") + + +if __name__ == 
'__main__': + config = load_config() + scrape_tweets(config) \ No newline at end of file
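A note on the token workaround: the `get_token()` derivation is the piece most likely to break if X changes the embed endpoint. Below is a minimal sketch for checking it in isolation, importing the helpers this patch adds (run from `scrapers/twitter-scraper/`; the tweet ID is a placeholder, not a real tweet).

```python
# Standalone sanity check of get_token() / fetch_tweet_data() from twitter_scraper.py.
# The ID below is a placeholder -- substitute the numeric ID of a real public tweet.
from twitter_scraper import get_token, fetch_tweet_data

tweet_id = "1234567890123456789"  # placeholder
print("token:", get_token(tweet_id))

data = fetch_tweet_data(tweet_id)  # GET cdn.syndication.twimg.com/tweet-result?id=...&token=...
print(data.get("text") if data else "fetch failed (placeholder ID or token rejected)")
```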