上传文件至 /

This commit is contained in:
2025-09-11 11:48:02 +00:00
parent be5d9e132a
commit 4662e001ed
5 changed files with 498 additions and 0 deletions

246
AccountServices.py Normal file
View File

@@ -0,0 +1,246 @@
import requests
import json
import uuid
import platform
import hashlib
class AuthClient:
def __init__(self, base_url):
self.base_url = base_url
self.stage = 'find-account'
self.error = None
self.account_identifier = ''
self.device_id = ''
self.challenge = None
self.factors = []
self.selected_factor_id = None
self.password = ''
self.user_store = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3' # 您可以自定义此User-Agent字符串
def generate_device_id(self):
"""生成设备指纹
:return: 设备指纹ID字符串
"""
platform_info = platform.system() + platform.release() + platform.machine() # 修正了这里的拼接错误
unique_id = uuid.getnode()
device_info = f"{platform_info}-{unique_id}"
# 使用SHA-256哈希生成设备ID
self.device_id = hashlib.sha256(device_info.encode()).hexdigest()
return self.device_id
def check_account_validity(self, account_identifier):
"""检查账户有效性
:param account_identifier: 用户的账户标识符(邮箱或用户名)
:return: 挑战信息(字典)
"""
self.account_identifier = account_identifier
self.error = None
if not self.account_identifier:
self.error = 'Please enter your email or username.'
return None
try:
response = requests.post(
f"{self.base_url}/auth/challenge",
headers={'Content-Type': 'application/json', 'User-Agent': self.user_store,'device_name': "A Python login app.By NRFF & Deepseek"}, # 添加自定义User-Agent
data=json.dumps({
'platform': 1,
'account': self.account_identifier,
'device_id': self.generate_device_id(),
})
)
if not response.ok:
raise Exception(response.text or 'Account not found.')
self.challenge = response.json()
self.stage = 'select-factor'
return self.challenge
except Exception as e:
self.error = str(e)
return None
def get_authentication_factors(self):
"""获取可用的认证因素
:return: 可用的认证因素列表(列表)
"""
self.error = None
if not self.challenge:
self.error = 'No challenge information available.'
return None
try:
response = requests.get(
f"{self.base_url}/auth/challenge/{self.challenge['id']}/factors",
headers={'User-Agent': self.user_store,'device_name': "A Python login app.By NRFF & Deepseek"} # 添加自定义User-Agent
)
if not response.ok:
raise Exception('Could not fetch authentication factors.')
available_factors = response.json()
self.factors = [factor for factor in available_factors if factor['id'] not in self.challenge.get('blacklist_factors', [])]
if len(self.factors) == 0:
self.error = 'No available authentication factors.'
return None
else:
self.stage = 'select-factor'
return self.factors
except Exception as e:
self.error = str(e)
return None
def request_verification_code(self, selected_factor_id):
"""请求验证码
:param selected_factor_id: 用户选择的认证因素ID字符串类型
:return: None
"""
self.selected_factor_id = selected_factor_id
self.error = None
if not self.selected_factor_id:
self.error = 'No authentication factor selected.'
return None
selected_factor = self.get_selected_factor()
if not selected_factor:
self.error = 'Selected factor not found.'
return None
hint = {'contact': selected_factor['contact']}
try:
response = requests.post(
f"{self.base_url}/auth/challenge/{self.challenge['id']}/factors/{self.selected_factor_id}",
headers={'Content-Type': 'application/json', 'User-Agent': self.user_store, 'device_name': "A Python login app.By NRFF & Deepseek"}, # 添加自定义User-Agent
data=json.dumps(hint)
)
if not response.ok:
raise Exception(response.text or 'Failed to send code.')
self.stage = 'enter-code'
except Exception as e:
self.error = str(e)
self.stage = 'select-factor'
return None
def get_selected_factor(self):
"""获取选择的认证因素
:return: 选择的认证因素(字典)或 None
"""
if not self.selected_factor_id:
return None
return next((factor for factor in self.factors if factor['id'] == self.selected_factor_id), None)
def verify_factor(self, selected_factor_id, password):
"""验证所选的认证因素
:param selected_factor_id: 用户选择的认证因素ID字符串类型
:param password: 用户输入的密码或验证码(字符串类型)
:return: None
"""
self.selected_factor_id = selected_factor_id
self.password = password
self.error = None
if not self.selected_factor_id or not self.password:
self.error = 'Please enter your password/code.'
return None
try:
response = requests.patch(
f"{self.base_url}/auth/challenge/{self.challenge['id']}",
headers={'Content-Type': 'application/json', 'User-Agent': self.user_store,'device_name': "A Python login app.By NRFF & Deepseek"}, # 添加自定义User-Agent
data=json.dumps({
'factor_id': self.selected_factor_id,
'password': self.password,
})
)
if not response.ok:
raise Exception(response.text or 'Verification failed.')
self.challenge = response.json()
self.password = ''
if self.challenge['step_remain'] == 0:
self.stage = 'token-exchange'
else:
self.stage = 'select-factor'
except Exception as e:
self.error = str(e)
self.stage = 'select-factor'
def exchange_token(self):
"""交换令牌以完成登录
:return: None
"""
self.error = None
if not self.challenge:
self.error = 'No challenge information available.'
return None
try:
response = requests.post(
f"{self.base_url}/auth/token",
headers={'Content-Type': 'application/json', 'User-Agent': self.user_store,'device_name': "A Python login app.By NRFF & Deepseek"}, # 添加自定义User-Agent
data=json.dumps({
'grant_type': 'authorization_code',
'code': self.challenge['id'],
})
)
if not response.ok:
raise Exception(response.text or 'Token exchange failed.')
token_info = response.json()
token = token_info['token']
#self.user_store.fetchUser()
redirect_uri = 'redirect_uri_from_query' # 这里需要根据实际情况获取
if redirect_uri:
print(f"Redirecting to: {redirect_uri}")
else:
print("Navigating to home page.")
return token
except Exception as e:
self.error = str(e)
self.stage = 'select-factor'
def get_factor_name(self, factor_type):
"""根据认证因素类型返回相应的名称
:param factor_type: 认证因素类型(整数)
:return: 认证因素名称(字符串)
"""
factor_names = {
0: 'Password',
1: 'Email',
2: 'Authenticator App',
}
return factor_names.get(factor_type, 'Unknown Factor')
# 示例调用
if __name__ == "__main__":
auth_client = AuthClient(base_url='https://api.solian.app/id')
account_identifier = ''#你的账号
selected_factor_id = "enter-code" # 用户需要选择一个因素
password = ''#你的密码
# 第一步:检查账户有效性
challenge = auth_client.check_account_validity(account_identifier)
if auth_client.error:
print(f"Error in check account validity: {auth_client.error}")
elif challenge:
print(f"Challenge information: {challenge}")
# 第二步:获取可用的认证因素
factors = auth_client.get_authentication_factors()
if auth_client.error:
print(f"Error in get authentication factors: {auth_client.error}")
elif factors:
print(f"Available factors: {factors}")
# 假设用户选择第一个因素
selected_factor_id = factors[0]['id']
print(f"Selected factor ID: {selected_factor_id}")
# 第三步:请求验证码(如果需要)
if factors[0]['type'] == 1:
auth_client.request_verification_code(selected_factor_id)
if auth_client.error:
print(f"Error in request verification code: {auth_client.error}")
elif auth_client.stage == 'enter-code':
print("Verification code requested. Please enter the code.")
# 第四步:验证所选的认证因素
auth_client.verify_factor(selected_factor_id, password)
if auth_client.error:
print(f"Error in verify factor: {auth_client.error}")
elif auth_client.stage == 'token-exchange':
print("Factor verified. Ready to exchange token.")
# 第五步:交换令牌以完成登录
if auth_client.stage == 'token-exchange':
token = auth_client.exchange_token()
if auth_client.error:
print(f"Error in exchange token: {auth_client.error}")
else:
print("Login successful!")
print(f"Token: {token}")
else:
print("Login failed.")
else:
print("No available authentication factors.")

72
AppMain.py Normal file
View File

@@ -0,0 +1,72 @@
import os
import threading
import traceback
import signal
import time
# 初始化设置
os.chdir(os.path.dirname(os.path.abspath(__file__))) # 切换到当前目录
import core.WebApp as WebApp
import core.PyWebPageAPI as PyWebPageAPI
import core.WebViewWIndow as WebViewWIndow
import ProjectCfg
def exit_handler(signum, frame):
"""处理退出信号"""
exit(0)
def find_available_port(start_port=5000, end_port=10000):
"""查找可用端口"""
for port in range(start_port, end_port):
if PyWebPageAPI.CheckPortAvailable(port):
return port
print("No available port found.")
exit(1)
def main():
"""主程序逻辑"""
signal.signal(signal.SIGINT, exit_handler)
try:
# 环境检查
if not PyWebPageAPI.EnvironmentCheck():
print("Error: Environment check failed.")
exit(1)
# Webview检查
webview_check = PyWebPageAPI.CheckWebviewInstalled()
if not webview_check[0]:
print("Warning: Webview not installed.")
print(webview_check[1])
StartPort = find_available_port()
# 启动应用
app_thread = threading.Thread(
target=WebApp.AppStart,
args=("127.0.0.1", StartPort),
daemon=True
)
app_thread.start()
try:
WebViewWIndow.WebViewWIndow(#窗口配置
f'http://127.0.0.1:{StartPort}',
ProjectCfg.WINDOW_TITLE,
ProjectCfg.WINDOW_WIDTH,
ProjectCfg.WINDOW_HEIGHT,
)
except Exception as e:
traceback.print_exc()
pass
while app_thread.is_alive():
time.sleep(0.1)
except Exception as e:
traceback.print_exc()
exit(1)
if __name__ == '__main__':
main()

92
CallServerAPIs.py Normal file
View File

@@ -0,0 +1,92 @@
import requests
from requests.exceptions import RequestException
import json
DOMAIN = "https://solian.app/api"
def _make_request(method: str, url: str, headers: dict, params: dict = None, data: dict = None) -> dict:
"""内部辅助函数用于发送HTTP请求并处理响应"""
try:
if method == 'GET':
response = requests.get(url, headers=headers, params=params)
elif method == 'POST':
response = requests.post(url, headers=headers, data=data)
elif method == 'DELETE':
response = requests.delete(url, headers=headers, params=params)
elif method == 'PATCH':
response = requests.patch(url, headers=headers, data=data)
else:
return {"error": "Unsupported HTTP method"}
if response.status_code != 200:
return {"error": response.status_code}
return response.json()
except json.JSONDecodeError:
return {"error": response.text}
except RequestException as e:
return {"error": str(e)}
def ActivityAPIs(cursor: str = '', filter: str = '', take: int = 20, debuginclude: str = '', Authorization: str = '') -> dict:
"""获取首页内容"""
url = f"{DOMAIN}/activities"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"cursor": cursor, "filter": filter, "take": take, "debuginclude": debuginclude}
return _make_request('GET', url, headers, params)
def ChatSummary(Authorization: str) -> dict:
"""获取聊天摘要"""
url = f"{DOMAIN}/chat/summary"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetChatMessageAllInfo(Authorization: str, roomid: str, offset: int = 0, take: int = 20) -> dict:
"""获取聊天消息"""
url = f"{DOMAIN}/chat/{roomid}/message"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"offset": offset, "take": take}
return _make_request('GET', url, headers, params)
def GetChatMessageBaseInfo(Authorization: str, roomid: str) -> dict:
"""获取聊天消息基础信息"""
url = f"{DOMAIN}/chat/{roomid}/message/base"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers)
def GetAMessageAllInfo(Authorization: str, roomid: str, messageid: str) -> dict:
"""获取具体聊天消息所有信息"""
url = f"{DOMAIN}/chat/{roomid}/message/{messageid}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetAMessageInfo(Authorization: str, roomid: str, messageid: str) -> dict:
"""获取聊天消息基础信息。暂时作为保留"""
url = f"{DOMAIN}/chat/{roomid}/message/{messageid}/info"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers)
def DelMessage(Authorization: str, roomid: str, messageid: str) -> dict:
"""删除聊天消息"""
url = f"{DOMAIN}/chat/{roomid}/message/{messageid}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers)
def GetChatRoomInfo(Authorization: str, id: str) -> dict:
"""获取聊天房间信息"""
url = f"{DOMAIN}/chat/{id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def ModifyChatRoomInfo(Authorization: str, id: str, data: dict) -> dict:
"""修改聊天房间信息"""
url = f"{DOMAIN}/chat/{id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers, data=data)
def DeleteChatRoom(Authorization: str, id: str) -> dict:
"""删除聊天房间"""
url = f"{DOMAIN}/chat/{id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers)

10
ProjectCfg.py Normal file
View File

@@ -0,0 +1,10 @@
# 项目配置
PROFECT_NAME : str = 'SolianForDesktop'
PROFECT_VERSION : str = '1.0(alpha)'
SUPPORTED_PLATFORMS : list[str] = ['All']
WINDOW_TITLE : str = 'SolianForDesktop'
WINDOW_WIDTH : int = 700
WINDOW_HEIGHT : int = 500
PYTHON_VERSION_MIN : tuple[int,int] = (3, 7)
PYTHON_VERSION_MAX : tuple[int,int] = (0, 0)
LIB_LIST : list[str] = ['flask']

78
PyWebPageAPI.py Normal file
View File

@@ -0,0 +1,78 @@
import os
import sys
import importlib
import platform
import ProjectCfg
def EnvironmentCheck() -> bool:
"""
检查环境是否符合要求
:return:
"""
if sys.version_info < ProjectCfg.PYTHON_VERSION_MIN and ProjectCfg.PYTHON_VERSION_MIN!=(0,0):
return False
if sys.version_info > ProjectCfg.PYTHON_VERSION_MAX and ProjectCfg.PYTHON_VERSION_MAX!=(0,0):
return False
for lib in ProjectCfg.LIB_LIST:
if importlib.util.find_spec(lib) is None:
return False
if platform.system() not in ProjectCfg.SUPPORTED_PLATFORMS and ProjectCfg.SUPPORTED_PLATFORMS!=['All']:
return False
return True
def CheckWebviewInstalled() -> tuple[bool,str]:
"""
检查WebView框架是否已安装
:return:
"""
system_platform : str = platform.system()
if system_platform == "Windows":
# 检查WebView2
webview2_path : str = r'C:\Program Files (x86)\Microsoft\EdgeWebView2'
webview2_dll : str = r'\WebView2Loader.dll'
for root, dirs, files in os.walk(webview2_path):
if webview2_dll in files:
return (True,'')
return (False,"No installed WebView2")
elif system_platform == "Darwin":
# 检查WebKit框架
webkit_path = '/System/Library/Frameworks/WebKit.framework'
if os.path.exists(webkit_path):
return (True,'')
return (False,"Unsupported WKWebView")
elif system_platform == "Linux":
# 检查WebKitGTK
webkitgtk_paths = [
'/usr/lib/libwebkit2gtk-4.0.so',
'/usr/lib/x86_64-linux-gnu/libwebkit2gtk-4.0.so'
]
for path in webkitgtk_paths:
if os.path.exists(path):
return (True,'')
# 检查QtWebEngine
qtwebengine_paths = [
'/usr/lib/qt5/bin/qtwebengine_process',
'/usr/lib/x86_64-linux-gnu/qt5/bin/qtwebengine_process'
]
for path in qtwebengine_paths:
if os.path.exists(path):
return "QtWebEngine已安装"
return (False,"Not Found Webview framework.")
else:
return (False,"Unsupported OS.")
def CheckPortAvailable(port : int) -> bool:
"""
检查端口是否可用
:param port:
:return:
"""
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', port)) != 0
return False