1
Fork 0
mirror of https://github.com/thegeneralist01/twitter-openapi synced 2026-01-11 15:40:26 +01:00

update wip

Signed-off-by: ふぁ <yuki@yuki0311.com>
This commit is contained in:
ふぁ 2025-05-01 01:28:12 +09:00
parent 988d50fd60
commit de523dc052
No known key found for this signature in database
GPG key ID: 83A8A5E74872A8AA
4 changed files with 237 additions and 44 deletions

View file

@ -13,7 +13,10 @@ from pathlib import Path
from typing import Any
import openapi_client as pt
import requests
import urllib3
from x_client_transaction import ClientTransaction
from x_client_transaction.utils import handle_x_migration
warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s")
@ -58,13 +61,17 @@ def find_name(x):
def get_kwargs(key, additional):
kwargs = {"path_query_id": placeholder[key]["queryId"]}
kwargs = {"path_query_id": placeholder[key]["queryId"], "_headers": {}}
if placeholder[key].get("variables") is not None:
kwargs["variables"] = json.dumps(placeholder[key]["variables"] | additional)
if placeholder[key].get("features") is not None:
kwargs["features"] = json.dumps(placeholder[key]["features"])
if placeholder[key].get("fieldToggles") is not None:
kwargs["field_toggles"] = json.dumps(placeholder[key]["fieldToggles"])
if placeholder[key].get("@path") is not None:
kwargs["_headers"]["x-client-transaction-id"] = ct.generate_transaction_id(
method=placeholder[key]["@method"], path=placeholder[key]["@path"]
)
return kwargs
@ -267,6 +274,12 @@ if __name__ == "__main__":
api_conf.access_token = access_token
api_client = pt.ApiClient(configuration=api_conf, cookie=cookies_str)
api_client.user_agent = get_header(latest_user_agent, "chrome-fetch")["user-agent"]
session = requests.Session()
session.headers = get_header(latest_user_agent, "chrome")
ct = ClientTransaction(handle_x_migration(session))
error_count = 0
for x in [pt.DefaultApi, pt.TweetApi, pt.UserApi, pt.UsersApi, pt.UserListApi]:
@ -354,7 +367,10 @@ if __name__ == "__main__":
try:
logger.info("Try: Self UserTweets Test")
kwargs = get_kwargs("UserTweets", {"userId": id})
res = pt.TweetApi(api_client).get_user_tweets_with_http_info(**kwargs)
headers = get_header()
res = pt.TweetApi(api_client).get_user_tweets_with_http_info(
_headers=headers, **kwargs
)
data = res.data.to_dict()
rate = match_rate(
@ -388,7 +404,7 @@ if __name__ == "__main__":
"1875050002046726519",
"1848219562136801480",
"1881993128288399684",
"1899104692577489182"
"1899104692577489182",
]
for id in ids:
try:

View file

@ -3,65 +3,77 @@ import re
from urllib.parse import urlencode, urlparse
import openapi_client as pt
import requests
import urllib3
from x_client_transaction import ClientTransaction
from x_client_transaction.utils import handle_x_migration
def get_kwargs(key, additional):
kwargs = {"path_query_id": placeholder[key]["queryId"]}
kwargs = {"path_query_id": placeholder[key]["queryId"], "_headers": {}}
if placeholder[key].get("variables") is not None:
kwargs["variables"] = json.dumps(placeholder[key]["variables"] | additional)
if placeholder[key].get("features") is not None:
kwargs["features"] = json.dumps(placeholder[key]["features"])
if placeholder[key].get("fieldToggles") is not None:
kwargs["field_toggles"] = json.dumps(placeholder[key]["fieldToggles"])
if placeholder[key].get("@path") is not None:
kwargs["_headers"]["x-client-transaction-id"] = ct.generate_transaction_id(
method=placeholder[key]["@method"], path=placeholder[key]["@path"]
)
return kwargs
class SessionManager:
def __init__(self) -> None:
header = "https://raw.githubusercontent.com/fa0311/latest-user-agent/refs/heads/main/header.json"
self.http = urllib3.PoolManager()
self.chorome_header = json.loads(self.http.request("GET", header).data)
def child(self):
return SessionManagerChild(self.http,self.chorome_header)
return SessionManagerChild(self.http, self.chorome_header)
class SessionManagerChild:
def __init__(self, http, chorome_header) -> None:
self.http = http
self.chorome_header = chorome_header
self.session = {}
def cookie_normalize(self, cookie: list[str]) -> dict[str, str]:
value = {x.split("; ")[0].split("=")[0]: x.split("; ")[0].split("=")[1] for x in cookie}
value = {
x.split("; ")[0].split("=")[0]: x.split("; ")[0].split("=")[1]
for x in cookie
}
return {key: value[key] for key in value if len(value[key]) > 0}
def cookie_to_str(self, cookie: dict[str, str]) -> str:
return "; ".join([f"{key}={value}" for key, value in cookie.items()])
def getHader(self, additionals={}) -> dict[str, str]:
ignore = ["host", "connection"]
base = {key: value for key, value in self.chorome_header["chrome"].items() if key not in ignore}
base = {
key: value
for key, value in self.chorome_header["chrome"].items()
if key not in ignore
}
return base | {"cookie": self.cookie_to_str(self.session)} | additionals
def update_normalize(self, cookie: list[str]):
self.update(self.cookie_normalize(cookie))
def update(self, cookie: dict[str, str]):
self.session.update(cookie)
def pop(self, key: str):
self.session.pop(key)
def get(self, key: str):
return self.session.get(key)
def to_str(self):
return self.cookie_to_str(self.session)
def get_guest_token():
twitter_url = "https://x.com/elonmusk"
@ -69,9 +81,7 @@ def get_guest_token():
chrome = SessionManager()
x = chrome.child()
twitter = chrome.child()
def regex(str: str, **kwargs) -> str:
return str.format(
quote=r"[\'\"]",
@ -79,21 +89,30 @@ def get_guest_token():
dot=r"\.",
any=r".*?",
target=r"([\s\S]*?)",
**kwargs
**kwargs,
)
def redirect(method: str, url: str, body: str = None, headers: dict[str, str] = {}) -> urllib3.HTTPResponse:
def redirect(
method: str, url: str, body: str = None, headers: dict[str, str] = {}
) -> urllib3.HTTPResponse:
for _ in range(10):
if urlparse(url).netloc == "x.com":
res = http.request(method, url, headers=x.getHader(headers), body=body, redirect=False)
res = http.request(
method, url, headers=x.getHader(headers), body=body, redirect=False
)
x.update_normalize(res.headers._container["set-cookie"][1:])
elif urlparse(url).netloc == "twitter.com":
res = http.request(method, url, headers=twitter.getHader(headers), body=body, redirect=False)
res = http.request(
method,
url,
headers=twitter.getHader(headers),
body=body,
redirect=False,
)
twitter.update_normalize(res.headers._container["set-cookie"][1:])
else:
raise Exception("Invalid domain")
method = "GET"
body = None
headers = {}
@ -109,7 +128,7 @@ def get_guest_token():
url = f"{domain}{new_path}"
else:
url = new_path
elif re.findall(regex(location), res.data.decode()):
url = re.findall(regex(location), res.data.decode())[0]
elif re.findall(regex(submit), res.data.decode()):
@ -118,7 +137,7 @@ def get_guest_token():
input_html = re.findall(regex(input), form_html[0][1])
method = "POST"
url = form_html[0][0]
body = urlencode({k:v for k,v in input_html})
body = urlencode({k: v for k, v in input_html})
headers = {"content-type": "application/x-www-form-urlencoded"}
elif res.status == 200:
return res
@ -126,21 +145,19 @@ def get_guest_token():
raise Exception("Failed to redirect")
else:
raise Exception("Failed to redirect")
res = redirect("GET", twitter_url)
reg = "document{dot}cookie{space}={space}{quote}{target}{quote}"
if re.findall(regex(reg), res.data.decode()):
if re.findall(regex(reg), res.data.decode()):
find = re.findall(regex(reg), res.data.decode())
x.update_normalize(find)
if x.get("gt") is None:
raise Exception("Failed to get guest token")
return x
if __name__ == "__main__":
cookies = get_guest_token()
cookies_str = cookies.to_str()
@ -167,15 +184,33 @@ if __name__ == "__main__":
api_client = pt.ApiClient(configuration=api_conf, cookie=cookies_str)
api_client.user_agent = latest_user_agent["chrome-fetch"]
res = pt.TweetApi(api_client).get_user_tweets_with_http_info(
**get_kwargs("UserTweets", {}),
).model_dump_json()
res = pt.TweetApi(api_client).get_user_highlights_tweets_with_http_info(
**get_kwargs("UserHighlightsTweets", {}),
).model_dump_json()
res = pt.DefaultApi(api_client).get_tweet_result_by_rest_id_with_http_info(
**get_kwargs("TweetResultByRestId", {}),
).model_dump_json()
res = pt.UserApi(api_client).get_user_by_screen_name_with_http_info(
**get_kwargs("UserByScreenName", {})
).model_dump_json()
session = requests.Session()
session.headers = latest_user_agent["chrome"]
ct = ClientTransaction(handle_x_migration(session))
res = (
pt.TweetApi(api_client)
.get_user_tweets_with_http_info(
**get_kwargs("UserTweets", {}),
)
.model_dump_json()
)
res = (
pt.TweetApi(api_client)
.get_user_highlights_tweets_with_http_info(
**get_kwargs("UserHighlightsTweets", {}),
)
.model_dump_json()
)
res = (
pt.DefaultApi(api_client)
.get_tweet_result_by_rest_id_with_http_info(
**get_kwargs("TweetResultByRestId", {}),
)
.model_dump_json()
)
res = (
pt.UserApi(api_client)
.get_user_by_screen_name_with_http_info(**get_kwargs("UserByScreenName", {}))
.model_dump_json()
)