439 lines
16 KiB
Python
439 lines
16 KiB
Python
import argparse
|
|
import time
|
|
import os
|
|
from twitter.account import Account
|
|
|
|
def is_rate_limit_error(error):
    """
    Check if an error is a rate limit error (429 Too Many Requests).

    Detection is deliberately conservative so that unrelated failures are
    not answered with a long rate-limit back-off:

    * a structured HTTP status of 429 on the error (or on ``error.response``)
      is trusted first;
    * otherwise the lower-cased message is searched for a stand-alone
      ``429`` (not digits embedded in a longer number such as a tweet ID)
      or for a phrase that specifically names rate limiting.

    Args:
        error: Exception object or error message

    Returns:
        True if it's a rate limit error, False otherwise
    """
    # Local import: the file's top-level import block does not include ``re``.
    import re

    # Some HTTP client exceptions carry the status code directly or on an
    # attached response object; plain strings/exceptions simply have neither.
    for carrier in (error, getattr(error, 'response', None)):
        for attr in ('status_code', 'status'):
            if getattr(carrier, attr, None) == 429:
                return True

    error_str = str(error).lower()

    # "429" only counts when it is not part of a longer digit run, so an ID
    # like 1742900011 in the message is not mistaken for a status code.
    if re.search(r'(?<!\d)429(?!\d)', error_str):
        return True

    # A bare "exceeded" is intentionally NOT an indicator: it also matches
    # connection failures such as "Max retries exceeded with url: ...".
    rate_limit_indicators = (
        'too many requests',
        'rate limit',
        'rate_limit',
        'rate exceeded',
        'limit exceeded',
        'quota',
    )
    return any(indicator in error_str for indicator in rate_limit_indicators)
|
|
|
|
|
|
def handle_rate_limit_error(error, retry_count, base_wait_time=60, max_wait_time=900):
    """
    Compute (and announce) the back-off delay for a rate limit error.

    The delay doubles with every retry (exponential backoff) and is capped
    at ``max_wait_time``. This function only computes and prints the delay;
    the caller is responsible for actually sleeping.

    Args:
        error: The exception that occurred (not inspected here; kept so
            callers can pass it along)
        retry_count: Number of times we've retried so far (0 for the first
            failure). Negative values are treated as 0.
        base_wait_time: Base wait time in seconds (default 60s = 1 minute)
        max_wait_time: Upper bound for the wait in seconds
            (default 900s = 15 minutes)

    Returns:
        Wait time in seconds before retrying
    """
    # Guard against a negative count, which would otherwise shrink the wait
    # below the base (2 ** -1 == 0.5).
    attempt = max(retry_count, 0)

    # Exponential backoff: 1min, 2min, 4min, 8min, ... capped at max_wait_time.
    wait_time = min(base_wait_time * (2 ** attempt), max_wait_time)

    print(f"\n ⚠ Rate limit detected (attempt {attempt + 1})")
    print(f" ⏳ Waiting {wait_time}s ({wait_time/60:.1f} minutes) before retry...")

    return wait_time
|
|
|
|
|
|
def extract_bookmark_entries_from_response(response_data):
    """
    Extract bookmark entries (tweet IDs and user info) from the response.

    Three response shapes are recognised, decided by the first list element:

    1. a list of plain tweet IDs (``str`` / ``int``);
    2. a list of simple tweet objects carrying ``id`` / ``id_str``
       (and optionally ``user.screen_name``);
    3. a GraphQL timeline payload (a dict, or a list of such dicts, one per
       page) whose ``TimelineAddEntries`` instructions hold the tweets.

    Entries are de-duplicated by tweet ID while preserving first-seen order.
    Malformed or unexpected pieces of the payload are skipped instead of
    aborting the whole extraction, so one bad page cannot hide the rest.

    Args:
        response_data: The response data from account.bookmarks()

    Returns:
        List of tuples: [(tweet_id, username), ...] where tweet_id is a
        string and username may be None when it could not be determined.
    """
    bookmark_entries = []
    seen_ids = set()

    def add_entry(tweet_id, username):
        # None must not be stringified into the bogus ID "None".
        if tweet_id is None:
            return
        tid = str(tweet_id).strip()
        if not tid or tid in seen_ids:
            return
        seen_ids.add(tid)
        bookmark_entries.append((tid, username))

    def as_dict(value):
        # JSON fields may be null or of an unexpected type; normalise to {}.
        return value if isinstance(value, dict) else {}

    def dig(obj, *keys):
        # Walk nested dict keys, yielding {} as soon as anything is missing.
        for key in keys:
            obj = as_dict(obj).get(key)
        return as_dict(obj)

    def collect_from_payload(data):
        # Navigate through the nested GraphQL structure.
        timeline = dig(data, 'data', 'bookmark_timeline_v2', 'timeline')
        if not timeline:
            # Alternative path used by user-timeline style responses.
            timeline = dig(data, 'data', 'user', 'result', 'timeline_v2', 'timeline')

        for instruction in timeline.get('instructions') or []:
            instruction = as_dict(instruction)
            if instruction.get('type') != 'TimelineAddEntries':
                continue
            for entry in instruction.get('entries') or []:
                content = dig(entry, 'content')
                if content.get('entryType') != 'TimelineTimelineItem':
                    continue
                item_content = dig(content, 'itemContent')
                if item_content.get('itemType') != 'TimelineTweet':
                    continue

                tweet_result = dig(item_content, 'tweet_results', 'result')
                # NOTE(review): some results (e.g. "TweetWithVisibilityResults")
                # appear to nest the real tweet under a "tweet" key; unwrap it
                # when the outer object has no rest_id of its own.
                if not tweet_result.get('rest_id') and isinstance(tweet_result.get('tweet'), dict):
                    tweet_result = tweet_result['tweet']

                # rest_id is the tweet ID.
                tweet_id = tweet_result.get('rest_id')
                if not tweet_id:
                    continue

                user_info = dig(tweet_result, 'core', 'user_results', 'result')
                # screen_name normally lives under "legacy"; the "core"
                # fallback is a harmless guess for payloads that moved it
                # (TODO confirm against a live response).
                username = (
                    dig(user_info, 'legacy').get('screen_name')
                    or dig(user_info, 'core').get('screen_name')
                )
                add_entry(tweet_id, username)

    if isinstance(response_data, list):
        first = response_data[0] if response_data else None

        # Simple list of tweet IDs.
        if isinstance(first, (str, int)):
            for tid in response_data:
                if isinstance(tid, (str, int)):
                    add_entry(tid, None)
            return bookmark_entries

        # List of simple tweet objects.
        if isinstance(first, dict) and ('id' in first or 'id_str' in first):
            for item in response_data:
                if not isinstance(item, dict):
                    continue
                tweet_id = item.get('id_str') or item.get('id')
                username = as_dict(item.get('user')).get('screen_name')
                add_entry(tweet_id, username)
            return bookmark_entries

        # Otherwise, treat as paginated GraphQL response structure.
        payloads = [item for item in response_data if isinstance(item, dict)]
    elif isinstance(response_data, dict):
        payloads = [response_data]
    else:
        return bookmark_entries

    for data in payloads:
        # Guard each page separately: a malformed page is reported and
        # skipped, and the entries of every other page are still returned.
        try:
            collect_from_payload(data)
        except (AttributeError, KeyError, TypeError) as e:
            print(f" ⚠ Warning: Error extracting bookmark entries: {e}")

    return bookmark_entries
|
|
|
|
|
|
def extract_all_bookmarks(account, delay_between_requests=2.0, max_retries=2):
    """
    Extract all bookmarks from the account with rate limit handling.

    ``account.bookmarks()`` is called once; the code treats its result as
    the complete set of bookmarks (no pagination is driven from here). When
    the call fails it is retried up to ``max_retries`` times:

    * rate limit errors wait according to ``handle_rate_limit_error``
      (exponential backoff that grows with each consecutive retry);
    * any other error waits ``delay_between_requests * 3`` seconds.

    Args:
        account: Account instance from twitter.account
        delay_between_requests: Base delay in seconds; used to derive the
            wait before retrying after a non-rate-limit error
        max_retries: Maximum number of retries after the first failed
            attempt (default 2)

    Returns:
        List of tuples: [(tweet_id, username), ...] in the order produced by
        extract_bookmark_entries_from_response. An empty list is returned
        when nothing was found or the user interrupted the fetch.

    Raises:
        Exception: the last error from ``account.bookmarks()`` once the
            retries are exhausted, or any error raised while extracting.
    """
    all_bookmarks = []

    print("Starting to extract bookmarks...")
    print("-" * 50)

    try:
        print("Fetching bookmarks...", end=" ")

        retry_count = 0
        while True:
            try:
                response_data = account.bookmarks()
                break
            except Exception as e:
                print(f"\n ❌ Error fetching bookmarks: {e}")

                if retry_count >= max_retries:
                    print(" ❌ Max retries reached. Stopping.")
                    raise

                if is_rate_limit_error(e):
                    # Prints its own "waiting" message.
                    wait_time = handle_rate_limit_error(e, retry_count)
                else:
                    wait_time = delay_between_requests * 3
                    print(f" ⏳ Waiting {wait_time}s before retry...")

                time.sleep(wait_time)
                retry_count += 1

        # Extract bookmark entries from response
        all_bookmarks = extract_bookmark_entries_from_response(response_data)

        if all_bookmarks:
            print(f"✓ Retrieved {len(all_bookmarks)} bookmarks")
        else:
            print("⚠ No bookmarks found")

    except KeyboardInterrupt:
        # Deliberate best-effort: an interrupt ends the fetch quietly and the
        # (possibly empty) result collected so far is returned.
        print("\n\n⚠ Extraction interrupted by user")
    except Exception as e:
        print(f"\n\n❌ Error occurred: {str(e)}")
        raise

    print(f"\n{'='*80}")
    print("Bookmark extraction complete!")
    print(f" Total bookmarks found: {len(all_bookmarks)}")
    print(f"{'='*80}\n")

    return all_bookmarks
|
|
|
|
|
|
def save_bookmarks_and_unbookmark(
    account,
    bookmarks,
    output_file="bookmarks.txt",
    delay_between_requests=2.0,
    write_mode="a",
):
    """
    Save bookmark URLs to file and then unbookmark each one.

    The URLs are written to ``output_file`` BEFORE any unbookmark request is
    sent, so an interruption or crash while unbookmarking can never lose a
    bookmark that was already removed from the account.

    URLs that are already present as a line in ``output_file`` are not
    written again (a tweet whose unbookmark failed in an earlier run would
    otherwise be duplicated); such tweets are still unbookmarked.

    Args:
        account: Account instance from twitter.account
        bookmarks: List of tuples [(tweet_id, username), ...]
        output_file: Output file path
        delay_between_requests: Delay in seconds between unbookmark requests
        write_mode: "a" to append the new URLs after the existing content,
            "p" to prepend them, or "ask" to prompt interactively

    Returns:
        Dict with "saved_count" (number of new URLs written to the file by
        this call) and "unbookmarked_count" (number of successful
        unbookmark requests).

    Raises:
        ValueError: if write_mode is not one of "ask", "p", "a".
    """
    # Fail fast on a bad mode, before touching the file or the account.
    if write_mode not in ('ask', 'p', 'a'):
        raise ValueError("write_mode must be one of: ask, p, a")

    print(f"\nSaving bookmarks to {output_file} and unbookmarking...")
    print("-" * 50)

    # Read existing content if file exists
    existing_content = ""
    if os.path.exists(output_file):
        with open(output_file, "r") as f:
            existing_content = f.read()

    # Choose whether to prepend or append.
    if write_mode == "ask":
        while True:
            choice = input("Prepend (p) or append (a) new bookmarks? [p/a] (default a): ").strip().lower()
            if choice == "":
                choice = "a"
            if choice in ('p', 'a'):
                break
            print(" ⚠ Invalid choice. Please enter 'p' for prepend or 'a' for append.")
    else:
        choice = write_mode
    prepend = (choice == 'p')

    # Collect the URLs that are not in the file yet, keeping input order.
    known_urls = {line.strip() for line in existing_content.splitlines()}
    new_bookmark_urls = []
    for tweet_id, username in bookmarks:
        if username:
            url = f"https://twitter.com/{username}/status/{tweet_id}"
        else:
            # Fallback if username not available
            url = f"https://twitter.com/i/web/status/{tweet_id}"
        if url not in known_urls:
            known_urls.add(url)
            new_bookmark_urls.append(url)

    # Persist first, unbookmark afterwards.
    new_block = "".join(f"{url}\n" for url in new_bookmark_urls)
    if prepend:
        combined = new_block + existing_content
    else:
        # Without this, a file lacking a final newline would get the first
        # new URL glued onto its last line.
        if existing_content and not existing_content.endswith("\n"):
            existing_content += "\n"
        combined = existing_content + new_block
    with open(output_file, "w") as f:
        f.write(combined)

    unbookmark_count = 0
    # Counts consecutive failed attempts; reset by any success. It grows the
    # rate-limit backoff and disables retries for other errors once it hits 2.
    retry_count = 0
    total = len(bookmarks)

    for tweet_id, _username in bookmarks:
        try:
            account.unbookmark(tweet_id)
        except Exception as e:
            print(f"\n ⚠ Error unbookmarking tweet {tweet_id}: {e}")

            rate_limited = is_rate_limit_error(e)
            if not rate_limited and retry_count >= 2:
                print(f" ❌ Skipping unbookmark for {tweet_id} after max retries")
            else:
                if rate_limited:
                    # Prints its own "waiting" message.
                    wait_time = handle_rate_limit_error(e, retry_count)
                else:
                    wait_time = delay_between_requests * 3
                    print(f" ⏳ Waiting {wait_time}s before retry...")
                time.sleep(wait_time)
                retry_count += 1

                # Single retry; a second failure is logged and we move on.
                try:
                    account.unbookmark(tweet_id)
                except Exception as retry_error:
                    print(f" ❌ Failed to unbookmark {tweet_id} after retry: {retry_error}")
                else:
                    unbookmark_count += 1
                    retry_count = 0
        else:
            unbookmark_count += 1
            retry_count = 0
            if unbookmark_count % 10 == 0:
                print(f" ✓ Processed {unbookmark_count}/{total} bookmarks...")

        # Rate limiting: wait before next unbookmark request
        if delay_between_requests > 0:
            time.sleep(delay_between_requests)

    saved_count = len(new_bookmark_urls)

    print(f"\n{'='*80}")
    print("Processing complete!")
    print(f" Total bookmarks saved: {saved_count}")
    print(f" Total unbookmarked: {unbookmark_count}")
    print(f" Output file: {output_file}")
    print(f"{'='*80}\n")

    return {
        "saved_count": saved_count,
        "unbookmarked_count": unbookmark_count,
    }
|
|
|
|
|
|
def parse_args(argv=None):
    """
    Parse the command-line options of the bookmark export script.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default) argparse reads the arguments from sys.argv, which is
            the behaviour for normal command-line use.

    Returns:
        argparse.Namespace with the attributes output_file,
        delay_between_requests, mode, single_run, max_runs and
        delay_between_runs.
    """
    parser = argparse.ArgumentParser(description="Extract and unbookmark X/Twitter bookmarks.")
    parser.add_argument("--output-file", default="bookmarks.txt", help="Path to output bookmarks file.")
    parser.add_argument(
        "--delay-between-requests",
        type=float,
        default=2.0,
        help="Seconds to wait between unbookmark requests.",
    )
    parser.add_argument(
        "--mode",
        choices=["a", "p", "ask"],
        default="a",
        help="Write mode for bookmark file: append (a), prepend (p), or ask interactively.",
    )
    parser.add_argument(
        "--single-run",
        action="store_true",
        help="Run one extraction pass only.",
    )
    parser.add_argument(
        "--max-runs",
        type=int,
        default=100,
        help="Maximum number of extraction runs when syncing until empty.",
    )
    parser.add_argument(
        "--delay-between-runs",
        type=float,
        default=1.0,
        help="Seconds to wait between extraction runs.",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def parse_cookie_string(cookie_str):
    """
    Turn a ``name=value; name2=value2`` cookie string into a dict.

    Whitespace around names and values is removed and empty segments (for
    example the one produced by a trailing ``;``) are ignored. Only the
    first ``=`` separates name from value, so values may contain ``=``.

    Args:
        cookie_str: Raw cookie string, e.g. the content of creds.txt

    Returns:
        Dict mapping cookie names to values

    Raises:
        ValueError: if a non-empty segment has no ``=``.
    """
    cookies = {}
    for segment in cookie_str.split(";"):
        segment = segment.strip()
        if not segment:
            continue
        name, separator, value = segment.partition("=")
        if not separator:
            # The offending text is not echoed: it may be a credential.
            raise ValueError("Malformed cookie entry: expected name=value")
        cookies[name.strip()] = value.strip()
    return cookies


def main():
    """
    Export bookmarks to a file and unbookmark them, repeating until empty.

    Each run fetches the current bookmarks, saves and unbookmarks them, and
    the loop stops when no bookmarks are left, when --single-run is set,
    when a run could not unbookmark anything (to avoid looping forever), or
    after --max-runs runs.
    """
    args = parse_args()

    # Load cookies
    with open("creds.txt", "r") as file:
        cookie_dict = parse_cookie_string(file.read())

    # Initialize account
    account = Account(cookies=cookie_dict)

    # Configuration
    delay_between_requests = args.delay_between_requests
    output_file = args.output_file

    total_saved = 0
    total_unbookmarked = 0

    for runs in range(1, args.max_runs + 1):
        print(f"\nRun {runs}: fetching bookmarks...")
        bookmarks = extract_all_bookmarks(account, delay_between_requests=delay_between_requests)

        if not bookmarks:
            print("\nNo bookmarks found.")
            break

        # Save bookmarks to file and unbookmark them.
        stats = save_bookmarks_and_unbookmark(
            account,
            bookmarks,
            output_file=output_file,
            delay_between_requests=delay_between_requests,
            write_mode=args.mode,
        )
        total_saved += stats["saved_count"]
        total_unbookmarked += stats["unbookmarked_count"]
        print(f"\nSuccessfully processed {len(bookmarks)} bookmarks in run {runs}")

        if args.single_run:
            break
        if stats["unbookmarked_count"] == 0:
            print("No bookmarks were unbookmarked in this run; stopping to avoid an infinite loop.")
            break
        if runs < args.max_runs and args.delay_between_runs > 0:
            time.sleep(args.delay_between_runs)
    else:
        # Only reached when the loop ran out of runs without a break, i.e.
        # the run budget (not an empty bookmark list) ended the sync.
        print(f"\nReached max runs ({args.max_runs}) before bookmarks were fully exhausted.")

    print(f"\nDone. Total saved: {total_saved}, total unbookmarked: {total_unbookmarked}")


if __name__ == "__main__":
    main()
|