#!/usr/bin/env python3
"""
Extract tweet contents from given Tweet IDs and save them as TOML files.

This script uses the twitter-api-client library to fetch tweet data and
saves it in TOML format with optional media downloads and recursive
extraction.
"""
import json
import os
import sys
import time
import argparse
import urllib.request
import urllib.parse
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional, Any

# Prefer tomlkit (text mode); fall back to tomli_w (binary mode), aliased to
# the same name so the writer code below can use a single symbol.
try:
    import tomlkit
    TOML_WRITE_MODE = 'text'
    TOML_LIB = 'tomlkit'
except ImportError:
    try:
        import tomli_w
        TOML_WRITE_MODE = 'binary'
        TOML_LIB = 'tomli_w'
        tomlkit = tomli_w
    except ImportError:
        print("Error: tomlkit or tomli-w is required. Install with: pip install tomlkit")
        sys.exit(1)

from twitter.scraper import Scraper


def print_json(data):
    """Pretty print JSON data."""
    print(json.dumps(data, indent=2))


def is_rate_limit_error(error):
    """
    Check if an error is a rate limit error (429 Too Many Requests).

    Args:
        error: Exception object or error message

    Returns:
        True if it's a rate limit error, False otherwise
    """
    error_str = str(error).lower()
    rate_limit_indicators = [
        '429', 'too many requests', 'rate limit', 'rate_limit',
        'exceeded', 'quota', 'limit exceeded'
    ]
    return any(indicator in error_str for indicator in rate_limit_indicators)


def handle_rate_limit_error(error, retry_count, base_wait_time=60):
    """
    Handle rate limit errors with exponential backoff.

    Args:
        error: The exception that occurred
        retry_count: Number of times we've retried
        base_wait_time: Base wait time in seconds (default 60s = 1 minute)

    Returns:
        Wait time in seconds before retrying
    """
    # Exponential backoff: base * 2^retries, capped at 15 minutes.
    wait_time = base_wait_time * (2 ** retry_count)
    wait_time = min(wait_time, 900)
    print(f"\n ⚠ Rate limit detected (attempt {retry_count + 1})")
    print(f" ⏳ Waiting {wait_time}s ({wait_time/60:.1f} minutes) before retry...")
    return wait_time


def parse_tweet_ids_from_args(tweet_ids_str: Optional[str],
                              tweet_ids_files: Optional[str]) -> Set[str]:
    """
    Parse tweet IDs from CLI arguments.

    Args:
        tweet_ids_str: Comma-separated tweet IDs string
        tweet_ids_files: Comma-separated file paths

    Returns:
        Set of tweet IDs (deduplicated)
    """
    all_tweet_ids = set()

    # Parse comma-separated tweet IDs.
    if tweet_ids_str:
        ids = [tid.strip() for tid in tweet_ids_str.split(',') if tid.strip()]
        all_tweet_ids.update(ids)

    # Parse tweet IDs from files; missing/unreadable files are warnings only.
    if tweet_ids_files:
        file_paths = [f.strip() for f in tweet_ids_files.split(',') if f.strip()]
        for file_path in file_paths:
            file_path = os.path.expanduser(file_path)
            if not os.path.isabs(file_path):
                file_path = os.path.join(os.getcwd(), file_path)
            if not os.path.exists(file_path):
                print(f"⚠ Warning: File not found: {file_path}")
                continue
            try:
                ids = parse_tweet_ids_from_file(file_path)
                all_tweet_ids.update(ids)
            except Exception as e:
                print(f"⚠ Warning: Error parsing file {file_path}: {e}")
                continue

    return all_tweet_ids


def parse_tweet_ids_from_file(file_path: str) -> List[str]:
    """
    Parse tweet IDs from a file.

    Supports:
    - Plain text file with one Tweet ID per line
    - JSON file containing a list (array) of Tweet IDs
    - Scrape summary JSON file (from scrape_user_tweet_ids.py)

    Args:
        file_path: Path to the file

    Returns:
        List of tweet IDs

    Raises:
        ValueError: If a JSON file has an unexpected structure.
    """
    tweet_ids = []

    _, ext = os.path.splitext(file_path.lower())

    if ext == '.json':
        with open(file_path, 'r') as f:
            data = json.load(f)

        if isinstance(data, dict) and 'tweet_ids_file' in data:
            # Scrape summary file: it points at the real tweet-IDs file.
            tweet_ids_file = data['tweet_ids_file']
            if not os.path.isabs(tweet_ids_file):
                # Resolve relative to the summary file's directory.
                summary_dir = os.path.dirname(file_path)
                tweet_ids_file = os.path.join(summary_dir, tweet_ids_file)
            # Recursively parse the referenced tweet IDs file.
            return parse_tweet_ids_from_file(tweet_ids_file)
        elif isinstance(data, list):
            tweet_ids = [str(tid) for tid in data if tid]
        else:
            raise ValueError(f"Unexpected JSON structure in {file_path}")
    else:
        # Plain text: one tweet ID per line; '#' lines are comments.
        with open(file_path, 'r') as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#'):
                    tweet_ids.append(line)

    return tweet_ids


def extract_tweet_from_response(response_data: Any, tweet_id: str) -> Optional[Dict]:
    """
    Extract tweet data from API response.

    Args:
        response_data: Response data from scraper
        tweet_id: The tweet ID we're looking for

    Returns:
        Tweet data dictionary or None if not found
    """
    try:
        # Normalize: responses may arrive as a one-element list or a dict.
        if isinstance(response_data, list):
            if len(response_data) == 0:
                return None
            data = response_data[0]
        elif isinstance(response_data, dict):
            data = response_data
        else:
            return None

        def _scan_instructions(instructions):
            # Walk TimelineAddEntries instructions for a TimelineTweet whose
            # rest_id matches the target tweet (shared by paths 1 and 2).
            for instruction in instructions:
                if instruction.get('type') != 'TimelineAddEntries':
                    continue
                for entry in instruction.get('entries', []):
                    content = entry.get('content', {})
                    if content.get('entryType') != 'TimelineTimelineItem':
                        continue
                    item_content = content.get('itemContent', {})
                    if item_content.get('itemType') != 'TimelineTweet':
                        continue
                    result = item_content.get('tweet_results', {}).get('result', {})
                    if result.get('rest_id') == tweet_id:
                        return result
            return None

        tweet_result = None

        # Path 1: TweetDetail GraphQL response structure
        # (threaded_conversation_with_injections_v2).
        if 'data' in data:
            threaded_conversation = data.get('data', {}).get(
                'threaded_conversation_with_injections_v2', {})
            tweet_result = _scan_instructions(threaded_conversation.get('instructions', []))

        # Path 2: Timeline structure (for user tweets).
        if not tweet_result and 'data' in data:
            timeline = (data.get('data', {}).get('user', {}).get('result', {})
                        .get('timeline_v2', {}).get('timeline', {}))
            tweet_result = _scan_instructions(timeline.get('instructions', []))

        # Path 3: Direct tweet lookup (recursive search over the whole payload).
        if not tweet_result:
            def find_tweet_recursive(obj, target_id):
                if isinstance(obj, dict):
                    # Tweet result with matching ID (GraphQL format).
                    if obj.get('rest_id') == target_id and obj.get('__typename') == 'Tweet':
                        return obj
                    # Also check legacy.id_str for older format.
                    legacy = obj.get('legacy', {})
                    if legacy and legacy.get('id_str') == target_id:
                        return obj
                    for value in obj.values():
                        result = find_tweet_recursive(value, target_id)
                        if result:
                            return result
                elif isinstance(obj, list):
                    for item in obj:
                        result = find_tweet_recursive(item, target_id)
                        if result:
                            return result
                return None

            tweet_result = find_tweet_recursive(data, tweet_id)

        return tweet_result
    except Exception as e:
        print(f" ⚠ Warning: Error extracting tweet {tweet_id}: {e}")
        import traceback
        traceback.print_exc()
        return None


def extract_tweet_data(tweet_result: Dict, bare_scrape: bool = False,
                       advanced_info: bool = False) -> Dict:
    """
    Extract tweet data from tweet result structure.

    Args:
        tweet_result: Tweet result dictionary from API
        bare_scrape: If True, only extract bare minimum fields
        advanced_info: If True, extract additional optional fields

    Returns:
        Dictionary with tweet data
    """
    tweet_data = {}

    # Bare fields: ID, text, quote flag.
    tweet_data['id'] = tweet_result.get('rest_id')
    legacy = tweet_result.get('legacy', {})
    tweet_data['full_text'] = legacy.get('full_text', '')
    tweet_data['is_quote_status'] = legacy.get('is_quote_status', False)

    # Entities are always included (media is stripped in bare mode).
    entities = legacy.get('entities', {})
    tweet_data['entities'] = {
        'hashtags': entities.get('hashtags', []),
        'urls': entities.get('urls', []),
        'user_mentions': entities.get('user_mentions', []),
        'symbols': entities.get('symbols', []),
        'media': entities.get('media', []) if not bare_scrape else []
    }

    # Optional engagement/metadata fields (non-bare + advanced only).
    if not bare_scrape and advanced_info:
        tweet_data['created_at'] = legacy.get('created_at')
        tweet_data['bookmark_count'] = legacy.get('bookmark_count', 0)
        tweet_data['favorite_count'] = legacy.get('favorite_count', 0)
        tweet_data['quote_count'] = legacy.get('quote_count', 0)
        tweet_data['reply_count'] = legacy.get('reply_count', 0)
        tweet_data['retweet_count'] = legacy.get('retweet_count', 0)
        tweet_data['retweeted'] = legacy.get('retweeted', False)
        edit_control = tweet_result.get('edit_control', {})
        edit_tweet_ids = edit_control.get('edit_tweet_ids', [])
        if edit_tweet_ids:
            tweet_data['edit_tweet_ids'] = edit_tweet_ids

    # Author information (bare: id/name/screen_name).
    core = tweet_result.get('core', {})
    user_results = core.get('user_results', {})
    user_result = user_results.get('result', {})
    legacy_user = user_result.get('legacy', {})
    tweet_data['author'] = {
        'id': user_result.get('rest_id'),
        'name': legacy_user.get('name', ''),
        'screen_name': legacy_user.get('screen_name', '')
    }

    if not bare_scrape:
        # Avatar URL (always included if downloading avatars).
        profile_image_url = legacy_user.get('profile_image_url_https', '')
        tweet_data['author']['avatar_url'] = profile_image_url
        if advanced_info:
            tweet_data['author']['is_verified'] = user_result.get('is_blue_verified', False)
            tweet_data['author']['followers_count'] = legacy_user.get('followers_count', 0)

    # Retweeted status: check both top-level and legacy level.
    retweeted_status_result = tweet_result.get('retweeted_status_result', {})
    if not retweeted_status_result:
        retweeted_status_result = legacy.get('retweeted_status_result', {})
    if retweeted_status_result:
        retweeted_result = retweeted_status_result.get('result', {})
        if retweeted_result:
            # Always bare for embedded retweeted tweets.
            tweet_data['retweeted_status'] = extract_tweet_data(
                retweeted_result,
                bare_scrape=True,
                advanced_info=False
            )

    # Quoted / replied-to references (IDs only; fetched later if recursive).
    quoted_status_id_str = legacy.get('quoted_status_id_str')
    if quoted_status_id_str:
        tweet_data['quoted_status_id'] = quoted_status_id_str

    in_reply_to_status_id_str = legacy.get('in_reply_to_status_id_str')
    if in_reply_to_status_id_str:
        tweet_data['in_reply_to_status_id'] = in_reply_to_status_id_str

    return tweet_data


def download_file(url: str, output_path: str, retry_count: int = 0) -> bool:
    """
    Download a file from URL to output path.

    Args:
        url: URL to download from
        output_path: Path to save the file
        retry_count: Number of retries attempted

    Returns:
        True if successful, False otherwise
    """
    try:
        # Guard against output_path with no directory component.
        directory = os.path.dirname(output_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        # Browser-like User-Agent to avoid trivial bot blocking.
        req = urllib.request.Request(url)
        req.add_header('User-Agent',
                       'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

        with urllib.request.urlopen(req, timeout=30) as response:
            with open(output_path, 'wb') as f:
                f.write(response.read())
        return True
    except Exception as e:
        # Retry up to 2 times with a short pause before giving up.
        if retry_count < 2:
            time.sleep(2)
            return download_file(url, output_path, retry_count + 1)
        print(f" ⚠ Warning: Failed to download {url}: {e}")
        return False
Args: url: URL to download from output_path: Path to save the file retry_count: Number of retries attempted Returns: True if successful, False otherwise """ try: os.makedirs(os.path.dirname(output_path), exist_ok=True) # Create request with user agent req = urllib.request.Request(url) req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=30) as response: with open(output_path, 'wb') as f: f.write(response.read()) return True except Exception as e: if retry_count < 2: time.sleep(2) return download_file(url, output_path, retry_count + 1) print(f" ⚠ Warning: Failed to download {url}: {e}") return False def download_tweet_media(tweet_data: Dict, tweet_id: str, media_dir: str) -> List[str]: """ Download media files for a tweet. Args: tweet_data: Tweet data dictionary media_dir: Directory to save media files Returns: List of local file paths for downloaded media """ media_paths = [] entities = tweet_data.get('entities', {}) media_list = entities.get('media', []) if not media_list: return media_paths tweet_media_dir = os.path.join(media_dir, tweet_id) for idx, media_item in enumerate(media_list): media_url = media_item.get('media_url_https') or media_item.get('media_url') if not media_url: continue # Determine file extension ext = 'jpg' # Default if 'type' in media_item: media_type = media_item['type'] if media_type == 'video': # Try to get video URL video_info = media_item.get('video_info', {}) variants = video_info.get('variants', []) if variants: # Get the highest bitrate variant best_variant = max(variants, key=lambda v: v.get('bitrate', 0)) media_url = best_variant.get('url', media_url) ext = 'mp4' elif media_type == 'animated_gif': ext = 'gif' # Extract extension from URL if possible parsed_url = urllib.parse.urlparse(media_url) path_ext = os.path.splitext(parsed_url.path)[1] if path_ext: ext = path_ext.lstrip('.') filename = f"media_{idx + 1}.{ext}" output_path = 
os.path.join(tweet_media_dir, filename) if download_file(media_url, output_path): media_paths.append(output_path) # Update tweet data with local path media_item['local_path'] = os.path.relpath(output_path, os.path.dirname(media_dir)) return media_paths def download_avatar(avatar_url: str, author_id: str, avatars_dir: str) -> Optional[str]: """ Download avatar image for an author. Args: avatar_url: URL of the avatar image author_id: Author's user ID avatars_dir: Directory to save avatars Returns: Local file path if successful, None otherwise """ if not avatar_url: return None # Determine file extension ext = 'jpg' # Default parsed_url = urllib.parse.urlparse(avatar_url) path_ext = os.path.splitext(parsed_url.path)[1] if path_ext: ext = path_ext.lstrip('.') # Remove '_normal' from filename to get higher resolution if available avatar_url_hq = avatar_url.replace('_normal', '') filename = f"{author_id}.{ext}" output_path = os.path.join(avatars_dir, filename) # Try high quality first, fallback to normal if download_file(avatar_url_hq, output_path): return output_path elif download_file(avatar_url, output_path): return output_path return None def fetch_tweet_by_id(scraper: Scraper, tweet_id: str, retry_count: int = 0, delay_between_requests: float = 2.0) -> Optional[Dict]: """ Fetch a single tweet by ID with rate limit handling. Uses the twitter-api-client library's methods to fetch tweet details. Tries multiple approaches to handle different library versions. 
Args: scraper: Scraper instance tweet_id: Tweet ID to fetch retry_count: Current retry count delay_between_requests: Delay between requests Returns: Tweet result dictionary or None if not found """ try: response_data = None last_error = None # Try different methods based on what's available in the library # Method 1: Try tweets_details() if available (note: plural "tweets") if hasattr(scraper, 'tweets_details'): try: response_data = scraper.tweets_details([tweet_id]) if response_data: print(f" ✓ Fetched using tweets_details()") except Exception as e: last_error = e if retry_count == 0: print(f" ⚠ tweets_details() failed: {e}") pass # Method 2: Try tweet() method if available if response_data is None and hasattr(scraper, 'tweet'): try: response_data = scraper.tweet(tweet_id) if response_data: print(f" ✓ Fetched using tweet()") except Exception as e: last_error = e pass # Method 3: Try using GraphQL API directly if response_data is None and hasattr(scraper, 'graphql'): try: variables = { "focalTweetId": tweet_id, "with_rux_injections": False, "includePromotedContent": False, "withCommunity": True, "withQuickPromoteEligibilityTweetFields": True, "withBirdwatchNotes": True, "withSuperFollowsUserFields": True, "withDownvotePerspective": False, "withReactionsMetadata": False, "withReactionsPerspective": False, "withReplays": True, "withVoice": True, "withV2Timeline": True } features = { "rweb_tipjar_consumption_enabled": True, "responsive_web_graphql_exclude_directive_enabled": True, "verified_phone_label_enabled": False, "creator_subscriptions_quote_tweet_preview_enabled": True, "responsive_web_graphql_timeline_navigation_enabled": True, "responsive_web_graphql_skip_user_profile_image_size_enabled": False, "communities_web_enable_tweet_community_results_fetch": True, "c9s_tweet_anatomy_moderator_badge_enabled": True, "articles_preview_enabled": True, "responsive_web_edit_tweet_api_enabled": True, "graphql_is_translatable_rweb_tweet_is_translatable_enabled": True, 
"view_counts_everywhere_api_enabled": True, "longform_notetweets_consumption_enabled": True, "responsive_web_twitter_article_tweet_consumption_enabled": True, "tweet_awards_web_tipping_enabled": False, "freedom_of_speech_not_reach_fetch_enabled": True, "standardized_nudges_misinfo": True, "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True, "longform_notetweets_rich_text_read_enabled": True, "longform_notetweets_inline_media_enabled": True, "responsive_web_enhance_cards_enabled": False } response_data = scraper.graphql("TweetDetail", variables, features) if response_data: print(f" ✓ Fetched using graphql()") except Exception as e: last_error = e # Don't silently pass - log the error for debugging if retry_count == 0: # Only print on first attempt to avoid spam print(f" ⚠ Debug: graphql() failed: {e}") pass # Method 4: Try using the scraper's session directly to make a GraphQL request if response_data is None and hasattr(scraper, 'session'): try: # Use the TweetDetail GraphQL endpoint # The endpoint hash might vary, but this is a common one url = "https://twitter.com/i/api/graphql/VWx37vRycLNpJY1qH7a6ow/TweetDetail" variables = { "focalTweetId": tweet_id, "with_rux_injections": False, "includePromotedContent": False, "withCommunity": True, "withQuickPromoteEligibilityTweetFields": True, "withBirdwatchNotes": True, "withSuperFollowsUserFields": True, "withDownvotePerspective": False, "withReactionsMetadata": False, "withReactionsPerspective": False, "withReplays": True, "withVoice": True, "withV2Timeline": True } features = { "rweb_tipjar_consumption_enabled": True, "responsive_web_graphql_exclude_directive_enabled": True, "verified_phone_label_enabled": False, "creator_subscriptions_quote_tweet_preview_enabled": True, "responsive_web_graphql_timeline_navigation_enabled": True, "responsive_web_graphql_skip_user_profile_image_size_enabled": False, "communities_web_enable_tweet_community_results_fetch": True, 
"c9s_tweet_anatomy_moderator_badge_enabled": True, "articles_preview_enabled": True, "responsive_web_edit_tweet_api_enabled": True, "graphql_is_translatable_rweb_tweet_is_translatable_enabled": True, "view_counts_everywhere_api_enabled": True, "longform_notetweets_consumption_enabled": True, "responsive_web_twitter_article_tweet_consumption_enabled": True, "tweet_awards_web_tipping_enabled": False, "freedom_of_speech_not_reach_fetch_enabled": True, "standardized_nudges_misinfo": True, "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True, "longform_notetweets_rich_text_read_enabled": True, "longform_notetweets_inline_media_enabled": True, "responsive_web_enhance_cards_enabled": False } params = { "variables": json.dumps(variables), "features": json.dumps(features) } response = scraper.session.get(url, params=params) if response.status_code == 200: response_data = response.json() if response_data: print(f" ✓ Fetched using direct GraphQL request") else: error_text = response.text[:200] if hasattr(response, 'text') and response.text else str(response.status_code) last_error = Exception(f"GraphQL request failed with status {response.status_code}: {error_text}") if retry_count == 0: print(f" ⚠ Debug: Direct GraphQL request failed: {last_error}") except Exception as e: last_error = e pass if response_data is None: # Debug: print available methods available_methods = [m for m in dir(scraper) if not m.startswith('_') and callable(getattr(scraper, m, None))] print(f" ⚠ Debug: Available scraper methods: {', '.join(available_methods[:10])}...") if last_error: print(f" ⚠ Debug: Last error: {last_error}") error_msg = f"Could not fetch tweet {tweet_id} using any available method. " error_msg += f"Tried: tweets_details, tweet, graphql, direct GraphQL request. 
" if last_error: error_msg += f"Last error: {last_error}" raise Exception(error_msg) # Extract tweet from response tweet_result = extract_tweet_from_response(response_data, tweet_id) if tweet_result: return tweet_result else: # Debug: print response structure print(f" ⚠ Debug: Response structure keys: {list(response_data.keys()) if isinstance(response_data, dict) else 'Not a dict'}") if isinstance(response_data, list) and len(response_data) > 0: print(f" ⚠ Debug: Response is list, first item keys: {list(response_data[0].keys()) if isinstance(response_data[0], dict) else 'Not a dict'}") print(f" ⚠ Warning: Tweet {tweet_id} not found in response") return None except Exception as e: error_msg = str(e) # Check if it's a rate limit error if is_rate_limit_error(e): wait_time = handle_rate_limit_error(e, retry_count) time.sleep(wait_time) if retry_count < 5: # Max 5 retries for rate limits return fetch_tweet_by_id(scraper, tweet_id, retry_count + 1, delay_between_requests) else: print(f" ❌ Max retries reached for tweet {tweet_id}") return None else: # For other errors, retry once if retry_count < 1: time.sleep(delay_between_requests * 3) return fetch_tweet_by_id(scraper, tweet_id, retry_count + 1, delay_between_requests) else: print(f" ⚠ Warning: Error fetching tweet {tweet_id}: {error_msg}") return None def extract_related_tweet_ids(tweet_data: Dict) -> List[str]: """ Extract related tweet IDs (quoted, retweeted, replied-to) from tweet data. 
def extract_related_tweet_ids(tweet_data: Dict) -> List[str]:
    """
    Extract related tweet IDs (quoted, retweeted, replied-to) from tweet data.

    Args:
        tweet_data: Tweet data dictionary

    Returns:
        List of related tweet IDs
    """
    related_ids = []

    quoted_status_id = tweet_data.get('quoted_status_id')
    if quoted_status_id:
        related_ids.append(quoted_status_id)

    retweeted_status = tweet_data.get('retweeted_status')
    if retweeted_status:
        retweet_id = retweeted_status.get('id')
        if retweet_id:
            related_ids.append(retweet_id)

    in_reply_to_status_id = tweet_data.get('in_reply_to_status_id')
    if in_reply_to_status_id:
        related_ids.append(in_reply_to_status_id)

    return related_ids


def scrape_tweets_recursive(
    scraper: "Scraper",
    tweet_id: str,
    scraped_tweets: Dict[str, Dict],
    output_dir: str,
    media_dir: str,
    avatars_dir: str,
    depth: int,
    max_depth: int,
    bare_scrape: bool,
    advanced_info: bool,
    download_media: bool,
    download_avatars: bool,
    recursive: bool,
    scrape_replied_to_tweet: bool,
    recursive_replied_to_tweets: bool,
    recursive_replied_to_tweets_quotes_retweets: bool,
    download_replied_to_tweets_media: bool,
    max_replied_to_tweets_recursion_depth: int,
    delay_between_requests: float,
    replied_to_depth: int = 0
) -> None:
    """
    Recursively scrape tweets (quoted, retweeted, replied-to).

    Args:
        scraper: Scraper instance
        tweet_id: Tweet ID to scrape
        scraped_tweets: Dictionary of already scraped tweets (mutated in place)
        output_dir: Output directory for TOML files
        media_dir: Media directory
        avatars_dir: Avatars directory
        depth: Current recursion depth
        max_depth: Maximum recursion depth
        bare_scrape: Whether to do bare scraping
        advanced_info: Whether to include advanced info
        download_media: Whether to download media
        download_avatars: Whether to download avatars
        recursive: Whether to recursively scrape quotes/retweets
        scrape_replied_to_tweet: Whether to scrape replied-to tweets
        recursive_replied_to_tweets: Whether to recursively scrape replied-to tweets
        recursive_replied_to_tweets_quotes_retweets: Whether to scrape
            quotes/retweets of replied-to tweets
        download_replied_to_tweets_media: Whether to download media for
            replied-to tweets
        max_replied_to_tweets_recursion_depth: Max depth for replied-to tweets
        delay_between_requests: Delay between requests
        replied_to_depth: Current replied-to recursion depth
    """
    # Guard clauses: already scraped, or over either depth budget.
    if tweet_id in scraped_tweets:
        return
    if depth >= max_depth:
        return
    if replied_to_depth >= max_replied_to_tweets_recursion_depth:
        return

    print(f" {' ' * depth}→ Fetching tweet {tweet_id}...")
    tweet_result = fetch_tweet_by_id(scraper, tweet_id,
                                     delay_between_requests=delay_between_requests)
    if not tweet_result:
        print(f" {' ' * depth}⚠ Warning: Could not fetch tweet {tweet_id} (deleted or private?)")
        return

    # Replied-to tweets are always extracted without bare/advanced modes.
    is_replied_to_tweet = (replied_to_depth > 0)
    current_bare_scrape = bare_scrape and not is_replied_to_tweet
    current_advanced_info = advanced_info and not is_replied_to_tweet
    tweet_data = extract_tweet_data(tweet_result,
                                    bare_scrape=current_bare_scrape,
                                    advanced_info=current_advanced_info)

    # Download avatar if enabled (never for replied-to tweets).
    if download_avatars and not is_replied_to_tweet:
        author_id = tweet_data.get('author', {}).get('id')
        avatar_url = tweet_data.get('author', {}).get('avatar_url', '')
        if author_id and avatar_url:
            avatar_path = download_avatar(avatar_url, author_id, avatars_dir)
            if avatar_path:
                tweet_data['author']['avatar_local_path'] = os.path.relpath(
                    avatar_path, output_dir
                )

    # Download media if enabled; replied-to tweets use their own flag.
    should_download_media = download_media and not is_replied_to_tweet
    if not should_download_media and is_replied_to_tweet:
        should_download_media = download_replied_to_tweets_media
    if should_download_media:
        download_tweet_media(tweet_data, tweet_id, media_dir)

    # Save tweet to TOML file.
    toml_file = os.path.join(output_dir, f"tweet-{tweet_id}.toml")
    try:
        if TOML_LIB == 'tomlkit':
            # tomlkit: parse empty string to get a document, then fill it.
            doc = tomlkit.parse('')

            def dict_to_tomlkit(d, doc_obj):
                # Convert a plain dict to tomlkit tables/arrays recursively.
                for key, value in d.items():
                    if isinstance(value, dict):
                        doc_obj[key] = dict_to_tomlkit(value, tomlkit.table())
                    elif isinstance(value, list):
                        arr = tomlkit.array()
                        for item in value:
                            if isinstance(item, dict):
                                arr.append(dict_to_tomlkit(item, tomlkit.table()))
                            else:
                                arr.append(item)
                        doc_obj[key] = arr
                    else:
                        doc_obj[key] = value
                return doc_obj

            doc = dict_to_tomlkit(tweet_data, doc)
            with open(toml_file, 'w') as f:
                f.write(tomlkit.dumps(doc))
        else:
            # tomli_w uses binary mode.
            with open(toml_file, 'wb') as f:
                tomlkit.dump(tweet_data, f)
    except Exception as e:
        print(f" {' ' * depth}⚠ Warning: Failed to save TOML file for tweet {tweet_id}: {e}")
        return

    # Mark as scraped only after a successful save (so failures retry later).
    scraped_tweets[tweet_id] = tweet_data

    # Rate limiting between requests.
    if delay_between_requests > 0:
        time.sleep(delay_between_requests)

    # Recursively scrape quoted/retweeted/replied-to references.
    if recursive and depth < max_depth - 1:
        related_ids = extract_related_tweet_ids(tweet_data)
        for related_id in related_ids:
            if related_id not in scraped_tweets:
                scrape_tweets_recursive(
                    scraper, related_id, scraped_tweets, output_dir,
                    media_dir, avatars_dir, depth + 1, max_depth,
                    bare_scrape, advanced_info, download_media,
                    download_avatars, recursive, scrape_replied_to_tweet,
                    recursive_replied_to_tweets,
                    recursive_replied_to_tweets_quotes_retweets,
                    download_replied_to_tweets_media,
                    max_replied_to_tweets_recursion_depth,
                    delay_between_requests, replied_to_depth
                )

    # Handle replied-to tweets (tracked with their own depth counter).
    if scrape_replied_to_tweet or recursive_replied_to_tweets:
        in_reply_to_status_id = tweet_data.get('in_reply_to_status_id')
        if in_reply_to_status_id and in_reply_to_status_id not in scraped_tweets:
            new_replied_to_depth = (replied_to_depth + 1
                                    if recursive_replied_to_tweets
                                    else replied_to_depth)
            # Whether quotes/retweets of the replied-to tweet should be
            # followed (passed as the `recursive` flag of the nested call).
            should_recurse_quotes_retweets = (
                recursive_replied_to_tweets_quotes_retweets
                and new_replied_to_depth < max_replied_to_tweets_recursion_depth
            )
            scrape_tweets_recursive(
                scraper, in_reply_to_status_id, scraped_tweets, output_dir,
                media_dir, avatars_dir, depth, max_depth, bare_scrape,
                advanced_info, download_media, download_avatars,
                should_recurse_quotes_retweets, scrape_replied_to_tweet,
                recursive_replied_to_tweets,
                recursive_replied_to_tweets_quotes_retweets,
                download_replied_to_tweets_media,
                max_replied_to_tweets_recursion_depth,
                delay_between_requests, new_replied_to_depth
            )
recursive_replied_to_tweets_quotes_retweets, download_replied_to_tweets_media, max_replied_to_tweets_recursion_depth, delay_between_requests, replied_to_depth ) # Handle replied-to tweets if scrape_replied_to_tweet or recursive_replied_to_tweets: in_reply_to_status_id = tweet_data.get('in_reply_to_status_id') if in_reply_to_status_id and in_reply_to_status_id not in scraped_tweets: new_replied_to_depth = replied_to_depth + 1 if recursive_replied_to_tweets else replied_to_depth # Determine if we should recursively scrape quotes/retweets of replied-to tweets should_recurse_quotes_retweets = ( recursive_replied_to_tweets_quotes_retweets and new_replied_to_depth < max_replied_to_tweets_recursion_depth ) scrape_tweets_recursive( scraper, in_reply_to_status_id, scraped_tweets, output_dir, media_dir, avatars_dir, depth, max_depth, bare_scrape, advanced_info, download_media, download_avatars, should_recurse_quotes_retweets, scrape_replied_to_tweet, recursive_replied_to_tweets, recursive_replied_to_tweets_quotes_retweets, download_replied_to_tweets_media, max_replied_to_tweets_recursion_depth, delay_between_requests, new_replied_to_depth ) def load_scraped_tweets(output_dir: str) -> Dict[str, Dict]: """ Load already scraped tweets from TOML files (for resume capability). Args: output_dir: Output directory Returns: Dictionary mapping tweet IDs to tweet data """ scraped_tweets = {} if not os.path.exists(output_dir): return scraped_tweets for filename in os.listdir(output_dir): if filename.startswith('tweet-') and filename.endswith('.toml'): tweet_id = filename[6:-5] # Remove 'tweet-' prefix and '.toml' suffix scraped_tweets[tweet_id] = {'id': tweet_id} # Mark as scraped return scraped_tweets def main(): """Main function.""" parser = argparse.ArgumentParser( description='Extract tweet contents from Tweet IDs and save as TOML files.' ) # Tweet ID inputs parser.add_argument( '--tweet-ids', type=str, help='Comma-separated Tweet IDs, e.g. 
"12345,67890,13579"' ) parser.add_argument( '--tweet-ids-file', type=str, help='Path(s) to file(s) containing Tweet IDs (comma-separated), ' 'e.g. "path/to/tweet_ids.txt,path/to/second/file.json"' ) # Output directories parser.add_argument( '--output-dir', type=str, default='scraped-tweets', help='Directory to save tweet TOML files (default: scraped-tweets)' ) parser.add_argument( '--media-dir', type=str, help='Directory to save media files (default: /media)' ) # Media and avatar downloads parser.add_argument( '--download-media', action='store_true', help='Download media files (images, videos, GIFs) attached to tweets' ) avatar_group = parser.add_mutually_exclusive_group() avatar_group.add_argument( '--download-avatars', action='store_true', default=True, help='Download avatars of tweet authors (default: True)' ) avatar_group.add_argument( '--no-download-avatars', dest='download_avatars', action='store_false', help='Do not download avatars' ) # Recursion settings recursion_group = parser.add_mutually_exclusive_group() recursion_group.add_argument( '--recursive', action='store_true', default=True, help='Recursively extract quoted or retweeted tweets (default: True)' ) recursion_group.add_argument( '--no-recursive', dest='recursive', action='store_false', help='Do not recursively extract quoted or retweeted tweets' ) parser.add_argument( '--max-recursion-depth', type=int, default=10, help='Maximum recursion depth for quoted/retweeted tweets (default: 10)' ) # Replied-to tweet settings parser.add_argument( '--scrape-replied-to-tweet', action='store_true', help='Also extract the tweet that the author replied to' ) parser.add_argument( '--recursive-replied-to-tweets', action='store_true', help='Recursively extract replied-to tweets' ) parser.add_argument( '--recursive-replied-to-tweets-quotes-retweets', action='store_true', help='Recursively extract quoted or retweeted tweets of replied-to tweets' ) parser.add_argument( '--download-replied-to-tweets-media', 
action='store_true', help='Download media for replied-to tweets as well' ) parser.add_argument( '--max-replied-to-tweets-recursion-depth', type=int, default=5, help='Maximum depth for replied-to tweets recursion (default: 5)' ) # Scraping modes parser.add_argument( '--advanced-info', action='store_true', help='Extract additional optional information about tweets' ) parser.add_argument( '--bare-scrape', action='store_true', help='Only extract bare minimum information about tweets' ) # Rate limiting parser.add_argument( '--delay-between-requests', type=float, default=2.0, help='Delay in seconds between requests (default: 2.0)' ) # Credentials parser.add_argument( '--credentials-file', type=str, help='Path to credentials file (default: creds.txt in current directory)' ) parser.add_argument( '--credentials-string', type=str, help='Credentials string directly (cannot be used with --credentials-file)' ) args = parser.parse_args() # Validate arguments if not args.tweet_ids and not args.tweet_ids_file: parser.error("Either --tweet-ids or --tweet-ids-file must be provided") if args.bare_scrape and args.advanced_info: parser.error("--bare-scrape and --advanced-info are mutually exclusive") if args.credentials_file and args.credentials_string: parser.error("--credentials-file and --credentials-string cannot be specified at the same time") # Parse tweet IDs print("Parsing tweet IDs...") tweet_ids = parse_tweet_ids_from_args(args.tweet_ids, args.tweet_ids_file) if not tweet_ids: print("❌ No tweet IDs found. 
Exiting.") return print(f"✓ Found {len(tweet_ids)} unique tweet ID(s)") # Set up directories output_dir = os.path.abspath(args.output_dir) os.makedirs(output_dir, exist_ok=True) if args.media_dir: media_dir = os.path.abspath(args.media_dir) else: media_dir = os.path.join(output_dir, 'media') avatars_dir = os.path.join(media_dir, 'avatars') os.makedirs(avatars_dir, exist_ok=True) # Load cookies if args.credentials_string: # Use credentials string directly cookie_str = args.credentials_string.strip() elif args.credentials_file: # Use specified credentials file creds_file = os.path.abspath(args.credentials_file) if not os.path.exists(creds_file): print(f"❌ Error: Credentials file not found: {creds_file}") return with open(creds_file, 'r') as f: cookie_str = f.read().strip() else: # Default: look for creds.txt in current directory creds_file = os.path.join(os.getcwd(), 'creds.txt') if not os.path.exists(creds_file): print(f"❌ Error: creds.txt not found in current directory ({os.getcwd()}). " f"Please create it with your Twitter cookies, or use --credentials-file or --credentials-string.") return with open(creds_file, 'r') as f: cookie_str = f.read().strip() # Parse cookie string into dictionary cookie_dict = dict(item.split("=", 1) for item in cookie_str.split(";")) # Initialize scraper scraper = Scraper(cookies=cookie_dict, save=False) # Load already scraped tweets (for resume) scraped_tweets = load_scraped_tweets(output_dir) initial_count = len(scraped_tweets) if initial_count > 0: print(f"✓ Found {initial_count} already scraped tweet(s), resuming...") # Filter out already scraped tweets remaining_tweet_ids = [tid for tid in tweet_ids if tid not in scraped_tweets] if not remaining_tweet_ids: print("✓ All tweets already scraped!") return print(f"→ Scraping {len(remaining_tweet_ids)} new tweet(s)...") print("-" * 80) # Track statistics stats = { 'total_requested': len(tweet_ids), 'already_scraped': initial_count, 'newly_scraped': 0, 'failed': 0, 'start_time': 
datetime.now() } # Scrape tweets for idx, tweet_id in enumerate(remaining_tweet_ids, 1): print(f"\n[{idx}/{len(remaining_tweet_ids)}] Processing tweet {tweet_id}...") try: scrape_tweets_recursive( scraper, tweet_id, scraped_tweets, output_dir, media_dir, avatars_dir, depth=0, max_depth=args.max_recursion_depth, bare_scrape=args.bare_scrape, advanced_info=args.advanced_info, download_media=args.download_media, download_avatars=args.download_avatars, recursive=args.recursive, scrape_replied_to_tweet=args.scrape_replied_to_tweet, recursive_replied_to_tweets=args.recursive_replied_to_tweets, recursive_replied_to_tweets_quotes_retweets=args.recursive_replied_to_tweets_quotes_retweets, download_replied_to_tweets_media=args.download_replied_to_tweets_media, max_replied_to_tweets_recursion_depth=args.max_replied_to_tweets_recursion_depth, delay_between_requests=args.delay_between_requests ) stats['newly_scraped'] += 1 except Exception as e: print(f" ❌ Error processing tweet {tweet_id}: {e}") stats['failed'] += 1 # Calculate final statistics stats['end_time'] = datetime.now() stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds() stats['total_scraped'] = len(scraped_tweets) # Save summary summary = { 'scraping_summary': { 'total_requested': stats['total_requested'], 'already_scraped': stats['already_scraped'], 'newly_scraped': stats['newly_scraped'], 'failed': stats['failed'], 'total_scraped': stats['total_scraped'], 'start_time': stats['start_time'].isoformat(), 'end_time': stats['end_time'].isoformat(), 'duration_seconds': stats['duration'], 'output_directory': output_dir, 'media_directory': media_dir, 'settings': { 'recursive': args.recursive, 'max_recursion_depth': args.max_recursion_depth, 'bare_scrape': args.bare_scrape, 'advanced_info': args.advanced_info, 'download_media': args.download_media, 'download_avatars': args.download_avatars, 'scrape_replied_to_tweet': args.scrape_replied_to_tweet, 'recursive_replied_to_tweets': 
args.recursive_replied_to_tweets, 'max_replied_to_tweets_recursion_depth': args.max_replied_to_tweets_recursion_depth } } } summary_file = os.path.join(output_dir, 'scraping_summary.toml') if TOML_LIB == 'tomlkit': # Convert to tomlkit document doc = tomlkit.parse('') def dict_to_tomlkit(d, doc_obj): for key, value in d.items(): if isinstance(value, dict): doc_obj[key] = dict_to_tomlkit(value, tomlkit.table()) elif isinstance(value, list): arr = tomlkit.array() for item in value: if isinstance(item, dict): arr.append(dict_to_tomlkit(item, tomlkit.table())) else: arr.append(item) doc_obj[key] = arr else: doc_obj[key] = value return doc_obj doc = dict_to_tomlkit(summary, doc) with open(summary_file, 'w') as f: f.write(tomlkit.dumps(doc)) else: with open(summary_file, 'wb') as f: tomlkit.dump(summary, f) # Print final summary print(f"\n{'='*80}") print("Scraping complete!") print(f" Total requested: {stats['total_requested']}") print(f" Already scraped: {stats['already_scraped']}") print(f" Newly scraped: {stats['newly_scraped']}") print(f" Failed: {stats['failed']}") print(f" Total scraped: {stats['total_scraped']}") print(f" Duration: {stats['duration']:.1f}s ({stats['duration']/60:.1f} minutes)") print(f" Output directory: {output_dir}") print(f" Summary saved to: {summary_file}") print(f"{'='*80}\n") if __name__ == "__main__": main()