修改一些功能

This commit is contained in:
2025-09-14 16:42:27 +08:00
parent 9b791b5567
commit ba18997410
6 changed files with 551 additions and 23 deletions

View File

@@ -1,6 +1,19 @@
###调用服务器API
###By Liang-work(NRFF&nanci)
###Version 1.0
###License: BSD
from ProjectCfg import DOMAIN
from .SNAPI.CallServer import _make_request
from .SNAPI import *
import httpx
def OPITIONS(self,path,params):
response = httpx.options(f"{self.url}{path}",params=params,headers=self.headers)
if response:
return {"status": response.status_code, "data": response.json()}
else:
return {"status": response.status_code, "data": {}}
###=========================活动API=================================
@@ -43,3 +56,22 @@ def GetServerVersion() -> dict:
url = f"{DOMAIN}/version"
headers = {'accept': 'application/json'}
return _make_request('GET', url, headers)
###==========================签到==========================
def SignIn(Authorization: str) -> dict:
"""
签到
:return:
"""
return OPITIONS("/id/accounts/me/check-in",{})
###==========================获取通知==========================
def GetNotifications(Authorization: str,offset: int = 0, take: int = 20) -> dict:
"""
获取通知
:param offset:
:param take:
:return:
"""
return _make_request("GET","/pusher/notification",{"offset":offset,"take":take})

View File

@@ -0,0 +1,408 @@
import requests
from typing import Optional, Dict, Any, List
from .CallServer import _make_request
DOMAIN = "https://id.solian.app"
###==========================安全报告API==========================
def create_report(
resource_identifier: str,
report_type: int,
reason: str,
Authorization: str = ''
) -> dict:
"""创建安全报告"""
url = f"{DOMAIN}/api/safety/reports"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}",
'Content-Type': 'application/json'
}
request_body = {
"resource_identifier": resource_identifier,
"type": report_type,
"reason": reason
}
return _make_request('POST', url, headers, request_body=request_body)
def get_reports(
offset: int = 0,
take: int = 20,
include_resolved: bool = False,
Authorization: str = ''
) -> dict:
"""获取安全报告列表"""
url = f"{DOMAIN}/api/safety/reports"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
params = {
"offset": offset,
"take": take,
"includeResolved": include_resolved
}
return _make_request('GET', url, headers, params=params)
def get_my_reports(
offset: int = 0,
take: int = 20,
include_resolved: bool = False,
Authorization: str = ''
) -> dict:
"""获取我的安全报告列表"""
url = f"{DOMAIN}/api/safety/reports/me"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
params = {
"offset": offset,
"take": take,
"includeResolved": include_resolved
}
return _make_request('GET', url, headers, params=params)
def get_report_by_id(
report_id: str,
Authorization: str = ''
) -> dict:
"""通过ID获取安全报告"""
url = f"{DOMAIN}/api/safety/reports/{report_id}"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
return _make_request('GET', url, headers)
def resolve_report(
report_id: str,
resolution: str,
Authorization: str = ''
) -> dict:
"""解决安全报告"""
url = f"{DOMAIN}/api/safety/reports/{report_id}/resolve"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}",
'Content-Type': 'application/json'
}
request_body = {
"resolution": resolution
}
return _make_request('POST', url, headers, request_body=request_body)
###==========================账户API==========================
def get_account_by_name(
name: str,
Authorization: str = ''
) -> dict:
"""通过名称获取账户信息"""
url = f"{DOMAIN}/api/accounts/{name}"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
return _make_request('GET', url, headers)
def create_account(
name: str,
nick: str,
email: str,
password: str,
captcha_token: str,
language: Optional[str] = None
) -> dict:
"""创建新账户"""
url = f"{DOMAIN}/api/accounts"
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
request_body = {
"name": name,
"nick": nick,
"email": email,
"password": password,
"captcha_token": captcha_token
}
if language:
request_body["language"] = language
return _make_request('POST', url, headers, request_body=request_body)
def recover_password(
account: str,
captcha_token: str
) -> dict:
"""恢复密码"""
url = f"{DOMAIN}/api/accounts/recovery/password"
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
request_body = {
"account": account,
"captcha_token": captcha_token
}
return _make_request('POST', url, headers, request_body=request_body)
###==========================当前账户API==========================
def get_my_account(
Authorization: str
) -> dict:
"""获取当前账户信息"""
url = f"{DOMAIN}/api/accounts/me"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
return _make_request('GET', url, headers)
def update_my_account(
nick: Optional[str] = None,
language: Optional[str] = None,
region: Optional[str] = None,
Authorization: str = ''
) -> dict:
"""更新当前账户基本信息"""
url = f"{DOMAIN}/api/accounts/me"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}",
'Content-Type': 'application/json'
}
request_body = {}
if nick is not None:
request_body["nick"] = nick
if language is not None:
request_body["language"] = language
if region is not None:
request_body["region"] = region
return _make_request('PATCH', url, headers, request_body=request_body)
def delete_my_account(
Authorization: str
) -> dict:
"""删除当前账户"""
url = f"{DOMAIN}/api/accounts/me"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
return _make_request('DELETE', url, headers)
###==========================认证API==========================
def create_auth_challenge(
platform: int,
account: str,
device_id: str,
device_name: Optional[str] = None,
audiences: Optional[List[str]] = None,
scopes: Optional[List[str]] = None
) -> dict:
"""创建认证挑战"""
url = f"{DOMAIN}/api/auth/challenge"
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
request_body = {
"platform": platform,
"account": account,
"device_id": device_id
}
if device_name:
request_body["device_name"] = device_name
if audiences:
request_body["audiences"] = audiences
if scopes:
request_body["scopes"] = scopes
return _make_request('POST', url, headers, request_body=request_body)
def perform_auth_challenge(
challenge_id: str,
factor_id: str,
password: str
) -> dict:
"""执行认证挑战"""
url = f"{DOMAIN}/api/auth/challenge/{challenge_id}"
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
request_body = {
"factor_id": factor_id,
"password": password
}
return _make_request('PATCH', url, headers, request_body=request_body)
def exchange_token(
grant_type: Optional[str] = None,
refresh_token: Optional[str] = None,
code: Optional[str] = None
) -> dict:
"""交换令牌"""
url = f"{DOMAIN}/api/auth/token"
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
request_body = {}
if grant_type:
request_body["grant_type"] = grant_type
if refresh_token:
request_body["refresh_token"] = refresh_token
if code:
request_body["code"] = code
return _make_request('POST', url, headers, request_body=request_body)
###==========================订阅API==========================
def get_subscriptions(
offset: int = 0,
take: int = 20,
Authorization: str = ''
) -> dict:
"""获取订阅列表"""
url = f"{DOMAIN}/api/subscriptions"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
params = {
"offset": offset,
"take": take
}
return _make_request('GET', url, headers, params=params)
def create_subscription(
identifier: str,
payment_method: str,
payment_details: dict,
coupon: Optional[str] = None,
cycle_duration_days: Optional[int] = None,
is_free_trial: bool = False,
is_auto_renewal: bool = False,
Authorization: str = ''
) -> dict:
"""创建订阅"""
url = f"{DOMAIN}/api/subscriptions"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}",
'Content-Type': 'application/json'
}
request_body = {
"identifier": identifier,
"payment_method": payment_method,
"payment_details": payment_details,
"is_free_trial": is_free_trial,
"is_auto_renewal": is_auto_renewal
}
if coupon:
request_body["coupon"] = coupon
if cycle_duration_days:
request_body["cycle_duration_days"] = cycle_duration_days
return _make_request('POST', url, headers, request_body=request_body)
###==========================钱包API==========================
def get_wallet(
Authorization: str
) -> dict:
"""获取钱包信息"""
url = f"{DOMAIN}/api/wallets"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
return _make_request('GET', url, headers)
def get_wallet_transactions(
offset: int = 0,
take: int = 20,
Authorization: str = ''
) -> dict:
"""获取钱包交易记录"""
url = f"{DOMAIN}/api/wallets/transactions"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
params = {
"offset": offset,
"take": take
}
return _make_request('GET', url, headers, params=params)
def update_wallet_balance(
account_id: str,
amount: float,
currency: str,
remark: Optional[str] = None,
Authorization: str = ''
) -> dict:
"""更新钱包余额"""
url = f"{DOMAIN}/api/wallets/balance"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}",
'Content-Type': 'application/json'
}
request_body = {
"account_id": account_id,
"amount": amount,
"currency": currency
}
if remark:
request_body["remark"] = remark
return _make_request('POST', url, headers, request_body=request_body)
###==========================关系API==========================
def get_relationships(
offset: int = 0,
take: int = 20,
Authorization: str = ''
) -> dict:
"""获取关系列表"""
url = f"{DOMAIN}/api/relationships"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}"
}
params = {
"offset": offset,
"take": take
}
return _make_request('GET', url, headers, params=params)
def create_relationship(
user_id: str,
status: int,
Authorization: str = ''
) -> dict:
"""创建关系"""
url = f"{DOMAIN}/api/relationships/{user_id}"
headers = {
'accept': 'application/json',
'Authorization': f"Bearer {Authorization}",
'Content-Type': 'application/json'
}
request_body = {
"status": status
}
return _make_request('POST', url, headers, request_body=request_body)
###==========================其他API==========================
def get_version() -> dict:
"""获取API版本"""
url = f"{DOMAIN}/api/version"
headers = {
'accept': 'application/json'
}
return _make_request('GET', url, headers)
def get_ip_check() -> dict:
"""获取IP检查信息"""
url = f"{DOMAIN}/api/ip-check"
headers = {
'accept': 'application/json'
}
return _make_request('GET', url, headers)

View File

@@ -1,16 +1,27 @@
import requests
import requests#导入必要的库
import json
from requests.exceptions import RequestException
import platform
from .. import PyWebPageAPI
UA = f"SolianForPythonApp/0.0.1(A) ({PyWebPageAPI.GetDeviceInfo()})"
def _make_request(method: str, url: str, headers: dict, params: dict = None, normal_codes: list = [200], request_body: dict = None) -> dict:
"""内部辅助函数用于发送HTTP请求并处理响应"""
headers['User-Agent'] = UA
"""
params:
可选参数用于GET请求的查询参数
request_body:
可选参数用于POST和PATCH请求的请求体
normal_codes:
可选参数用于指定正常的HTTP状态码列表默认值为[200]
return:
字典类型,包含请求的响应数据或错误信息
内部辅助函数用于发送HTTP请求并处理响应
支持GET、POST、DELETE、PATCH请求
自动处理JSON格式的请求体和响应体
处理常见的HTTP错误码返回统一的错误信息格式"""
headers['User-Agent'] = UA#添加UA
try:
if method == 'GET':
if method == 'GET':#请求类型选择
response = requests.get(url, headers=headers, params=params)
elif method == 'POST':
response = requests.post(url, headers=headers, data=json.dumps(request_body))
@@ -21,11 +32,11 @@ def _make_request(method: str, url: str, headers: dict, params: dict = None, nor
else:
return {"error": "Unsupported HTTP method"}
if response.status_code not in normal_codes:
if response.status_code not in normal_codes:#不在计划内的错误码
return {"error": f"Unexpected status code: {response.status_code}"}
return response.json()
except json.JSONDecodeError:
except json.JSONDecodeError:#json解析错误
return {"error": response.text}
except RequestException as e:
return {"error": str(e)}

View File

@@ -1 +1,5 @@
from . import Chat, Poll, Post,PostCategory,PostTag,Publisher,WebArticle,WebFeed,WebFeedPublic,WebReader,Sticker,CallServer,Realm,RealtimeCall
###Solsynth network的服务器内部API封装
###By Liang-work(NRFF&nanci)
###Version 1.0
###License: BSD
from . import Chat, Poll, Post,PostCategory,PostTag,Publisher,WebArticle,WebFeed,WebFeedPublic,WebReader,Sticker,CallServer,Realm,RealtimeCall,AccountServices

56
core/SolarNetworkDrive.py Normal file
View File

@@ -0,0 +1,56 @@
###官方的文件分享池API
###By Liang-work(NRFF&nanci)
###Version 1.0
###License: BSD
DOMAIN = 'https://api.solian.app'
import base64
import os
from tusclient import client
import requests
import traceback
def DownloadFileFromServer(ID:str,SavePath:str='../temp/') -> bytes:
"""
从服务器下载文件
:param ID: 文件ID
:param SavePath: 本地保存路径
:return:
"""
response = requests.get(f"{DOMAIN}/drive/files/{ID}")
if response.status_code == 200:
with open(SavePath, "wb") as f:
f.write(response.content)
return response.status_code
else:
return response.status_code
def UploadFileToServer(token:str,file_path,file_type,pool=1,chunk_size=1024*1024*10):
"""
使用tuspy上传文件
:param file_path: 文件路径
:param file_type: 文件类型 accept_types: ["image/*", "video/*", "audio/*"]
:param pool: 存储池(默认:Solar Network Shared)
:param chunk_size: 分块大小
:return: 文件id
"""
from tusclient import client
pools = [
"07bc8e33-f3aa-4e35-83c2-c63b2b67d8cd", # 雨云存储 中国
"c53136a6-9152-4ecb-9f88-43c41438c23e", # Solar Network Shared
"500e5ed8-bd44-4359-bc0a-ec85e2adf447", # Solar Network Driver
]
headers = {
"Authorization": f"Bearer {token}",
"X-FilePool": pools[pool]
}
tus = client.TusClient('https://api.solian.app/drive/tus')
tus.set_headers(headers)
file = tus.uploader(file_path, chunk_size=chunk_size)
file.metadata = {
"content-type": file_type,
"filename": os.path.basename(file_path)
}
file.upload()
return file.url.split("/")[-1]

View File

@@ -25,17 +25,8 @@
<div class="content">
<h2>欢迎使用</h2>
<div class="card">
<h3>卡片标题</h3>
<p>这是一个WinUI风格的卡片内容区域。您可以在这里放置各种内容。</p>
</div>
<div class="card">
<h3>另一个卡片</h3>
<p>响应式设计使得在窄屏设备上侧边栏可以隐藏。</p>
</div>
<div class="card">
<h3>更多内容</h3>
<p>主内容区域可以滚动,侧边栏在移动设备上会覆盖内容。</p>
<div id="posts-container">
<!-- 帖子卡片将通过JavaScript动态加载 -->
</div>
</div>
</div>
@@ -45,10 +36,36 @@
document.getElementById('sidebar').classList.toggle('open');
});
// 为侧边栏菜单项添加悬停效果
document.querySelectorAll('.sidebar-menu li').forEach(item => {
item.style.cursor = 'pointer';
// 获取帖子数据
async function fetchPosts() {
try {
const response = await fetch('/api/posts');
const posts = await response.json();
const container = document.getElementById('posts-container');
container.innerHTML = '';
posts.forEach(post => {
const card = document.createElement('div');
card.className = 'card';
card.innerHTML = `
<h3>${post.title}</h3>
<p class="post-content">${post.content}</p>
<p class="post-description">${post.description}</p>
<div class="post-author">
<span class="username">@${post.author.username}</span>
<span class="nickname">${post.author.nickname}</span>
</div>
`;
container.appendChild(card);
});
} catch (error) {
console.error('获取帖子失败:', error);
}
}
// 页面加载时获取帖子
document.addEventListener('DOMContentLoaded', fetchPosts);
</script>
</body>
</html>