diff --git a/docs/README.md b/docs/README.md index c9fa131..c6d1eba 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,14 +15,13 @@ An open-source self-hosted archiving tool. Work in progress. - [x] Snapchat - [ ] YouTube Posts (postponed) - [x] Archiving local files - - [x] Archiving Twitter Tweets & Threads + - [x] Archiving Twitter Tweets, Threads, and Articles - [ ] Archiving files from cloud storage services (Google Drive, Dropbox, OneDrive) and from URLs - [ ] URLs - [ ] Google Drive - [ ] Dropbox - [ ] OneDrive - (Some of these could be postponed for later.) - - [ ] Archiving Twitter articles - [ ] Archive web pages (HTML, CSS, JS, images) - [ ] Archiving emails (???) - [ ] Gmail @@ -62,7 +61,7 @@ This project aims to provide a reliable solution for archiving important data fr - Local files: `file:///absolute/path/to/file.ext` - YouTube media: standard video/short URLs, plus [shorthand video inputs](#supported-shorthand-inputs) - X/Twitter media from Tweets: normal Tweet URLs or the `tweet:media:ID` shorthand -- X/Twitter Tweet content scrape: [Tweet and Thread shorthands](#supported-shorthand-inputs). (These are saved as TOML files in `raw_tweets/`) +- X/Twitter Tweet content scrape: [Tweet and Thread shorthands](#supported-shorthand-inputs). (These are saved as JSON files in `raw_tweets/`) - Instagram, Facebook, TikTok, Reddit, Snapchat: direct URLs or platform-prefixed shorthand passed through to `yt-dlp` ### Supported Shorthand Inputs @@ -73,7 +72,7 @@ This project aims to provide a reliable solution for archiving important data fr - `yt:short/ID` - `yt:shorts/ID` - `youtube:shorts/ID` -- X/Twitter tweet TOML content: +- X/Twitter tweet JSON content: - `tweet:ID` - `x:tweet:ID` - `x:x:ID` @@ -81,7 +80,7 @@ This project aims to provide a reliable solution for archiving important data fr - `twitter:tweet:ID` - X/Twitter media/video download: - `tweet:media:ID` -- X/Twitter thread TOML content: +- X/Twitter thread JSON content: - `x:thread:ID` - `twitter:thread:ID` - Other platform shorthands: diff --git a/flake.nix b/flake.nix index 93677bf..a050caa 100644 --- a/flake.nix +++ b/flake.nix @@ -39,7 +39,10 @@ inherit version; hash = "sha256-S5KzQRDIQroc2bJsPLaKR9xocHKniqd9Z055CsC5rbQ="; }; - nativeBuildInputs = [ pyPkgs.setuptools pyPkgs.wheel ]; + nativeBuildInputs = [ + pyPkgs.setuptools + pyPkgs.wheel + ]; propagatedBuildInputs = [ pyPkgs.aiofiles pyPkgs."nest-asyncio" @@ -53,13 +56,9 @@ pythonImportsCheck = [ "twitter" ]; doCheck = false; }; - tweetPython = pkgs.python312.withPackages ( - ps: [ - ps.tomlkit - ps."tomli-w" - twitterApiClient - ] - ); + tweetPython = pkgs.python312.withPackages (ps: [ + twitterApiClient + ]); archivr_unwrapped = pkgs.rustPlatform.buildRustPackage { pname = "archivr"; version = "0.1.0"; @@ -118,7 +117,10 @@ inherit version; hash = "sha256-S5KzQRDIQroc2bJsPLaKR9xocHKniqd9Z055CsC5rbQ="; }; - nativeBuildInputs = [ pyPkgs.setuptools pyPkgs.wheel ]; + nativeBuildInputs = [ + pyPkgs.setuptools + pyPkgs.wheel + ]; propagatedBuildInputs = [ pyPkgs.aiofiles pyPkgs."nest-asyncio" @@ -132,13 +134,9 @@ pythonImportsCheck = [ "twitter" ]; doCheck = false; }; - tweetPython = pkgs.python312.withPackages ( - ps: [ - ps.tomlkit - ps."tomli-w" - twitterApiClient - ] - ); + tweetPython = pkgs.python312.withPackages (ps: [ + twitterApiClient + ]); in { default = pkgs.mkShell { diff --git a/src/downloader/tweets.rs b/src/downloader/tweets.rs index 1e66063..dc430d6 100644 --- a/src/downloader/tweets.rs +++ b/src/downloader/tweets.rs @@ -64,7 +64,7 @@ fn build_scraper_args( /// Archives a tweet (or full thread) identified by `path` (e.g. `"tweet:123"`). /// /// Invokes the Python scraper, then moves all produced media assets into the -/// content-addressed raw store and rewrites the TOML output to use the new +/// content-addressed raw store and rewrites the JSON output to use the new /// store-relative paths. Returns `true` if new content was archived, `false` /// if the tweet was already present and `thread` is `false`. /// @@ -72,7 +72,7 @@ fn build_scraper_args( /// can be overridden via `ARCHIVR_TWEET_SCRAPER` and `ARCHIVR_TWEET_PYTHON`. pub fn archive(path: &str, thread: bool, store_path: &Path, timestamp: &str) -> Result { let invocation_cwd = env::current_dir().context("Failed to read current working directory")?; - // Output directory for Tweet TOML files. + // Output directory for Tweet JSON files. let output_dir = store_path.join("raw_tweets"); // Temporary directory for media assets downloaded by the scraper in `temp/...`. let temp_dir = store_path.join("temp").join(timestamp).join("tweets"); @@ -81,13 +81,13 @@ pub fn archive(path: &str, thread: bool, store_path: &Path, timestamp: &str) -> fs::create_dir_all(&output_dir)?; fs::create_dir_all(&temp_dir)?; - // Path to the root - the to-be-archived tweet's TOML file. - let root_toml = output_dir.join(format!("tweet-{tweet_id}.toml")); - if !thread && root_toml.exists() { + // Path to the root - the to-be-archived tweet's JSON file. + let root_json = output_dir.join(format!("tweet-{tweet_id}.json")); + if !thread && root_json.exists() { return Ok(false); } - let before = tweet_toml_files(&output_dir)?; + let before = tweet_json_files(&output_dir)?; let python = env::var_os("ARCHIVR_TWEET_PYTHON").unwrap_or_else(|| OsString::from("python3")); let scraper_path = env::var_os("ARCHIVR_TWEET_SCRAPER") @@ -135,37 +135,37 @@ pub fn archive(path: &str, thread: bool, store_path: &Path, timestamp: &str) -> ); } - if !root_toml.exists() { + if !root_json.exists() { let stderr = String::from_utf8_lossy(&output.stderr); let stdout = String::from_utf8_lossy(&output.stdout); bail!( - "Tweet scraper completed but did not create expected TOML file: {}\nstdout:\n{}\nstderr:\n{}", - root_toml.display(), + "Tweet scraper completed but did not create expected JSON file: {}\nstdout:\n{}\nstderr:\n{}", + root_json.display(), stdout.trim(), stderr.trim() ); } cleanup_summary(&output_dir)?; - let after = tweet_toml_files(&output_dir)?; - let new_tomls = new_tweet_tomls(&before, &after); - rewrite_tweet_outputs(&new_tomls, &output_dir, &temp_dir, store_path)?; + let after = tweet_json_files(&output_dir)?; + let new_jsons = new_tweet_jsons(&before, &after); + rewrite_tweet_outputs(&new_jsons, &output_dir, &temp_dir, store_path)?; let _ = fs::remove_dir_all(store_path.join("temp").join(timestamp)); Ok(true) } -/// Removes the `scraping_summary.toml` file left by the scraper, if present. +/// Removes the `scraping_summary.json` file left by the scraper, if present. fn cleanup_summary(output_dir: &Path) -> Result<()> { - let summary_path = output_dir.join("scraping_summary.toml"); + let summary_path = output_dir.join("scraping_summary.json"); if summary_path.exists() { fs::remove_file(summary_path)?; } Ok(()) } -/// Returns the set of `tweet-*.toml` files present in `output_dir`. -fn tweet_toml_files(output_dir: &Path) -> Result> { +/// Returns the set of `tweet-*.json` files present in `output_dir`. +fn tweet_json_files(output_dir: &Path) -> Result> { let mut files = HashSet::new(); for entry in fs::read_dir(output_dir)? { @@ -176,7 +176,7 @@ fn tweet_toml_files(output_dir: &Path) -> Result> { && path .file_name() .and_then(|name| name.to_str()) - .is_some_and(|name| name.starts_with("tweet-") && name.ends_with(".toml")) + .is_some_and(|name| name.starts_with("tweet-") && name.ends_with(".json")) { files.insert(path); } @@ -185,38 +185,38 @@ fn tweet_toml_files(output_dir: &Path) -> Result> { Ok(files) } -/// Returns the sorted list of TOML files present in `after` but not in `before`. -fn new_tweet_tomls(before: &HashSet, after: &HashSet) -> Vec { +/// Returns the sorted list of JSON files present in `after` but not in `before`. +fn new_tweet_jsons(before: &HashSet, after: &HashSet) -> Vec { let mut files = after.difference(before).cloned().collect::>(); files.sort(); files } -/// Returns a lazily-compiled regex matching `avatar_local_path = "..."` in TOML. +/// Returns a lazily-compiled regex matching `"avatar_local_path": "..."` in JSON. fn avatar_regex() -> &'static Regex { static REGEX: OnceLock = OnceLock::new(); - REGEX.get_or_init(|| Regex::new(r#"avatar_local_path = "([^"\n]+)""#).unwrap()) + REGEX.get_or_init(|| Regex::new(r#""avatar_local_path": "([^"\n]+)""#).unwrap()) } -/// Returns a lazily-compiled regex matching `local_path = "..."` in TOML. +/// Returns a lazily-compiled regex matching `"local_path": "..."` in JSON. fn media_regex() -> &'static Regex { static REGEX: OnceLock = OnceLock::new(); - REGEX.get_or_init(|| Regex::new(r#"(?m)\blocal_path = "([^"\n]+)""#).unwrap()) + REGEX.get_or_init(|| Regex::new(r#"(?m)"local_path": "([^"\n]+)""#).unwrap()) } -/// Rewrites asset paths in each newly-created TOML file, moving assets into +/// Rewrites asset paths in each newly-created JSON file, moving assets into /// the content-addressed store. Files are written back only if content changed. fn rewrite_tweet_outputs( - tweet_tomls: &[PathBuf], + tweet_jsons: &[PathBuf], output_dir: &Path, temp_dir: &Path, store_path: &Path, ) -> Result<()> { let mut archived_assets = HashMap::new(); - for path in tweet_tomls { + for path in tweet_jsons { let contents = fs::read_to_string(path)?; - let rewritten = rewrite_toml_asset_paths( + let rewritten = rewrite_json_asset_paths( &contents, output_dir, temp_dir, @@ -234,9 +234,9 @@ fn rewrite_tweet_outputs( /// Rewrites all `avatar_local_path` and `local_path` references in `contents`, /// archiving each referenced file into the raw store and returning the updated -/// TOML string. `archived_assets` is a cache to avoid re-archiving the same +/// JSON string. `archived_assets` is a cache to avoid re-archiving the same /// file when it is referenced by multiple tweets. -fn rewrite_toml_asset_paths( +fn rewrite_json_asset_paths( contents: &str, output_dir: &Path, temp_dir: &Path, @@ -250,8 +250,8 @@ fn rewrite_toml_asset_paths( let new_path = archive_asset_reference(&old_path, output_dir, store_path, "avatar", archived_assets)?; rewritten = rewritten.replace( - &format!(r#"avatar_local_path = "{old_path}""#), - &format!(r#"avatar_local_path = "{new_path}""#), + &format!(r#""avatar_local_path": "{old_path}""#), + &format!(r#""avatar_local_path": "{new_path}""#), ); } @@ -260,8 +260,8 @@ fn rewrite_toml_asset_paths( let new_path = archive_asset_reference(&old_path, temp_dir, store_path, "media", archived_assets)?; rewritten = rewritten.replace( - &format!(r#"local_path = "{old_path}""#), - &format!(r#"local_path = "{new_path}""#), + &format!(r#""local_path": "{old_path}""#), + &format!(r#""local_path": "{new_path}""#), ); } @@ -377,19 +377,19 @@ mod tests { fn test_cleanup_summary_removes_summary_only() { let output_dir = unique_path("archivr-tweet-summary"); fs::create_dir_all(&output_dir).unwrap(); - fs::write(output_dir.join("scraping_summary.toml"), "summary").unwrap(); - fs::write(output_dir.join("tweet-1.toml"), "tweet").unwrap(); + fs::write(output_dir.join("scraping_summary.json"), "summary").unwrap(); + fs::write(output_dir.join("tweet-1.json"), "tweet").unwrap(); cleanup_summary(&output_dir).unwrap(); - assert!(!output_dir.join("scraping_summary.toml").exists()); - assert!(output_dir.join("tweet-1.toml").exists()); + assert!(!output_dir.join("scraping_summary.json").exists()); + assert!(output_dir.join("tweet-1.json").exists()); let _ = fs::remove_dir_all(output_dir); } #[test] - fn test_rewrite_toml_asset_paths_rearchives_assets() { + fn test_rewrite_json_asset_paths_rearchives_assets() { let store_path = unique_path("archivr-tweet-store"); let output_dir = store_path.join("raw_tweets"); let temp_dir = store_path.join("temp").join("ts").join("tweets"); @@ -408,15 +408,12 @@ mod tests { ) .unwrap(); - let contents = r#" -[entities] -media = [{ local_path = "media/123/media_1.jpg" }] + let contents = r#"{ + "entities": { "media": [{ "local_path": "media/123/media_1.jpg" }] }, + "author": { "avatar_local_path": "../temp/ts/tweets/media/avatars/avatar.jpg" } +}"#; -[author] -avatar_local_path = "../temp/ts/tweets/media/avatars/avatar.jpg" -"#; - - let rewritten = rewrite_toml_asset_paths( + let rewritten = rewrite_json_asset_paths( contents, &output_dir, &temp_dir, @@ -425,8 +422,8 @@ avatar_local_path = "../temp/ts/tweets/media/avatars/avatar.jpg" ) .unwrap(); - assert!(rewritten.contains(r#"avatar_local_path = "raw/"#)); - assert!(rewritten.contains(r#"local_path = "raw/"#)); + assert!(rewritten.contains(r#""avatar_local_path": "raw/"#)); + assert!(rewritten.contains(r#""local_path": "raw/"#)); assert!( !temp_dir .join("media") @@ -464,7 +461,7 @@ avatar_local_path = "../temp/ts/tweets/media/avatars/avatar.jpg" let output_dir = store_path.join("raw_tweets"); fs::create_dir_all(&output_dir).unwrap(); fs::create_dir_all(store_path.join("temp")).unwrap(); - fs::write(output_dir.join("tweet-123.toml"), "id = \"123\"").unwrap(); + fs::write(output_dir.join("tweet-123.json"), r#"{"id":"123"}"#).unwrap(); let credentials = store_path.join("creds.txt"); fs::write(&credentials, "ct0=test;auth_token=test").unwrap(); @@ -522,15 +519,13 @@ done mkdir -p "$output_dir" "$media_dir/avatars" "$media_dir/$tweet_id" printf 'avatar' > "$media_dir/avatars/author.jpg" printf 'media' > "$media_dir/$tweet_id/media_1.jpg" -printf 'summary = true\n' > "$output_dir/scraping_summary.toml" -cat > "$output_dir/tweet-$tweet_id.toml" < "$output_dir/scraping_summary.json" +cat > "$output_dir/tweet-$tweet_id.json" < Set[str]: +def parse_tweet_ids_from_args( + tweet_ids_str: Optional[str], tweet_ids_files: Optional[str] +) -> Set[str]: """ Parse tweet IDs from CLI arguments. @@ -99,17 +86,17 @@ def parse_tweet_ids_from_args(tweet_ids_str: Optional[str], # Parse comma-separated tweet IDs if tweet_ids_str: - ids = [tid.strip() for tid in tweet_ids_str.split(',') if tid.strip()] + ids = [tid.strip() for tid in tweet_ids_str.split(",") if tid.strip()] all_tweet_ids.update(ids) # Parse tweet IDs from files if tweet_ids_files: - file_paths = [f.strip() for f in tweet_ids_files.split(',') if f.strip()] + file_paths = [f.strip() for f in tweet_ids_files.split(",") if f.strip()] for file_path in file_paths: file_path = os.path.expanduser(file_path) if not os.path.isabs(file_path): file_path = os.path.join(os.getcwd(), file_path) - + if not os.path.exists(file_path): print(f"⚠ Warning: File not found: {file_path}") continue @@ -140,41 +127,41 @@ def parse_tweet_ids_from_file(file_path: str) -> List[str]: List of tweet IDs """ tweet_ids = [] - + # Check file extension _, ext = os.path.splitext(file_path.lower()) - - if ext == '.json': + + if ext == ".json": # Try to parse as JSON - with open(file_path, 'r') as f: + with open(file_path, "r") as f: data = json.load(f) - + # Check if it's a scrape summary file - if isinstance(data, dict) and 'tweet_ids_file' in data: + if isinstance(data, dict) and "tweet_ids_file" in data: # It's a scrape summary file - tweet_ids_file = data['tweet_ids_file'] + tweet_ids_file = data["tweet_ids_file"] if not os.path.isabs(tweet_ids_file): # Make relative to the summary file's directory summary_dir = os.path.dirname(file_path) tweet_ids_file = os.path.join(summary_dir, tweet_ids_file) - + # Recursively parse the tweet IDs file return parse_tweet_ids_from_file(tweet_ids_file) - + # Check if it's a list of tweet IDs elif isinstance(data, list): tweet_ids = [str(tid) for tid in data if tid] else: raise ValueError(f"Unexpected JSON structure in {file_path}") - + else: # Assume plain text file with one tweet ID per line - with open(file_path, 'r') as f: + with open(file_path, "r") as f: for line in f: line = line.strip() - if line and not line.startswith('#'): + if line and not line.startswith("#"): tweet_ids.append(line) - + return tweet_ids @@ -203,62 +190,78 @@ def extract_tweet_from_response(response_data: Any, tweet_id: str) -> Optional[D # Navigate through the nested structure # Try different possible paths tweet_result = None - + # Path 1: TweetDetail GraphQL response structure # Check for threaded_conversation_with_injections_v2 structure - if 'data' in data: - threaded_conversation = data.get('data', {}).get('threaded_conversation_with_injections_v2', {}) - instructions = threaded_conversation.get('instructions', []) - + if "data" in data: + threaded_conversation = data.get("data", {}).get( + "threaded_conversation_with_injections_v2", {} + ) + instructions = threaded_conversation.get("instructions", []) + for instruction in instructions: - if instruction.get('type') == 'TimelineAddEntries': - entries = instruction.get('entries', []) + if instruction.get("type") == "TimelineAddEntries": + entries = instruction.get("entries", []) for entry in entries: - content = entry.get('content', {}) - if content.get('entryType') == 'TimelineTimelineItem': - item_content = content.get('itemContent', {}) - if item_content.get('itemType') == 'TimelineTweet': - result = item_content.get('tweet_results', {}).get('result', {}) - if result.get('rest_id') == tweet_id: + content = entry.get("content", {}) + if content.get("entryType") == "TimelineTimelineItem": + item_content = content.get("itemContent", {}) + if item_content.get("itemType") == "TimelineTweet": + result = item_content.get("tweet_results", {}).get( + "result", {} + ) + if result.get("rest_id") == tweet_id: tweet_result = result break if tweet_result: break if tweet_result: break - + # Path 2: Timeline structure (for user tweets) - if not tweet_result and 'data' in data: - timeline = data.get('data', {}).get('user', {}).get('result', {}).get('timeline_v2', {}).get('timeline', {}) - instructions = timeline.get('instructions', []) - + if not tweet_result and "data" in data: + timeline = ( + data.get("data", {}) + .get("user", {}) + .get("result", {}) + .get("timeline_v2", {}) + .get("timeline", {}) + ) + instructions = timeline.get("instructions", []) + for instruction in instructions: - if instruction.get('type') == 'TimelineAddEntries': - entries = instruction.get('entries', []) + if instruction.get("type") == "TimelineAddEntries": + entries = instruction.get("entries", []) for entry in entries: - content = entry.get('content', {}) - if content.get('entryType') == 'TimelineTimelineItem': - item_content = content.get('itemContent', {}) - if item_content.get('itemType') == 'TimelineTweet': - result = item_content.get('tweet_results', {}).get('result', {}) - if result.get('rest_id') == tweet_id: + content = entry.get("content", {}) + if content.get("entryType") == "TimelineTimelineItem": + item_content = content.get("itemContent", {}) + if item_content.get("itemType") == "TimelineTweet": + result = item_content.get("tweet_results", {}).get( + "result", {} + ) + if result.get("rest_id") == tweet_id: tweet_result = result break if tweet_result: break if tweet_result: break - + # Path 3: Direct tweet lookup (recursive search) if not tweet_result: + def find_tweet_recursive(obj, target_id): if isinstance(obj, dict): # Check if this is a tweet result with matching ID - if obj.get('rest_id') == target_id and obj.get('__typename') == 'Tweet': + if ( + obj.get("rest_id") == target_id + and obj.get("__typename") == "Tweet" + ): return obj # Also check legacy.id_str for older format - legacy = obj.get('legacy', {}) - if legacy and legacy.get('id_str') == target_id: + legacy = obj.get("legacy", {}) + if legacy and legacy.get("id_str") == target_id: return obj # Recursively search for value in obj.values(): @@ -271,7 +274,7 @@ def extract_tweet_from_response(response_data: Any, tweet_id: str) -> Optional[D if result: return result return None - + tweet_result = find_tweet_recursive(data, tweet_id) return tweet_result @@ -279,12 +282,186 @@ def extract_tweet_from_response(response_data: Any, tweet_id: str) -> Optional[D except Exception as e: print(f" ⚠ Warning: Error extracting tweet {tweet_id}: {e}") import traceback + traceback.print_exc() return None -def extract_tweet_data(tweet_result: Dict, bare_scrape: bool = False, - advanced_info: bool = False) -> Dict: +from typing import Any, Dict, List, Optional + + +def extract_article_data(tweet_result: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """ + Extract article data from a tweet result if the tweet contains an article. + """ + article_result = ( + tweet_result.get("article", {}).get("article_results", {}).get("result", {}) + ) + + if not article_result: + return None + + content_state = article_result.get("content_state", {}) + blocks = content_state.get("blocks", []) + entity_map_raw = content_state.get("entityMap", []) + media_entities = article_result.get("media_entities", []) + + # Normalize entity map because X may return it as a list of + # {"key": "...", "value": {...}} objects. + entity_map: Dict[str, Dict[str, Any]] = {} + if isinstance(entity_map_raw, list): + for entry in entity_map_raw: + key = str(entry.get("key")) + value = entry.get("value", {}) + entity_map[key] = value + elif isinstance(entity_map_raw, dict): + entity_map = {str(k): v for k, v in entity_map_raw.items()} + + # Index article media by media_id so atomic MEDIA blocks can be resolved. + media_by_id: Dict[str, Dict[str, Any]] = {} + for media in media_entities: + media_id = str(media.get("media_id")) + media_by_id[media_id] = media + + structured_blocks: List[Dict[str, Any]] = [] + + for block in blocks: + block_type = block.get("type", "") + block_text = block.get("text", "") + block_data: Dict[str, Any] = { + "type": block_type, + "text": block_text, + "key": block.get("key", ""), + "inline_style_ranges": block.get("inlineStyleRanges", []), + "entity_ranges": block.get("entityRanges", []), + "data": block.get("data", {}), + } + + # Resolve atomic blocks into something archivable/useful. + if block_type == "atomic": + resolved_entities: List[Dict[str, Any]] = [] + + for entity_range in block.get("entityRanges", []): + entity_key = str(entity_range.get("key")) + entity = entity_map.get(entity_key, {}) + entity_type = entity.get("type", "") + entity_data = entity.get("data", {}) + + if entity_type == "MEDIA": + for media_item in entity_data.get("mediaItems", []): + media_id = str(media_item.get("mediaId")) + media = media_by_id.get(media_id, {}) + media_info = media.get("media_info", {}) + + resolved_entities.append( + { + "type": "media", + "media_id": media_id, + "media_key": media.get("media_key", ""), + "url": media_info.get("original_img_url", ""), + "width": media_info.get("original_img_width", 0), + "height": media_info.get("original_img_height", 0), + } + ) + + elif entity_type == "TWEET": + resolved_entities.append( + { + "type": "tweet", + "tweet_id": entity_data.get("tweetId", ""), + } + ) + + elif entity_type == "DIVIDER": + resolved_entities.append({"type": "divider"}) + + elif entity_type == "LINK": + resolved_entities.append( + { + "type": "link", + "url": entity_data.get("url", ""), + } + ) + + elif entity_type == "TWEMOJI": + resolved_entities.append( + { + "type": "emoji", + "url": entity_data.get("url", ""), + } + ) + + else: + resolved_entities.append( + { + "type": entity_type.lower() if entity_type else "", + "data": entity_data, + } + ) + + block_data["resolved_entities"] = resolved_entities + + structured_blocks.append(block_data) + + # Pull article URL from the wrapper tweet URL entities if present. + legacy = tweet_result.get("legacy", {}) + article_url = "" + for url_obj in legacy.get("entities", {}).get("urls", []): + expanded_url = url_obj.get("expanded_url", "") + if "/i/article/" in expanded_url: + article_url = expanded_url + break + + # Author info: note this lives in user_result.core / avatar in your response, + # not where your current code is reading it from. + user_result = tweet_result.get("core", {}).get("user_results", {}).get("result", {}) + user_core = user_result.get("core", {}) + user_avatar = user_result.get("avatar", {}) + + cover_media = article_result.get("cover_media", {}) + cover_media_info = cover_media.get("media_info", {}) + + article_data = { + "id": article_result.get("rest_id"), + "tweet_id": tweet_result.get("rest_id"), + "url": article_url, + "title": article_result.get("title", ""), + "preview_text": article_result.get("preview_text", ""), + "summary_text": article_result.get("summary_text", ""), + "plain_text": article_result.get("plain_text", ""), + "is_grok_summary_eligible": article_result.get( + "is_grok_summary_eligible", False + ), + "first_published_at_secs": article_result.get("metadata", {}).get( + "first_published_at_secs" + ), + "modified_at_secs": article_result.get("lifecycle_state", {}).get( + "modified_at_secs" + ), + "cover_media": { + "media_id": cover_media.get("media_id"), + "media_key": cover_media.get("media_key", ""), + "url": cover_media_info.get("original_img_url", ""), + "width": cover_media_info.get("original_img_width", 0), + "height": cover_media_info.get("original_img_height", 0), + }, + "author": { + "id": user_result.get("rest_id"), + "name": user_core.get("name", ""), + "screen_name": user_core.get("screen_name", ""), + "avatar_url": user_avatar.get("image_url", ""), + }, + "blocks": structured_blocks, + "media_entities": media_entities, + "entity_map": entity_map, + } + + return article_data + + +def extract_tweet_data( + tweet_result: Dict, bare_scrape: bool = False, advanced_info: bool = False +) -> Dict: """ Extract tweet data from tweet result structure. @@ -297,119 +474,144 @@ def extract_tweet_data(tweet_result: Dict, bare_scrape: bool = False, Dictionary with tweet data """ tweet_data = {} - + # Extract tweet ID (bare) - tweet_data['id'] = tweet_result.get('rest_id') - + tweet_data["id"] = tweet_result.get("rest_id") + # Extract legacy data (main tweet content) - legacy = tweet_result.get('legacy', {}) - + legacy = tweet_result.get("legacy", {}) + # Extract full text (bare) - tweet_data['full_text'] = legacy.get('full_text', '') - + tweet_data["full_text"] = legacy.get("full_text", "") + # Extract is_quote_status (bare) - tweet_data['is_quote_status'] = legacy.get('is_quote_status', False) - + tweet_data["is_quote_status"] = legacy.get("is_quote_status", False) + # Extract entities (always included) - entities = legacy.get('entities', {}) - tweet_data['entities'] = { - 'hashtags': entities.get('hashtags', []), - 'urls': entities.get('urls', []), - 'user_mentions': entities.get('user_mentions', []), - 'symbols': entities.get('symbols', []), - 'media': entities.get('media', []) if not bare_scrape else [] + entities = legacy.get("entities", {}) + tweet_data["entities"] = { + "hashtags": entities.get("hashtags", []), + "urls": entities.get("urls", []), + "user_mentions": entities.get("user_mentions", []), + "symbols": entities.get("symbols", []), + "media": entities.get("media", []) if not bare_scrape else [], } - + # Extract optional fields if not bare scrape if not bare_scrape: # Optional: creation date if advanced_info: - tweet_data['created_at'] = legacy.get('created_at') - + tweet_data["created_at"] = legacy.get("created_at") + # Optional: bookmark count if advanced_info: - tweet_data['bookmark_count'] = legacy.get('bookmark_count', 0) - + tweet_data["bookmark_count"] = legacy.get("bookmark_count", 0) + # Optional: favorite count if advanced_info: - tweet_data['favorite_count'] = legacy.get('favorite_count', 0) - + tweet_data["favorite_count"] = legacy.get("favorite_count", 0) + # Optional: quote count if advanced_info: - tweet_data['quote_count'] = legacy.get('quote_count', 0) - + tweet_data["quote_count"] = legacy.get("quote_count", 0) + # Optional: reply count if advanced_info: - tweet_data['reply_count'] = legacy.get('reply_count', 0) - + tweet_data["reply_count"] = legacy.get("reply_count", 0) + # Optional: retweet count if advanced_info: - tweet_data['retweet_count'] = legacy.get('retweet_count', 0) - + tweet_data["retweet_count"] = legacy.get("retweet_count", 0) + # Optional: retweeted status if advanced_info: - tweet_data['retweeted'] = legacy.get('retweeted', False) - + tweet_data["retweeted"] = legacy.get("retweeted", False) + # Optional: edit_tweet_ids if advanced_info: - edit_control = tweet_result.get('edit_control', {}) - edit_tweet_ids = edit_control.get('edit_tweet_ids', []) + edit_control = tweet_result.get("edit_control", {}) + edit_tweet_ids = edit_control.get("edit_tweet_ids", []) if edit_tweet_ids: - tweet_data['edit_tweet_ids'] = edit_tweet_ids - + tweet_data["edit_tweet_ids"] = edit_tweet_ids + # Extract author information - core = tweet_result.get('core', {}) - user_results = core.get('user_results', {}) - user_result = user_results.get('result', {}) - legacy_user = user_result.get('legacy', {}) - + core = tweet_result.get("core", {}) + user_results = core.get("user_results", {}) + user_result = user_results.get("result", {}) + legacy_user = user_result.get("legacy", {}) + # Author ID (bare) - tweet_data['author'] = { - 'id': user_result.get('rest_id'), - 'name': legacy_user.get('name', ''), - 'screen_name': legacy_user.get('screen_name', '') + tweet_data["author"] = { + "id": user_result.get("rest_id"), + "name": legacy_user.get("name", ""), + "screen_name": legacy_user.get("screen_name", ""), } - + + # Crutch-y way of fixing Author ID if broken + if tweet_data["author"]["name"] == "" and tweet_data["author"]["screen_name"] == "": + user_result = user_results.get("result", {}) + user_core = user_result.get("core", {}) + + tweet_data["author"] = { + "id": user_result.get("rest_id"), + "name": user_core.get("name", ""), + "screen_name": user_core.get("screen_name", ""), + } + + tweet_data["is_article"] = False + + # Article data (bare) + article_data = extract_article_data(tweet_result) + if article_data: + tweet_data["article"] = article_data + tweet_data["is_article"] = True + # Author optional fields if not bare_scrape: # Avatar URL (always included if downloading avatars) - profile_image_url = legacy_user.get('profile_image_url_https', '') - tweet_data['author']['avatar_url'] = profile_image_url - + profile_image_url = legacy_user.get("profile_image_url_https", "") + tweet_data["author"]["avatar_url"] = profile_image_url or user_result.get( + "avatar", {} + ).get("image_url", "") + # Optional: verified status if advanced_info: - tweet_data['author']['is_verified'] = user_result.get('is_blue_verified', False) - + tweet_data["author"]["is_verified"] = user_result.get( + "is_blue_verified", False + ) + # Optional: follower count if advanced_info: - tweet_data['author']['followers_count'] = legacy_user.get('followers_count', 0) - + tweet_data["author"]["followers_count"] = legacy_user.get( + "followers_count", 0 + ) + # Extract retweeted status if present # Check both top-level and legacy level - retweeted_status_result = tweet_result.get('retweeted_status_result', {}) + retweeted_status_result = tweet_result.get("retweeted_status_result", {}) if not retweeted_status_result: - retweeted_status_result = legacy.get('retweeted_status_result', {}) - + retweeted_status_result = legacy.get("retweeted_status_result", {}) + if retweeted_status_result: - retweeted_result = retweeted_status_result.get('result', {}) + retweeted_result = retweeted_status_result.get("result", {}) if retweeted_result: # Extract bare minimum for retweeted tweet - tweet_data['retweeted_status'] = extract_tweet_data( - retweeted_result, + tweet_data["retweeted_status"] = extract_tweet_data( + retweeted_result, bare_scrape=True, # Always bare for retweeted tweets - advanced_info=False + advanced_info=False, ) - + # Extract quoted status if present - quoted_status_id_str = legacy.get('quoted_status_id_str') + quoted_status_id_str = legacy.get("quoted_status_id_str") if quoted_status_id_str: - tweet_data['quoted_status_id'] = quoted_status_id_str - + tweet_data["quoted_status_id"] = quoted_status_id_str + # Extract replied-to tweet ID if present - in_reply_to_status_id_str = legacy.get('in_reply_to_status_id_str') + in_reply_to_status_id_str = legacy.get("in_reply_to_status_id_str") if in_reply_to_status_id_str: - tweet_data['in_reply_to_status_id'] = in_reply_to_status_id_str - + tweet_data["in_reply_to_status_id"] = in_reply_to_status_id_str + return tweet_data @@ -427,15 +629,17 @@ def download_file(url: str, output_path: str, retry_count: int = 0) -> bool: """ try: os.makedirs(os.path.dirname(output_path), exist_ok=True) - + # Create request with user agent req = urllib.request.Request(url) - req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') - + req.add_header( + "User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + ) + with urllib.request.urlopen(req, timeout=30) as response: - with open(output_path, 'wb') as f: + with open(output_path, "wb") as f: f.write(response.read()) - + return True except Exception as e: if retry_count < 2: @@ -457,52 +661,104 @@ def download_tweet_media(tweet_data: Dict, tweet_id: str, media_dir: str) -> Lis List of local file paths for downloaded media """ media_paths = [] - entities = tweet_data.get('entities', {}) - media_list = entities.get('media', []) - + entities = tweet_data.get("entities", {}) + media_list = entities.get("media", []) + if not media_list: return media_paths - + tweet_media_dir = os.path.join(media_dir, tweet_id) - + for idx, media_item in enumerate(media_list): - media_url = media_item.get('media_url_https') or media_item.get('media_url') + media_url = media_item.get("media_url_https") or media_item.get("media_url") if not media_url: continue - + # Determine file extension - ext = 'jpg' # Default - if 'type' in media_item: - media_type = media_item['type'] - if media_type == 'video': + ext = "jpg" # Default + if "type" in media_item: + media_type = media_item["type"] + if media_type == "video": # Try to get video URL - video_info = media_item.get('video_info', {}) - variants = video_info.get('variants', []) + video_info = media_item.get("video_info", {}) + variants = video_info.get("variants", []) if variants: # Get the highest bitrate variant - best_variant = max(variants, key=lambda v: v.get('bitrate', 0)) - media_url = best_variant.get('url', media_url) - ext = 'mp4' - elif media_type == 'animated_gif': - ext = 'gif' - + best_variant = max(variants, key=lambda v: v.get("bitrate", 0)) + media_url = best_variant.get("url", media_url) + ext = "mp4" + elif media_type == "animated_gif": + ext = "gif" + # Extract extension from URL if possible parsed_url = urllib.parse.urlparse(media_url) path_ext = os.path.splitext(parsed_url.path)[1] if path_ext: - ext = path_ext.lstrip('.') - + ext = path_ext.lstrip(".") + filename = f"media_{idx + 1}.{ext}" output_path = os.path.join(tweet_media_dir, filename) - + if download_file(media_url, output_path): media_paths.append(output_path) # Update tweet data with local path - media_item['local_path'] = os.path.relpath(output_path, os.path.dirname(media_dir)) - + media_item["local_path"] = os.path.relpath( + output_path, os.path.dirname(media_dir) + ) + return media_paths +def download_article_media( + article_data: Dict, tweet_id: str, media_dir: str, output_dir: str +) -> None: + """ + Download images embedded in an article: the cover image and any inline + media blocks in the article body. Sets ``local_path`` in-place on each + media item so the Rust archiver can rewrite paths into the content store. + + Args: + article_data: Article dict produced by extract_article_data() + tweet_id: ID of the wrapper tweet (used as the media subdirectory name) + media_dir: Root media directory (e.g. ``{temp_dir}/media``) + output_dir: Directory where tweet JSON files are written; used to + compute relative paths consistent with the rest of the scraper + """ + article_media_dir = os.path.join(media_dir, tweet_id) + # Paths are stored relative to the parent of media_dir (i.e. temp_dir), + # matching the convention used by download_tweet_media. + rel_base = os.path.dirname(media_dir) + + def _ext_from_url(url: str) -> str: + parsed = urllib.parse.urlparse(url) + ext = os.path.splitext(parsed.path)[1].lstrip(".") + return ext if ext else "jpg" + + # --- Cover image --- + cover = article_data.get("cover_media", {}) + cover_url = cover.get("url", "") + if cover_url and not cover.get("local_path"): + ext = _ext_from_url(cover_url) + output_path = os.path.join(article_media_dir, f"cover.{ext}") + if download_file(cover_url, output_path): + cover["local_path"] = os.path.relpath(output_path, rel_base) + + # --- Inline block images --- + for block in article_data.get("blocks", []): + for entity in block.get("resolved_entities", []): + if entity.get("type") != "media": + continue + url = entity.get("url", "") + if not url or entity.get("local_path"): + continue + media_id = entity.get("media_id", "") + ext = _ext_from_url(url) + filename = f"article_{media_id}.{ext}" if media_id else f"article_img.{ext}" + output_path = os.path.join(article_media_dir, filename) + if download_file(url, output_path): + entity["local_path"] = os.path.relpath(output_path, rel_base) + + def download_avatar(avatar_url: str, author_id: str, avatars_dir: str) -> Optional[str]: """ Download avatar image for an author. @@ -517,31 +773,35 @@ def download_avatar(avatar_url: str, author_id: str, avatars_dir: str) -> Option """ if not avatar_url: return None - + # Determine file extension - ext = 'jpg' # Default + ext = "jpg" # Default parsed_url = urllib.parse.urlparse(avatar_url) path_ext = os.path.splitext(parsed_url.path)[1] if path_ext: - ext = path_ext.lstrip('.') - + ext = path_ext.lstrip(".") + # Remove '_normal' from filename to get higher resolution if available - avatar_url_hq = avatar_url.replace('_normal', '') - + avatar_url_hq = avatar_url.replace("_normal", "") + filename = f"{author_id}.{ext}" output_path = os.path.join(avatars_dir, filename) - + # Try high quality first, fallback to normal if download_file(avatar_url_hq, output_path): return output_path elif download_file(avatar_url, output_path): return output_path - + return None -def fetch_tweet_by_id(scraper: Scraper, tweet_id: str, retry_count: int = 0, - delay_between_requests: float = 2.0) -> Optional[Dict]: +def fetch_tweet_by_id( + scraper: Scraper, + tweet_id: str, + retry_count: int = 0, + delay_between_requests: float = 2.0, +) -> Optional[Dict]: """ Fetch a single tweet by ID with rate limit handling. @@ -560,10 +820,99 @@ def fetch_tweet_by_id(scraper: Scraper, tweet_id: str, retry_count: int = 0, try: response_data = None last_error = None - + + # Method 4: Try using the scraper's session directly to make a GraphQL request + if hasattr(scraper, "session"): + try: + # Use the TweetDetail GraphQL endpoint + # The endpoint hash might vary, but this is a common one + url = "https://twitter.com/i/api/graphql/rU08O-YiXdr0IZfE7qaUMg/TweetDetail" + variables = { + "focalTweetId": tweet_id, + "with_rux_injections": False, + "rankingMode": "Relevance", + "includePromotedContent": True, + "withCommunity": True, + "withQuickPromoteEligibilityTweetFields": True, + "withBirdwatchNotes": True, + "withVoice": True, + } + + features = { + "rweb_video_screen_enabled": False, + "profile_label_improvements_pcf_label_in_post_enabled": True, + "responsive_web_profile_redirect_enabled": False, + "rweb_tipjar_consumption_enabled": False, + "verified_phone_label_enabled": False, + "creator_subscriptions_tweet_preview_api_enabled": True, + "responsive_web_graphql_timeline_navigation_enabled": True, + "responsive_web_graphql_skip_user_profile_image_extensions_enabled": False, + "premium_content_api_read_enabled": False, + "communities_web_enable_tweet_community_results_fetch": True, + "c9s_tweet_anatomy_moderator_badge_enabled": True, + "responsive_web_grok_analyze_button_fetch_trends_enabled": False, + "responsive_web_grok_analyze_post_followups_enabled": True, + "responsive_web_jetfuel_frame": True, + "responsive_web_grok_share_attachment_enabled": True, + "responsive_web_grok_annotations_enabled": True, + "articles_preview_enabled": True, + "responsive_web_edit_tweet_api_enabled": True, + "graphql_is_translatable_rweb_tweet_is_translatable_enabled": True, + "view_counts_everywhere_api_enabled": True, + "longform_notetweets_consumption_enabled": True, + "responsive_web_twitter_article_tweet_consumption_enabled": True, + "content_disclosure_indicator_enabled": True, + "content_disclosure_ai_generated_indicator_enabled": True, + "responsive_web_grok_show_grok_translated_post": False, + "responsive_web_grok_analysis_button_from_backend": True, + "post_ctas_fetch_enabled": True, + "freedom_of_speech_not_reach_fetch_enabled": True, + "standardized_nudges_misinfo": True, + "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True, + "longform_notetweets_rich_text_read_enabled": True, + "longform_notetweets_inline_media_enabled": False, + "responsive_web_grok_image_annotation_enabled": True, + "responsive_web_grok_imagine_annotation_enabled": True, + "responsive_web_grok_community_note_auto_translation_is_enabled": False, + "responsive_web_enhance_cards_enabled": False, + } + + field_toggles = { + "withArticleRichContentState": True, + "withArticlePlainText": True, + "withArticleSummaryText": True, + "withArticleVoiceOver": True, + "withGrokAnalyze": False, + "withDisallowedReplyControls": False, + } + params = { + "variables": json.dumps(variables), + "features": json.dumps(features), + "fieldToggles": json.dumps(field_toggles), + } + response = scraper.session.get(url, params=params) + if response.status_code == 200: + response_data = response.json() + if response_data: + print(f" ✓ Fetched using direct GraphQL request") + else: + error_text = ( + response.text[:200] + if hasattr(response, "text") and response.text + else str(response.status_code) + ) + last_error = Exception( + f"GraphQL request failed with status {response.status_code}: {error_text}" + ) + if retry_count == 0: + print(f" ⚠ Debug: Direct GraphQL request failed: {last_error}") + except Exception as e: + last_error = e + pass + # Try different methods based on what's available in the library # Method 1: Try tweets_details() if available (note: plural "tweets") - if hasattr(scraper, 'tweets_details'): + if response_data is None and hasattr(scraper, "tweets_details"): try: response_data = scraper.tweets_details([tweet_id]) if response_data: @@ -573,164 +922,55 @@ def fetch_tweet_by_id(scraper: Scraper, tweet_id: str, retry_count: int = 0, if retry_count == 0: print(f" ⚠ tweets_details() failed: {e}") pass - - # Method 2: Try tweet() method if available - if response_data is None and hasattr(scraper, 'tweet'): - try: - response_data = scraper.tweet(tweet_id) - if response_data: - print(f" ✓ Fetched using tweet()") - except Exception as e: - last_error = e - pass - - # Method 3: Try using GraphQL API directly - if response_data is None and hasattr(scraper, 'graphql'): - try: - variables = { - "focalTweetId": tweet_id, - "with_rux_injections": False, - "includePromotedContent": False, - "withCommunity": True, - "withQuickPromoteEligibilityTweetFields": True, - "withBirdwatchNotes": True, - "withSuperFollowsUserFields": True, - "withDownvotePerspective": False, - "withReactionsMetadata": False, - "withReactionsPerspective": False, - "withReplays": True, - "withVoice": True, - "withV2Timeline": True - } - features = { - "rweb_tipjar_consumption_enabled": True, - "responsive_web_graphql_exclude_directive_enabled": True, - "verified_phone_label_enabled": False, - "creator_subscriptions_quote_tweet_preview_enabled": True, - "responsive_web_graphql_timeline_navigation_enabled": True, - "responsive_web_graphql_skip_user_profile_image_size_enabled": False, - "communities_web_enable_tweet_community_results_fetch": True, - "c9s_tweet_anatomy_moderator_badge_enabled": True, - "articles_preview_enabled": True, - "responsive_web_edit_tweet_api_enabled": True, - "graphql_is_translatable_rweb_tweet_is_translatable_enabled": True, - "view_counts_everywhere_api_enabled": True, - "longform_notetweets_consumption_enabled": True, - "responsive_web_twitter_article_tweet_consumption_enabled": True, - "tweet_awards_web_tipping_enabled": False, - "freedom_of_speech_not_reach_fetch_enabled": True, - "standardized_nudges_misinfo": True, - "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True, - "longform_notetweets_rich_text_read_enabled": True, - "longform_notetweets_inline_media_enabled": True, - "responsive_web_enhance_cards_enabled": False - } - response_data = scraper.graphql("TweetDetail", variables, features) - if response_data: - print(f" ✓ Fetched using graphql()") - except Exception as e: - last_error = e - # Don't silently pass - log the error for debugging - if retry_count == 0: # Only print on first attempt to avoid spam - print(f" ⚠ Debug: graphql() failed: {e}") - pass - - # Method 4: Try using the scraper's session directly to make a GraphQL request - if response_data is None and hasattr(scraper, 'session'): - try: - # Use the TweetDetail GraphQL endpoint - # The endpoint hash might vary, but this is a common one - url = "https://twitter.com/i/api/graphql/VWx37vRycLNpJY1qH7a6ow/TweetDetail" - variables = { - "focalTweetId": tweet_id, - "with_rux_injections": False, - "includePromotedContent": False, - "withCommunity": True, - "withQuickPromoteEligibilityTweetFields": True, - "withBirdwatchNotes": True, - "withSuperFollowsUserFields": True, - "withDownvotePerspective": False, - "withReactionsMetadata": False, - "withReactionsPerspective": False, - "withReplays": True, - "withVoice": True, - "withV2Timeline": True - } - features = { - "rweb_tipjar_consumption_enabled": True, - "responsive_web_graphql_exclude_directive_enabled": True, - "verified_phone_label_enabled": False, - "creator_subscriptions_quote_tweet_preview_enabled": True, - "responsive_web_graphql_timeline_navigation_enabled": True, - "responsive_web_graphql_skip_user_profile_image_size_enabled": False, - "communities_web_enable_tweet_community_results_fetch": True, - "c9s_tweet_anatomy_moderator_badge_enabled": True, - "articles_preview_enabled": True, - "responsive_web_edit_tweet_api_enabled": True, - "graphql_is_translatable_rweb_tweet_is_translatable_enabled": True, - "view_counts_everywhere_api_enabled": True, - "longform_notetweets_consumption_enabled": True, - "responsive_web_twitter_article_tweet_consumption_enabled": True, - "tweet_awards_web_tipping_enabled": False, - "freedom_of_speech_not_reach_fetch_enabled": True, - "standardized_nudges_misinfo": True, - "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True, - "longform_notetweets_rich_text_read_enabled": True, - "longform_notetweets_inline_media_enabled": True, - "responsive_web_enhance_cards_enabled": False - } - params = { - "variables": json.dumps(variables), - "features": json.dumps(features) - } - response = scraper.session.get(url, params=params) - if response.status_code == 200: - response_data = response.json() - if response_data: - print(f" ✓ Fetched using direct GraphQL request") - else: - error_text = response.text[:200] if hasattr(response, 'text') and response.text else str(response.status_code) - last_error = Exception(f"GraphQL request failed with status {response.status_code}: {error_text}") - if retry_count == 0: - print(f" ⚠ Debug: Direct GraphQL request failed: {last_error}") - except Exception as e: - last_error = e - pass - + if response_data is None: # Debug: print available methods - available_methods = [m for m in dir(scraper) if not m.startswith('_') and callable(getattr(scraper, m, None))] - print(f" ⚠ Debug: Available scraper methods: {', '.join(available_methods[:10])}...") + available_methods = [ + m + for m in dir(scraper) + if not m.startswith("_") and callable(getattr(scraper, m, None)) + ] + print( + f" ⚠ Debug: Available scraper methods: {', '.join(available_methods[:10])}..." + ) if last_error: print(f" ⚠ Debug: Last error: {last_error}") error_msg = f"Could not fetch tweet {tweet_id} using any available method. " - error_msg += f"Tried: tweets_details, tweet, graphql, direct GraphQL request. " + error_msg += ( + f"Tried: tweets_details, tweet, graphql, direct GraphQL request. " + ) if last_error: error_msg += f"Last error: {last_error}" raise Exception(error_msg) - + # Extract tweet from response tweet_result = extract_tweet_from_response(response_data, tweet_id) - + if tweet_result: return tweet_result else: # Debug: print response structure - print(f" ⚠ Debug: Response structure keys: {list(response_data.keys()) if isinstance(response_data, dict) else 'Not a dict'}") + print( + f" ⚠ Debug: Response structure keys: {list(response_data.keys()) if isinstance(response_data, dict) else 'Not a dict'}" + ) if isinstance(response_data, list) and len(response_data) > 0: - print(f" ⚠ Debug: Response is list, first item keys: {list(response_data[0].keys()) if isinstance(response_data[0], dict) else 'Not a dict'}") + print( + f" ⚠ Debug: Response is list, first item keys: {list(response_data[0].keys()) if isinstance(response_data[0], dict) else 'Not a dict'}" + ) print(f" ⚠ Warning: Tweet {tweet_id} not found in response") return None except Exception as e: error_msg = str(e) - + # Check if it's a rate limit error if is_rate_limit_error(e): wait_time = handle_rate_limit_error(e, retry_count) time.sleep(wait_time) if retry_count < 5: # Max 5 retries for rate limits - return fetch_tweet_by_id(scraper, tweet_id, retry_count + 1, delay_between_requests) + return fetch_tweet_by_id( + scraper, tweet_id, retry_count + 1, delay_between_requests + ) else: print(f" ❌ Max retries reached for tweet {tweet_id}") return None @@ -738,7 +978,9 @@ def fetch_tweet_by_id(scraper: Scraper, tweet_id: str, retry_count: int = 0, # For other errors, retry once if retry_count < 1: time.sleep(delay_between_requests * 3) - return fetch_tweet_by_id(scraper, tweet_id, retry_count + 1, delay_between_requests) + return fetch_tweet_by_id( + scraper, tweet_id, retry_count + 1, delay_between_requests + ) else: print(f" ⚠ Warning: Error fetching tweet {tweet_id}: {error_msg}") return None @@ -755,24 +997,24 @@ def extract_related_tweet_ids(tweet_data: Dict) -> List[str]: List of related tweet IDs """ related_ids = [] - + # Check for quoted status - quoted_status_id = tweet_data.get('quoted_status_id') + quoted_status_id = tweet_data.get("quoted_status_id") if quoted_status_id: related_ids.append(quoted_status_id) - + # Check for retweeted status - retweeted_status = tweet_data.get('retweeted_status') + retweeted_status = tweet_data.get("retweeted_status") if retweeted_status: - retweet_id = retweeted_status.get('id') + retweet_id = retweeted_status.get("id") if retweet_id: related_ids.append(retweet_id) - + # Check for replied-to status - in_reply_to_status_id = tweet_data.get('in_reply_to_status_id') + in_reply_to_status_id = tweet_data.get("in_reply_to_status_id") if in_reply_to_status_id: related_ids.append(in_reply_to_status_id) - + return related_ids @@ -796,7 +1038,7 @@ def scrape_tweets_recursive( download_replied_to_tweets_media: bool, max_replied_to_tweets_recursion_depth: int, delay_between_requests: float, - replied_to_depth: int = 0 + replied_to_depth: int = 0, ) -> None: """ Recursively scrape tweets (quoted, retweeted, replied-to). @@ -805,7 +1047,7 @@ def scrape_tweets_recursive( scraper: Scraper instance tweet_id: Tweet ID to scrape scraped_tweets: Dictionary of already scraped tweets - output_dir: Output directory for TOML files + output_dir: Output directory for JSON files media_dir: Media directory avatars_dir: Avatars directory depth: Current recursion depth @@ -826,132 +1068,148 @@ def scrape_tweets_recursive( # Skip if already scraped if tweet_id in scraped_tweets: return - + # Check depth limits if depth >= max_depth: return - + if replied_to_depth >= max_replied_to_tweets_recursion_depth: return - + # Fetch tweet print(f" {' ' * depth}→ Fetching tweet {tweet_id}...") - tweet_result = fetch_tweet_by_id(scraper, tweet_id, delay_between_requests=delay_between_requests) - + tweet_result = fetch_tweet_by_id( + scraper, tweet_id, delay_between_requests=delay_between_requests + ) + if not tweet_result: - print(f" {' ' * depth}⚠ Warning: Could not fetch tweet {tweet_id} (deleted or private?)") + print( + f" {' ' * depth}⚠ Warning: Could not fetch tweet {tweet_id} (deleted or private?)" + ) return - + # Extract tweet data - is_replied_to_tweet = (replied_to_depth > 0) + is_replied_to_tweet = replied_to_depth > 0 current_bare_scrape = bare_scrape and not is_replied_to_tweet current_advanced_info = advanced_info and not is_replied_to_tweet - - tweet_data = extract_tweet_data(tweet_result, bare_scrape=current_bare_scrape, - advanced_info=current_advanced_info) - + + tweet_data = extract_tweet_data( + tweet_result, + bare_scrape=current_bare_scrape, + advanced_info=current_advanced_info, + ) + # Download avatar if enabled if download_avatars and not is_replied_to_tweet: - author_id = tweet_data.get('author', {}).get('id') - avatar_url = tweet_data.get('author', {}).get('avatar_url', '') + author_id = tweet_data.get("author", {}).get("id") + avatar_url = tweet_data.get("author", {}).get("avatar_url", "") if author_id and avatar_url: avatar_path = download_avatar(avatar_url, author_id, avatars_dir) if avatar_path: - tweet_data['author']['avatar_local_path'] = os.path.relpath( + tweet_data["author"]["avatar_local_path"] = os.path.relpath( avatar_path, output_dir ) - + # Download media if enabled should_download_media = download_media and not is_replied_to_tweet if not should_download_media and is_replied_to_tweet: should_download_media = download_replied_to_tweets_media - + if should_download_media: download_tweet_media(tweet_data, tweet_id, media_dir) - - # Save tweet to TOML file - toml_file = os.path.join(output_dir, f"tweet-{tweet_id}.toml") + if tweet_data.get("is_article") and tweet_data.get("article"): + download_article_media(tweet_data["article"], tweet_id, media_dir, output_dir) + + # Save tweet to JSON file + json_file = os.path.join(output_dir, f"tweet-{tweet_id}.json") try: - if TOML_LIB == 'tomlkit': - # tomlkit: parse empty string to get document, then update it - doc = tomlkit.parse('') - # Convert dict to tomlkit document recursively - def dict_to_tomlkit(d, doc_obj): - for key, value in d.items(): - if isinstance(value, dict): - doc_obj[key] = dict_to_tomlkit(value, tomlkit.table()) - elif isinstance(value, list): - arr = tomlkit.array() - for item in value: - if isinstance(item, dict): - arr.append(dict_to_tomlkit(item, tomlkit.table())) - else: - arr.append(item) - doc_obj[key] = arr - else: - doc_obj[key] = value - return doc_obj - - doc = dict_to_tomlkit(tweet_data, doc) - with open(toml_file, 'w') as f: - f.write(tomlkit.dumps(doc)) - else: - # tomli_w uses binary mode - with open(toml_file, 'wb') as f: - tomlkit.dump(tweet_data, f) + with open(json_file, "w") as f: + json.dump(tweet_data, f, indent=2) except Exception as e: - print(f" {' ' * depth}⚠ Warning: Failed to save TOML file for tweet {tweet_id}: {e}") + print( + f" {' ' * depth}⚠ Warning: Failed to save JSON file for tweet {tweet_id}: {e}" + ) return - + # Mark as scraped scraped_tweets[tweet_id] = tweet_data - + # Rate limiting if delay_between_requests > 0: time.sleep(delay_between_requests) - + # Recursively scrape related tweets if recursive and depth < max_depth - 1: related_ids = extract_related_tweet_ids(tweet_data) - + for related_id in related_ids: if related_id not in scraped_tweets: scrape_tweets_recursive( - scraper, related_id, scraped_tweets, output_dir, media_dir, - avatars_dir, depth + 1, max_depth, bare_scrape, advanced_info, - download_media, download_avatars, recursive, - scrape_replied_to_tweet, recursive_replied_to_tweets, + scraper, + related_id, + scraped_tweets, + output_dir, + media_dir, + avatars_dir, + depth + 1, + max_depth, + bare_scrape, + advanced_info, + download_media, + download_avatars, + recursive, + scrape_replied_to_tweet, + recursive_replied_to_tweets, recursive_replied_to_tweets_quotes_retweets, - download_replied_to_tweets_media, max_replied_to_tweets_recursion_depth, - delay_between_requests, replied_to_depth + download_replied_to_tweets_media, + max_replied_to_tweets_recursion_depth, + delay_between_requests, + replied_to_depth, ) - + # Handle replied-to tweets if scrape_replied_to_tweet or recursive_replied_to_tweets: - in_reply_to_status_id = tweet_data.get('in_reply_to_status_id') + in_reply_to_status_id = tweet_data.get("in_reply_to_status_id") if in_reply_to_status_id and in_reply_to_status_id not in scraped_tweets: - new_replied_to_depth = replied_to_depth + 1 if recursive_replied_to_tweets else replied_to_depth - + new_replied_to_depth = ( + replied_to_depth + 1 + if recursive_replied_to_tweets + else replied_to_depth + ) + # Determine if we should recursively scrape quotes/retweets of replied-to tweets should_recurse_quotes_retweets = ( - recursive_replied_to_tweets_quotes_retweets and - new_replied_to_depth < max_replied_to_tweets_recursion_depth + recursive_replied_to_tweets_quotes_retweets + and new_replied_to_depth < max_replied_to_tweets_recursion_depth ) - + scrape_tweets_recursive( - scraper, in_reply_to_status_id, scraped_tweets, output_dir, media_dir, - avatars_dir, depth, max_depth, bare_scrape, advanced_info, - download_media, download_avatars, should_recurse_quotes_retweets, - scrape_replied_to_tweet, recursive_replied_to_tweets, + scraper, + in_reply_to_status_id, + scraped_tweets, + output_dir, + media_dir, + avatars_dir, + depth, + max_depth, + bare_scrape, + advanced_info, + download_media, + download_avatars, + should_recurse_quotes_retweets, + scrape_replied_to_tweet, + recursive_replied_to_tweets, recursive_replied_to_tweets_quotes_retweets, - download_replied_to_tweets_media, max_replied_to_tweets_recursion_depth, - delay_between_requests, new_replied_to_depth + download_replied_to_tweets_media, + max_replied_to_tweets_recursion_depth, + delay_between_requests, + new_replied_to_depth, ) def load_scraped_tweets(output_dir: str) -> Dict[str, Dict]: """ - Load already scraped tweets from TOML files (for resume capability). + Load already scraped tweets from JSON files (for resume capability). Args: output_dir: Output directory @@ -960,185 +1218,187 @@ def load_scraped_tweets(output_dir: str) -> Dict[str, Dict]: Dictionary mapping tweet IDs to tweet data """ scraped_tweets = {} - + if not os.path.exists(output_dir): return scraped_tweets - + for filename in os.listdir(output_dir): - if filename.startswith('tweet-') and filename.endswith('.toml'): - tweet_id = filename[6:-5] # Remove 'tweet-' prefix and '.toml' suffix - scraped_tweets[tweet_id] = {'id': tweet_id} # Mark as scraped - + if filename.startswith("tweet-") and filename.endswith(".json"): + tweet_id = filename[6:-5] # Remove 'tweet-' prefix and '.json' suffix + scraped_tweets[tweet_id] = {"id": tweet_id} # Mark as scraped + return scraped_tweets def main(): """Main function.""" parser = argparse.ArgumentParser( - description='Extract tweet contents from Tweet IDs and save as TOML files.' + description="Extract tweet contents from Tweet IDs and save as JSON files." ) - + # Tweet ID inputs parser.add_argument( - '--tweet-ids', + "--tweet-ids", type=str, - help='Comma-separated Tweet IDs, e.g. "12345,67890,13579"' + help='Comma-separated Tweet IDs, e.g. "12345,67890,13579"', ) parser.add_argument( - '--tweet-ids-file', + "--tweet-ids-file", type=str, - help='Path(s) to file(s) containing Tweet IDs (comma-separated), ' - 'e.g. "path/to/tweet_ids.txt,path/to/second/file.json"' + help="Path(s) to file(s) containing Tweet IDs (comma-separated), " + 'e.g. "path/to/tweet_ids.txt,path/to/second/file.json"', ) - + # Output directories parser.add_argument( - '--output-dir', + "--output-dir", type=str, - default='scraped-tweets', - help='Directory to save tweet TOML files (default: scraped-tweets)' + default="scraped-tweets", + help="Directory to save tweet JSON files (default: scraped-tweets)", ) parser.add_argument( - '--media-dir', + "--media-dir", type=str, - help='Directory to save media files (default: /media)' + help="Directory to save media files (default: /media)", ) - + # Media and avatar downloads parser.add_argument( - '--download-media', - action='store_true', - help='Download media files (images, videos, GIFs) attached to tweets' + "--download-media", + action="store_true", + help="Download media files (images, videos, GIFs) attached to tweets", ) avatar_group = parser.add_mutually_exclusive_group() avatar_group.add_argument( - '--download-avatars', - action='store_true', + "--download-avatars", + action="store_true", default=True, - help='Download avatars of tweet authors (default: True)' + help="Download avatars of tweet authors (default: True)", ) avatar_group.add_argument( - '--no-download-avatars', - dest='download_avatars', - action='store_false', - help='Do not download avatars' + "--no-download-avatars", + dest="download_avatars", + action="store_false", + help="Do not download avatars", ) - + # Recursion settings recursion_group = parser.add_mutually_exclusive_group() recursion_group.add_argument( - '--recursive', - action='store_true', + "--recursive", + action="store_true", default=True, - help='Recursively extract quoted or retweeted tweets (default: True)' + help="Recursively extract quoted or retweeted tweets (default: True)", ) recursion_group.add_argument( - '--no-recursive', - dest='recursive', - action='store_false', - help='Do not recursively extract quoted or retweeted tweets' + "--no-recursive", + dest="recursive", + action="store_false", + help="Do not recursively extract quoted or retweeted tweets", ) parser.add_argument( - '--max-recursion-depth', + "--max-recursion-depth", type=int, default=10, - help='Maximum recursion depth for quoted/retweeted tweets (default: 10)' + help="Maximum recursion depth for quoted/retweeted tweets (default: 10)", ) - + # Replied-to tweet settings parser.add_argument( - '--scrape-replied-to-tweet', - action='store_true', - help='Also extract the tweet that the author replied to' + "--scrape-replied-to-tweet", + action="store_true", + help="Also extract the tweet that the author replied to", ) parser.add_argument( - '--recursive-replied-to-tweets', - action='store_true', - help='Recursively extract replied-to tweets' + "--recursive-replied-to-tweets", + action="store_true", + help="Recursively extract replied-to tweets", ) parser.add_argument( - '--recursive-replied-to-tweets-quotes-retweets', - action='store_true', - help='Recursively extract quoted or retweeted tweets of replied-to tweets' + "--recursive-replied-to-tweets-quotes-retweets", + action="store_true", + help="Recursively extract quoted or retweeted tweets of replied-to tweets", ) parser.add_argument( - '--download-replied-to-tweets-media', - action='store_true', - help='Download media for replied-to tweets as well' + "--download-replied-to-tweets-media", + action="store_true", + help="Download media for replied-to tweets as well", ) parser.add_argument( - '--max-replied-to-tweets-recursion-depth', + "--max-replied-to-tweets-recursion-depth", type=int, default=5, - help='Maximum depth for replied-to tweets recursion (default: 5)' + help="Maximum depth for replied-to tweets recursion (default: 5)", ) - + # Scraping modes parser.add_argument( - '--advanced-info', - action='store_true', - help='Extract additional optional information about tweets' + "--advanced-info", + action="store_true", + help="Extract additional optional information about tweets", ) parser.add_argument( - '--bare-scrape', - action='store_true', - help='Only extract bare minimum information about tweets' + "--bare-scrape", + action="store_true", + help="Only extract bare minimum information about tweets", ) - + # Rate limiting parser.add_argument( - '--delay-between-requests', + "--delay-between-requests", type=float, default=2.0, - help='Delay in seconds between requests (default: 2.0)' + help="Delay in seconds between requests (default: 2.0)", ) - + # Credentials parser.add_argument( - '--credentials-file', + "--credentials-file", type=str, - help='Path to credentials file (default: creds.txt in current directory)' + help="Path to credentials file (default: creds.txt in current directory)", ) parser.add_argument( - '--credentials-string', + "--credentials-string", type=str, - help='Credentials string directly (cannot be used with --credentials-file)' + help="Credentials string directly (cannot be used with --credentials-file)", ) - + args = parser.parse_args() - + # Validate arguments if not args.tweet_ids and not args.tweet_ids_file: parser.error("Either --tweet-ids or --tweet-ids-file must be provided") - + if args.bare_scrape and args.advanced_info: parser.error("--bare-scrape and --advanced-info are mutually exclusive") - + if args.credentials_file and args.credentials_string: - parser.error("--credentials-file and --credentials-string cannot be specified at the same time") - + parser.error( + "--credentials-file and --credentials-string cannot be specified at the same time" + ) + # Parse tweet IDs print("Parsing tweet IDs...") tweet_ids = parse_tweet_ids_from_args(args.tweet_ids, args.tweet_ids_file) - + if not tweet_ids: print("❌ No tweet IDs found. Exiting.") return - + print(f"✓ Found {len(tweet_ids)} unique tweet ID(s)") - + # Set up directories output_dir = os.path.abspath(args.output_dir) os.makedirs(output_dir, exist_ok=True) - + if args.media_dir: media_dir = os.path.abspath(args.media_dir) else: - media_dir = os.path.join(output_dir, 'media') - - avatars_dir = os.path.join(media_dir, 'avatars') + media_dir = os.path.join(output_dir, "media") + + avatars_dir = os.path.join(media_dir, "avatars") os.makedirs(avatars_dir, exist_ok=True) - + # Load cookies if args.credentials_string: # Use credentials string directly @@ -1149,144 +1409,133 @@ def main(): if not os.path.exists(creds_file): print(f"❌ Error: Credentials file not found: {creds_file}") return - with open(creds_file, 'r') as f: + with open(creds_file, "r") as f: cookie_str = f.read().strip() else: # Default: look for creds.txt in current directory - creds_file = os.path.join(os.getcwd(), 'creds.txt') + creds_file = os.path.join(os.getcwd(), "creds.txt") if not os.path.exists(creds_file): - print(f"❌ Error: creds.txt not found in current directory ({os.getcwd()}). " - f"Please create it with your Twitter cookies, or use --credentials-file or --credentials-string.") + print( + f"❌ Error: creds.txt not found in current directory ({os.getcwd()}). " + f"Please create it with your Twitter cookies, or use --credentials-file or --credentials-string." + ) return - with open(creds_file, 'r') as f: + with open(creds_file, "r") as f: cookie_str = f.read().strip() - + # Parse cookie string into dictionary cookie_dict = dict(item.split("=", 1) for item in cookie_str.split(";")) - + # Initialize scraper scraper = Scraper(cookies=cookie_dict, save=False) - + # Load already scraped tweets (for resume) scraped_tweets = load_scraped_tweets(output_dir) initial_count = len(scraped_tweets) - + if initial_count > 0: print(f"✓ Found {initial_count} already scraped tweet(s), resuming...") - + # Filter out already scraped tweets remaining_tweet_ids = [tid for tid in tweet_ids if tid not in scraped_tweets] - + if not remaining_tweet_ids: print("✓ All tweets already scraped!") return - + print(f"→ Scraping {len(remaining_tweet_ids)} new tweet(s)...") print("-" * 80) - + # Track statistics stats = { - 'total_requested': len(tweet_ids), - 'already_scraped': initial_count, - 'newly_scraped': 0, - 'failed': 0, - 'start_time': datetime.now() + "total_requested": len(tweet_ids), + "already_scraped": initial_count, + "newly_scraped": 0, + "failed": 0, + "start_time": datetime.now(), } - + # Scrape tweets for idx, tweet_id in enumerate(remaining_tweet_ids, 1): print(f"\n[{idx}/{len(remaining_tweet_ids)}] Processing tweet {tweet_id}...") - + try: scrape_tweets_recursive( - scraper, tweet_id, scraped_tweets, output_dir, media_dir, avatars_dir, - depth=0, max_depth=args.max_recursion_depth, - bare_scrape=args.bare_scrape, advanced_info=args.advanced_info, - download_media=args.download_media, download_avatars=args.download_avatars, + scraper, + tweet_id, + scraped_tweets, + output_dir, + media_dir, + avatars_dir, + depth=0, + max_depth=args.max_recursion_depth, + bare_scrape=args.bare_scrape, + advanced_info=args.advanced_info, + download_media=args.download_media, + download_avatars=args.download_avatars, recursive=args.recursive, scrape_replied_to_tweet=args.scrape_replied_to_tweet, recursive_replied_to_tweets=args.recursive_replied_to_tweets, recursive_replied_to_tweets_quotes_retweets=args.recursive_replied_to_tweets_quotes_retweets, download_replied_to_tweets_media=args.download_replied_to_tweets_media, max_replied_to_tweets_recursion_depth=args.max_replied_to_tweets_recursion_depth, - delay_between_requests=args.delay_between_requests + delay_between_requests=args.delay_between_requests, ) - stats['newly_scraped'] += 1 + stats["newly_scraped"] += 1 except Exception as e: print(f" ❌ Error processing tweet {tweet_id}: {e}") - stats['failed'] += 1 - + stats["failed"] += 1 + # Calculate final statistics - stats['end_time'] = datetime.now() - stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds() - stats['total_scraped'] = len(scraped_tweets) - + stats["end_time"] = datetime.now() + stats["duration"] = (stats["end_time"] - stats["start_time"]).total_seconds() + stats["total_scraped"] = len(scraped_tweets) + # Save summary summary = { - 'scraping_summary': { - 'total_requested': stats['total_requested'], - 'already_scraped': stats['already_scraped'], - 'newly_scraped': stats['newly_scraped'], - 'failed': stats['failed'], - 'total_scraped': stats['total_scraped'], - 'start_time': stats['start_time'].isoformat(), - 'end_time': stats['end_time'].isoformat(), - 'duration_seconds': stats['duration'], - 'output_directory': output_dir, - 'media_directory': media_dir, - 'settings': { - 'recursive': args.recursive, - 'max_recursion_depth': args.max_recursion_depth, - 'bare_scrape': args.bare_scrape, - 'advanced_info': args.advanced_info, - 'download_media': args.download_media, - 'download_avatars': args.download_avatars, - 'scrape_replied_to_tweet': args.scrape_replied_to_tweet, - 'recursive_replied_to_tweets': args.recursive_replied_to_tweets, - 'max_replied_to_tweets_recursion_depth': args.max_replied_to_tweets_recursion_depth - } + "scraping_summary": { + "total_requested": stats["total_requested"], + "already_scraped": stats["already_scraped"], + "newly_scraped": stats["newly_scraped"], + "failed": stats["failed"], + "total_scraped": stats["total_scraped"], + "start_time": stats["start_time"].isoformat(), + "end_time": stats["end_time"].isoformat(), + "duration_seconds": stats["duration"], + "output_directory": output_dir, + "media_directory": media_dir, + "settings": { + "recursive": args.recursive, + "max_recursion_depth": args.max_recursion_depth, + "bare_scrape": args.bare_scrape, + "advanced_info": args.advanced_info, + "download_media": args.download_media, + "download_avatars": args.download_avatars, + "scrape_replied_to_tweet": args.scrape_replied_to_tweet, + "recursive_replied_to_tweets": args.recursive_replied_to_tweets, + "max_replied_to_tweets_recursion_depth": args.max_replied_to_tweets_recursion_depth, + }, } } - - summary_file = os.path.join(output_dir, 'scraping_summary.toml') - if TOML_LIB == 'tomlkit': - # Convert to tomlkit document - doc = tomlkit.parse('') - def dict_to_tomlkit(d, doc_obj): - for key, value in d.items(): - if isinstance(value, dict): - doc_obj[key] = dict_to_tomlkit(value, tomlkit.table()) - elif isinstance(value, list): - arr = tomlkit.array() - for item in value: - if isinstance(item, dict): - arr.append(dict_to_tomlkit(item, tomlkit.table())) - else: - arr.append(item) - doc_obj[key] = arr - else: - doc_obj[key] = value - return doc_obj - - doc = dict_to_tomlkit(summary, doc) - with open(summary_file, 'w') as f: - f.write(tomlkit.dumps(doc)) - else: - with open(summary_file, 'wb') as f: - tomlkit.dump(summary, f) - + + summary_file = os.path.join(output_dir, "scraping_summary.json") + with open(summary_file, "w") as f: + json.dump(summary, f, indent=2) + # Print final summary - print(f"\n{'='*80}") + print(f"\n{'=' * 80}") print("Scraping complete!") print(f" Total requested: {stats['total_requested']}") print(f" Already scraped: {stats['already_scraped']}") print(f" Newly scraped: {stats['newly_scraped']}") print(f" Failed: {stats['failed']}") print(f" Total scraped: {stats['total_scraped']}") - print(f" Duration: {stats['duration']:.1f}s ({stats['duration']/60:.1f} minutes)") + print( + f" Duration: {stats['duration']:.1f}s ({stats['duration'] / 60:.1f} minutes)" + ) print(f" Output directory: {output_dir}") print(f" Summary saved to: {summary_file}") - print(f"{'='*80}\n") + print(f"{'=' * 80}\n") if __name__ == "__main__":