From c09db5533a34ab86dd86613aef6f224469da28f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=97=E8=BE=9E?= Date: Thu, 11 Sep 2025 11:50:04 +0000 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20core?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/AccountServices.py | 246 ++++++++++++++++++++++++++++++++++++++++ core/CallServerAPIs.py | 92 +++++++++++++++ core/PyWebPageAPI.py | 78 +++++++++++++ core/WebApp.py | 29 +++++ core/WebViewWIndow.py | 7 ++ 5 files changed, 452 insertions(+) create mode 100644 core/AccountServices.py create mode 100644 core/CallServerAPIs.py create mode 100644 core/PyWebPageAPI.py create mode 100644 core/WebApp.py create mode 100644 core/WebViewWIndow.py diff --git a/core/AccountServices.py b/core/AccountServices.py new file mode 100644 index 0000000..76324d9 --- /dev/null +++ b/core/AccountServices.py @@ -0,0 +1,246 @@ +import requests +import json +import uuid +import platform +import hashlib + +class AuthClient: + def __init__(self, base_url): + self.base_url = base_url + self.stage = 'find-account' + self.error = None + self.account_identifier = '' + self.device_id = '' + self.challenge = None + self.factors = [] + self.selected_factor_id = None + self.password = '' + self.user_store = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3' # 您可以自定义此User-Agent字符串 + + def generate_device_id(self): + """生成设备指纹 + :return: 设备指纹ID(字符串) + """ + platform_info = platform.system() + platform.release() + platform.machine() # 修正了这里的拼接错误 + unique_id = uuid.getnode() + device_info = f"{platform_info}-{unique_id}" + # 使用SHA-256哈希生成设备ID + self.device_id = hashlib.sha256(device_info.encode()).hexdigest() + return self.device_id + + def check_account_validity(self, account_identifier): + """检查账户有效性 + :param account_identifier: 用户的账户标识符(邮箱或用户名) + :return: 挑战信息(字典) + """ + self.account_identifier = account_identifier + self.error = None + if not self.account_identifier: + self.error = 'Please enter your email or username.' + return None + try: + response = requests.post( + f"{self.base_url}/auth/challenge", + headers={'Content-Type': 'application/json', 'User-Agent': self.user_store,'device_name': "A Python login app.By NRFF & Deepseek"}, # 添加自定义User-Agent + data=json.dumps({ + 'platform': 1, + 'account': self.account_identifier, + 'device_id': self.generate_device_id(), + }) + ) + if not response.ok: + raise Exception(response.text or 'Account not found.') + self.challenge = response.json() + self.stage = 'select-factor' + return self.challenge + except Exception as e: + self.error = str(e) + return None + + def get_authentication_factors(self): + """获取可用的认证因素 + :return: 可用的认证因素列表(列表) + """ + self.error = None + if not self.challenge: + self.error = 'No challenge information available.' + return None + try: + response = requests.get( + f"{self.base_url}/auth/challenge/{self.challenge['id']}/factors", + headers={'User-Agent': self.user_store,'device_name': "A Python login app.By NRFF & Deepseek"} # 添加自定义User-Agent + ) + if not response.ok: + raise Exception('Could not fetch authentication factors.') + available_factors = response.json() + self.factors = [factor for factor in available_factors if factor['id'] not in self.challenge.get('blacklist_factors', [])] + if len(self.factors) == 0: + self.error = 'No available authentication factors.' + return None + else: + self.stage = 'select-factor' + return self.factors + except Exception as e: + self.error = str(e) + return None + + def request_verification_code(self, selected_factor_id): + """请求验证码 + :param selected_factor_id: 用户选择的认证因素ID(字符串类型) + :return: None + """ + self.selected_factor_id = selected_factor_id + self.error = None + if not self.selected_factor_id: + self.error = 'No authentication factor selected.' + return None + selected_factor = self.get_selected_factor() + if not selected_factor: + self.error = 'Selected factor not found.' + return None + hint = {'contact': selected_factor['contact']} + try: + response = requests.post( + f"{self.base_url}/auth/challenge/{self.challenge['id']}/factors/{self.selected_factor_id}", + headers={'Content-Type': 'application/json', 'User-Agent': self.user_store, 'device_name': "A Python login app.By NRFF & Deepseek"}, # 添加自定义User-Agent + data=json.dumps(hint) + ) + if not response.ok: + raise Exception(response.text or 'Failed to send code.') + self.stage = 'enter-code' + except Exception as e: + self.error = str(e) + self.stage = 'select-factor' + return None + + def get_selected_factor(self): + """获取选择的认证因素 + :return: 选择的认证因素(字典)或 None + """ + if not self.selected_factor_id: + return None + return next((factor for factor in self.factors if factor['id'] == self.selected_factor_id), None) + + def verify_factor(self, selected_factor_id, password): + """验证所选的认证因素 + :param selected_factor_id: 用户选择的认证因素ID(字符串类型) + :param password: 用户输入的密码或验证码(字符串类型) + :return: None + """ + self.selected_factor_id = selected_factor_id + self.password = password + self.error = None + if not self.selected_factor_id or not self.password: + self.error = 'Please enter your password/code.' + return None + try: + response = requests.patch( + f"{self.base_url}/auth/challenge/{self.challenge['id']}", + headers={'Content-Type': 'application/json', 'User-Agent': self.user_store,'device_name': "A Python login app.By NRFF & Deepseek"}, # 添加自定义User-Agent + data=json.dumps({ + 'factor_id': self.selected_factor_id, + 'password': self.password, + }) + ) + if not response.ok: + raise Exception(response.text or 'Verification failed.') + self.challenge = response.json() + self.password = '' + if self.challenge['step_remain'] == 0: + self.stage = 'token-exchange' + else: + self.stage = 'select-factor' + except Exception as e: + self.error = str(e) + self.stage = 'select-factor' + + def exchange_token(self): + """交换令牌以完成登录 + :return: None + """ + self.error = None + if not self.challenge: + self.error = 'No challenge information available.' + return None + try: + response = requests.post( + f"{self.base_url}/auth/token", + headers={'Content-Type': 'application/json', 'User-Agent': self.user_store,'device_name': "A Python login app.By NRFF & Deepseek"}, # 添加自定义User-Agent + data=json.dumps({ + 'grant_type': 'authorization_code', + 'code': self.challenge['id'], + }) + ) + if not response.ok: + raise Exception(response.text or 'Token exchange failed.') + token_info = response.json() + token = token_info['token'] + #self.user_store.fetchUser() + redirect_uri = 'redirect_uri_from_query' # 这里需要根据实际情况获取 + if redirect_uri: + print(f"Redirecting to: {redirect_uri}") + else: + print("Navigating to home page.") + return token + except Exception as e: + self.error = str(e) + self.stage = 'select-factor' + + def get_factor_name(self, factor_type): + """根据认证因素类型返回相应的名称 + :param factor_type: 认证因素类型(整数) + :return: 认证因素名称(字符串) + """ + factor_names = { + 0: 'Password', + 1: 'Email', + 2: 'Authenticator App', + } + return factor_names.get(factor_type, 'Unknown Factor') + +# 示例调用 +if __name__ == "__main__": + auth_client = AuthClient(base_url='https://api.solian.app/id') + account_identifier = ''#你的账号 + selected_factor_id = "enter-code" # 用户需要选择一个因素 + password = ''#你的密码 + # 第一步:检查账户有效性 + challenge = auth_client.check_account_validity(account_identifier) + if auth_client.error: + print(f"Error in check account validity: {auth_client.error}") + elif challenge: + print(f"Challenge information: {challenge}") + # 第二步:获取可用的认证因素 + factors = auth_client.get_authentication_factors() + if auth_client.error: + print(f"Error in get authentication factors: {auth_client.error}") + elif factors: + print(f"Available factors: {factors}") + # 假设用户选择第一个因素 + selected_factor_id = factors[0]['id'] + print(f"Selected factor ID: {selected_factor_id}") + # 第三步:请求验证码(如果需要) + if factors[0]['type'] == 1: + auth_client.request_verification_code(selected_factor_id) + if auth_client.error: + print(f"Error in request verification code: {auth_client.error}") + elif auth_client.stage == 'enter-code': + print("Verification code requested. Please enter the code.") + # 第四步:验证所选的认证因素 + auth_client.verify_factor(selected_factor_id, password) + if auth_client.error: + print(f"Error in verify factor: {auth_client.error}") + elif auth_client.stage == 'token-exchange': + print("Factor verified. Ready to exchange token.") + # 第五步:交换令牌以完成登录 + if auth_client.stage == 'token-exchange': + token = auth_client.exchange_token() + if auth_client.error: + print(f"Error in exchange token: {auth_client.error}") + else: + print("Login successful!") + print(f"Token: {token}") + else: + print("Login failed.") + else: + print("No available authentication factors.") diff --git a/core/CallServerAPIs.py b/core/CallServerAPIs.py new file mode 100644 index 0000000..f48bd07 --- /dev/null +++ b/core/CallServerAPIs.py @@ -0,0 +1,92 @@ +import requests +from requests.exceptions import RequestException +import json + +DOMAIN = "https://solian.app/api" + +def _make_request(method: str, url: str, headers: dict, params: dict = None, data: dict = None) -> dict: + """内部辅助函数,用于发送HTTP请求并处理响应""" + try: + if method == 'GET': + response = requests.get(url, headers=headers, params=params) + elif method == 'POST': + response = requests.post(url, headers=headers, data=data) + elif method == 'DELETE': + response = requests.delete(url, headers=headers, params=params) + elif method == 'PATCH': + response = requests.patch(url, headers=headers, data=data) + else: + return {"error": "Unsupported HTTP method"} + + if response.status_code != 200: + return {"error": response.status_code} + + return response.json() + + except json.JSONDecodeError: + return {"error": response.text} + except RequestException as e: + return {"error": str(e)} + +def ActivityAPIs(cursor: str = '', filter: str = '', take: int = 20, debuginclude: str = '', Authorization: str = '') -> dict: + """获取首页内容""" + url = f"{DOMAIN}/activities" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"cursor": cursor, "filter": filter, "take": take, "debuginclude": debuginclude} + return _make_request('GET', url, headers, params) + +def ChatSummary(Authorization: str) -> dict: + """获取聊天摘要""" + url = f"{DOMAIN}/chat/summary" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetChatMessageAllInfo(Authorization: str, roomid: str, offset: int = 0, take: int = 20) -> dict: + """获取聊天消息""" + url = f"{DOMAIN}/chat/{roomid}/message" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"offset": offset, "take": take} + return _make_request('GET', url, headers, params) + +def GetChatMessageBaseInfo(Authorization: str, roomid: str) -> dict: + """获取聊天消息基础信息""" + url = f"{DOMAIN}/chat/{roomid}/message/base" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers) + +def GetAMessageAllInfo(Authorization: str, roomid: str, messageid: str) -> dict: + """获取具体聊天消息所有信息""" + url = f"{DOMAIN}/chat/{roomid}/message/{messageid}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetAMessageInfo(Authorization: str, roomid: str, messageid: str) -> dict: + """获取聊天消息基础信息。暂时作为保留""" + url = f"{DOMAIN}/chat/{roomid}/message/{messageid}/info" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers) + +def DelMessage(Authorization: str, roomid: str, messageid: str) -> dict: + """删除聊天消息""" + url = f"{DOMAIN}/chat/{roomid}/message/{messageid}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers) + +def GetChatRoomInfo(Authorization: str, id: str) -> dict: + """获取聊天房间信息""" + url = f"{DOMAIN}/chat/{id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def ModifyChatRoomInfo(Authorization: str, id: str, data: dict) -> dict: + """修改聊天房间信息""" + url = f"{DOMAIN}/chat/{id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, data=data) + +def DeleteChatRoom(Authorization: str, id: str) -> dict: + """删除聊天房间""" + url = f"{DOMAIN}/chat/{id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers) + diff --git a/core/PyWebPageAPI.py b/core/PyWebPageAPI.py new file mode 100644 index 0000000..39f4ba5 --- /dev/null +++ b/core/PyWebPageAPI.py @@ -0,0 +1,78 @@ +import os +import sys +import importlib +import platform +import ProjectCfg +def EnvironmentCheck() -> bool: + """ + 检查环境是否符合要求 + :return: + """ + if sys.version_info < ProjectCfg.PYTHON_VERSION_MIN and ProjectCfg.PYTHON_VERSION_MIN!=(0,0): + return False + if sys.version_info > ProjectCfg.PYTHON_VERSION_MAX and ProjectCfg.PYTHON_VERSION_MAX!=(0,0): + return False + for lib in ProjectCfg.LIB_LIST: + if importlib.util.find_spec(lib) is None: + return False + if platform.system() not in ProjectCfg.SUPPORTED_PLATFORMS and ProjectCfg.SUPPORTED_PLATFORMS!=['All']: + return False + return True + +def CheckWebviewInstalled() -> tuple[bool,str]: + """ + 检查WebView框架是否已安装 + :return: + """ + system_platform : str = platform.system() + + if system_platform == "Windows": + # 检查WebView2 + webview2_path : str = r'C:\Program Files (x86)\Microsoft\EdgeWebView2' + webview2_dll : str = r'\WebView2Loader.dll' + for root, dirs, files in os.walk(webview2_path): + if webview2_dll in files: + return (True,'') + return (False,"No installed WebView2") + + elif system_platform == "Darwin": + # 检查WebKit框架 + webkit_path = '/System/Library/Frameworks/WebKit.framework' + if os.path.exists(webkit_path): + return (True,'') + return (False,"Unsupported WKWebView") + + elif system_platform == "Linux": + # 检查WebKitGTK + webkitgtk_paths = [ + '/usr/lib/libwebkit2gtk-4.0.so', + '/usr/lib/x86_64-linux-gnu/libwebkit2gtk-4.0.so' + ] + for path in webkitgtk_paths: + if os.path.exists(path): + return (True,'') + + # 检查QtWebEngine + qtwebengine_paths = [ + '/usr/lib/qt5/bin/qtwebengine_process', + '/usr/lib/x86_64-linux-gnu/qt5/bin/qtwebengine_process' + ] + for path in qtwebengine_paths: + if os.path.exists(path): + return "QtWebEngine已安装" + + return (False,"Not Found Webview framework.") + + else: + return (False,"Unsupported OS.") + +def CheckPortAvailable(port : int) -> bool: + """ + 检查端口是否可用 + :param port: + :return: + """ + import socket + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + return s.connect_ex(('localhost', port)) != 0 + return False \ No newline at end of file diff --git a/core/WebApp.py b/core/WebApp.py new file mode 100644 index 0000000..5928a2e --- /dev/null +++ b/core/WebApp.py @@ -0,0 +1,29 @@ +import flask + +app = flask.Flask(__name__, template_folder='../webfile', static_folder='../webfile/static') + +@app.errorhandler(500) +def internal_server_error(error): + # 返回500错误的HTML页面 + return flask.render_template("/error/500.html",error=error,static_url_path='/static') + +@app.errorhandler(404) +def not_found(error): + # 返回404错误的HTML页面 + return flask.render_template("/error/404.html",error=error,static_url_path='/static'),404 + +@app.errorhandler(403) +def forbidden(error): + # 返回403错误的HTML页面 + return flask.render_template("/error/403.html",error=error,static_url_path='/static'),403 + +@app.route('/') +def index(): + return flask.render_template('index.html') + +def AppStart(host: str, port: int): + """启动Flask应用""" + app.run(host=host, port=port, debug=True, use_reloader=False) + +if __name__ == '__main__': + app.run(host="127.0.0.1", port=5000, debug=True) diff --git a/core/WebViewWIndow.py b/core/WebViewWIndow.py new file mode 100644 index 0000000..369ec27 --- /dev/null +++ b/core/WebViewWIndow.py @@ -0,0 +1,7 @@ +import webview + +def WebViewWIndow(url: str,title: str,width: int,height: int):#创建Webview窗口 + webview.create_window(title=title, url=url, width=width, height=height) + webview.settings['ALLOW_DOWNLOADS'] = True + webview.start(icon="./assets/img/icon.png") + exit(0) \ No newline at end of file