对服务器API调用文件进行拆解与更新

This commit is contained in:
2025-09-12 23:31:26 +08:00
parent 740a4f67aa
commit f7b4f7158a
9 changed files with 713 additions and 221 deletions

View File

@@ -1,3 +1,5 @@
import platform
# 项目配置
PROFECT_NAME : str = 'SolianForDesktop'
PROFECT_VERSION : str = '1.0(alpha)'
@@ -8,3 +10,5 @@ WINDOW_HEIGHT : int = 500
PYTHON_VERSION_MIN : tuple[int,int] = (3, 7)
PYTHON_VERSION_MAX : tuple[int,int] = (0, 0)
LIB_LIST : list[str] = ['flask']
DOMAIN : str = 'https://solian.app/api'
UA : str = f"SolianForPythonApp/0.000.001 ({platform.system()})"

View File

@@ -1,234 +1,43 @@
import requests
from requests.exceptions import RequestException
import json
import platform
DOMAIN = "https://solian.app/api"
UA = f"SolianForPythonApp/0.000.001({platform.system()})"
def _make_request(method: str, url: str, headers: dict, params: dict = None, NormalCode: int = [204],RequestsBody: dict | int = None) -> dict:
"""内部辅助函数用于发送HTTP请求并处理响应"""
headers['User-Agent'] = UA
try:
if method == 'GET':
response = requests.get(url, headers=headers, params=params)
elif method == 'POST':
response = requests.post(url, headers=headers, data=json.dumps(RequestsBody))
elif method == 'DELETE':
response = requests.delete(url, headers=headers, params=params)
elif method == 'PATCH':
response = requests.patch(url, headers=headers, data=json.dumps(RequestsBody))
else:
return {"error": "Unsupported HTTP method"}
if response.status_code != 200 and response.status_code not in NormalCode:
return {"error": response.status_code}
return response.json()
except json.JSONDecodeError:
return {"error": response.text}
except RequestException as e:
return {"error": str(e)}
from ProjectCfg import DOMAIN
from SNAPI.CallServer import _make_request
from typing import List,Any,Optional,Dict
from SNAPI import *
###=========================活动API=================================
def ActivityAPIs(cursor: str = '', filter: str = '', take: int = 20, debuginclude: str = '', Authorization: str = '') -> dict:
"""获取首页内容"""
url = f"{DOMAIN}/activities"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"cursor": cursor, "filter": filter, "take": take, "debuginclude": debuginclude}
return _make_request('GET', url, headers, params)
return _make_request('GET', url, headers, params=params)
###=========================聊天API=================================
def ChatSummary(Authorization: str) -> dict:
"""获取聊天摘要"""
url = f"{DOMAIN}/chat/summary"
###==========================领域的发现==========================
def GetDiscover(query: str = '', take: int = 20, offset: int = 0, Authorization: str = '') -> dict:
"""获取发现"""
url = f"{DOMAIN}/discovery/realms"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {'query': query, 'take': take, 'offset': offset}
return _make_request('GET', url, headers, params=params)
###==========================领域聊天==========================
def RealmChat(Authorization: str,slug:str ) -> dict:
"""获取领域聊天"""
url = f"{DOMAIN}/realms/{slug}/chat"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetChatMessageAllInfo(Authorization: str, roomid: str, offset: int = 0, take: int = 20) -> dict:
"""获取聊天消息"""
url = f"{DOMAIN}/chat/{roomid}/message"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"offset": offset, "take": take}
return _make_request('GET', url, headers, params)
###==========================尊贵的Solar Network 恒星计划订阅用户专属翻译功能======= :(呜呜呜,没钱订阅
def SendChatMessage(Authorization: str, roomid: str,content: str,nonce: str,attachments_id: list,meta: dict,replied_message_id: str,forwarded_message_id: str) -> dict:
"""发送聊天消息"""
RequestsBody={
"content": content,
"nonce": nonce,
"attachments_id": attachments_id,
"meta": meta,
"replied_message_id": replied_message_id,
"forwarded_message_id": forwarded_message_id
}
url = f"{DOMAIN}/chat/{roomid}/message"
def TranslationText(Authorization: str,text:str ,Tolang:str,FromLang:str) -> dict:
"""翻译文本"""
url = f"{DOMAIN}/translation"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers,RequestsBody=RequestsBody,NormalCode=[201])
params = {"to":Tolang,"from":FromLang}
return _make_request('POST', url, headers, params=params,request_body=text)
def GetAMessageAllInfo(Authorization: str, roomid: str, messageid: str) -> dict:
"""获取具体聊天消息所有信息"""
url = f"{DOMAIN}/chat/{roomid}/message/{messageid}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
def GetServerVersion() -> dict:
"""获取服务器版本"""
url = f"{DOMAIN}/version"
headers = {'accept': 'application/json'}
return _make_request('GET', url, headers)
def ModifyChatMessage(Authorization: str, roomid: str, messageid: str,content: str,nonce: str,attachments_id: list,meta: dict,replied_message_id: str,forwarded_message_id: str) -> dict:
"""修改聊天消息"""
RequestsBody={
"content": content,
"nonce": nonce,
"attachments_id": attachments_id,
"meta": meta,
"replied_message_id": replied_message_id,
"forwarded_message_id": forwarded_message_id
}
url = f"{DOMAIN}/chat/{roomid}/message/{messageid}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers,RequestsBody=RequestsBody)
def DeleteAMessageInfo(Authorization: str, roomid: str, messageid: str) -> dict:
"""删除聊天消息基础信息。暂时作为保留"""
url = f"{DOMAIN}/chat/{roomid}/message/{messageid}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers)
def DeleteChatMessage(Authorization: str, roomid: str, messageid: str) -> dict:
"""删除聊天消息"""
url = f"{DOMAIN}/chat/{roomid}/message/{messageid}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers)
def SyncChatMessage(Authorization: str, roomid: str, last_sync_timestamp: int) -> dict:
"""同步聊天消息"""
url = f"{DOMAIN}/chat/{roomid}/sync"
RequestsBody={
"last_sync_timestamp": last_sync_timestamp
}
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers,RequestsBody=RequestsBody)
###=========================聊天室API=================================
def GetChatRoomInfo(Authorization: str, id: str) -> dict:
"""获取聊天房间信息"""
url = f"{DOMAIN}/chat/{id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def ModifyChatRoomInfo(Authorization: str, id: str, name: str, description: str, picture_id: str, backgournd_id: str, realm_id: str, is_community: bool, is_public: bool) -> dict:
"""修改聊天房间信息"""
RequestsBody={
"name": name,
"description": description,
"picture_id": picture_id,
"background_id": backgournd_id,
"realm_id": realm_id,
"is_community": is_community,
"is_public": is_public,
}
url = f"{DOMAIN}/chat/{id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers,RequestsBody=RequestsBody)
def DeleteChatRoom(Authorization: str, id: str) -> dict:
"""删除聊天房间"""
url = f"{DOMAIN}/chat/{id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers,NormalCode=[204])
def GetChatList(Authorization: str) -> dict:
"""获取聊天列表"""
url = f"{DOMAIN}/chat"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def CreateChatWithRealm(Authorization: str, name: str, description: str, picture_id: str, backgournd_id: str, realm_id: str, is_community: bool, is_public: bool) -> dict:
"""创建聊天房间"""
RequestsBody={
"name": name,
"description": description,
"picture_id": picture_id,
"background_id": backgournd_id,
"realm_id": realm_id,
"is_community": is_community,
"is_public": is_public,
}
url = f"{DOMAIN}/chat"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers,RequestsBody=RequestsBody,NormalCode=[201])
def GetChatToAccount(Authorization: str,accountid: str) -> dict:
"""获取聊天对象用户信息"""
url = f"{DOMAIN}/chat/direct/{accountid}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetChatRoomSelfInfo(Authorization: str,roomid:str) -> dict:
"""获取聊天房间自己信息"""
url = f"{DOMAIN}/chat/{roomid}/members/me"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def JoinChatRoom(Authorization: str,roomid:str) -> dict:
"""加入聊天房间"""
url = f"{DOMAIN}/chat/{roomid}/members/me"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers,NormalCode=[201])
def LeaveChatRoom(Authorization: str,roomid:str) -> dict:
"""退出聊天房间"""
url = f"{DOMAIN}/chat/{roomid}/members/me"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers,NormalCode=[204])
def GetChatRoomMembers(Authorization: str,roomid:str) -> dict:
"""获取聊天房间成员"""
url = f"{DOMAIN}/chat/{roomid}/members"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def SendChatInvites(Authorization: str,accountid: str,role: str) -> dict:
"""发送聊天邀请"""
RequestsBody={
"related_user_id": accountid,
"role": role
}
url = f"{DOMAIN}/chat/invites"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers,RequestsBody=RequestsBody,NormalCode=[201])
def GetChatInvites(Authorization: str) -> dict:
"""获取聊天邀请"""
url = f"{DOMAIN}/chat/invites"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def AcceptChatInvite(Authorization: str,roomid:str) -> dict:
"""接受聊天邀请"""
url = f"{DOMAIN}/chat/invites/{roomid}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers)
def DeclineChatInvite(Authorization: str,roomid:str) -> dict:
"""拒绝聊天邀请"""
url = f"{DOMAIN}/chat/invites/{roomid}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers)
def SetChatNotify(Authorization: str,roomid:str,notify_level: str,break_until: str) -> dict:
"""设置聊天通知级别"""
RequestsBody={
"notify_level": notify_level,
"break_until": break_until
}
url = f"{DOMAIN}/chat/{roomid}/members/me/notify"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers,RequestsBody=RequestsBody,NormalCode=[201])
def SetChatMemberRole(Authorization: str,roomid:str,membersid: str,role: int) -> dict:
"""设置聊天成员角色"""
url=f"{DOMAIN}/chat/{roomid}/members/{membersid}/role"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers,RequestsBody=role,NormalCode=[201])

30
core/SNAPI/CallServer.py Normal file
View File

@@ -0,0 +1,30 @@
import requests
import json
from requests.exceptions import RequestException
import platform
UA = f"SolianForPythonApp/0.000.001 ({platform.system()})"
def _make_request(method: str, url: str, headers: dict, params: dict = None, normal_codes: list = [200], request_body: dict = None) -> dict:
"""内部辅助函数用于发送HTTP请求并处理响应"""
headers['User-Agent'] = UA
try:
if method == 'GET':
response = requests.get(url, headers=headers, params=params)
elif method == 'POST':
response = requests.post(url, headers=headers, data=json.dumps(request_body))
elif method == 'DELETE':
response = requests.delete(url, headers=headers, params=params)
elif method == 'PATCH':
response = requests.patch(url, headers=headers, data=json.dumps(request_body))
else:
return {"error": "Unsupported HTTP method"}
if response.status_code not in normal_codes:
return {"error": f"Unexpected status code: {response.status_code}"}
return response.json()
except json.JSONDecodeError:
return {"error": response.text}
except RequestException as e:
return {"error": str(e)}

199
core/SNAPI/Chat.py Normal file
View File

@@ -0,0 +1,199 @@
from ProjectCfg import DOMAIN
from CallServer import _make_request
from typing import List,Any,Optional,Dict
###=========================聊天API=================================
def ChatSummary(Authorization: str) -> dict:
"""获取聊天摘要"""
url = f"{DOMAIN}/chat/summary"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetChatMessages(Authorization: str, room_id: str, offset: int = 0, take: int = 20) -> dict:
"""获取聊天消息"""
url = f"{DOMAIN}/chat/{room_id}/messages"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"offset": offset, "take": take}
return _make_request('GET', url, headers, params=params)
def PostChatMessage(Authorization: str, room_id: str, content: str, nonce: Optional[str] = None, attachments_id: Optional[List[str]] = None, meta: Optional[Dict[str, Any]] = None, replied_message_id: Optional[str] = None, forwarded_message_id: Optional[str] = None) -> dict:
"""发送聊天消息"""
body = {
"content": content,
"nonce": nonce,
"attachments_id": attachments_id,
"meta": meta,
"replied_message_id": replied_message_id,
"forwarded_message_id": forwarded_message_id,
}
url = f"{DOMAIN}/chat/{room_id}/messages"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def GetChatMessageById(Authorization: str, room_id: str, message_id: str) -> dict:
"""获取具体聊天消息所有信息"""
url = f"{DOMAIN}/chat/{room_id}/messages/{message_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PatchChatMessage(Authorization: str, room_id: str, message_id: str, content: Optional[str] = None, nonce: Optional[str] = None, attachments_id: Optional[List[str]] = None, meta: Optional[Dict[str, Any]] = None, replied_message_id: Optional[str] = None, forwarded_message_id: Optional[str] = None) -> dict:
"""修改聊天消息"""
body = {
"content": content,
"nonce": nonce,
"attachments_id": attachments_id,
"meta": meta,
"replied_message_id": replied_message_id,
"forwarded_message_id": forwarded_message_id,
}
url = f"{DOMAIN}/chat/{room_id}/messages/{message_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers, request_body=body)
def DeleteChatMessage(Authorization: str, room_id: str, message_id: str) -> dict:
"""删除聊天消息"""
url = f"{DOMAIN}/chat/{room_id}/messages/{message_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])
def PostChatSync(Authorization: str, room_id: str, last_sync_timestamp: int) -> dict:
"""同步聊天消息"""
body = {"last_sync_timestamp": last_sync_timestamp}
url = f"{DOMAIN}/chat/{room_id}/sync"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body)
###=========================聊天室API=================================
def GetChatRoom(Authorization: str, id: str) -> dict:
"""获取聊天房间信息"""
url = f"{DOMAIN}/chat/{id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PatchChatRoom(Authorization: str, id: str, name: Optional[str] = None, description: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None, realm_id: Optional[str] = None, is_community: Optional[bool] = None, is_public: Optional[bool] = None) -> dict:
"""修改聊天房间信息"""
body = {
"name": name,
"description": description,
"picture_id": picture_id,
"background_id": background_id,
"realm_id": realm_id,
"is_community": is_community,
"is_public": is_public,
}
url = f"{DOMAIN}/chat/{id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers, request_body=body)
def DeleteChatRoom(Authorization: str, id: str) -> dict:
"""删除聊天房间"""
url = f"{DOMAIN}/chat/{id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])
def GetChatRooms(Authorization: str) -> dict:
"""获取聊天列表"""
url = f"{DOMAIN}/chat"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PostChatRoom(Authorization: str, name: str, description: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None, realm_id: Optional[str] = None, is_community: Optional[bool] = None, is_public: Optional[bool] = None) -> dict:
"""创建聊天房间"""
body = {
"name": name,
"description": description,
"picture_id": picture_id,
"background_id": background_id,
"realm_id": realm_id,
"is_community": is_community,
"is_public": is_public,
}
url = f"{DOMAIN}/chat"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def GetDirectChat(Authorization: str, account_id: str) -> dict:
"""获取聊天对象用户信息"""
url = f"{DOMAIN}/chat/direct/{account_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetChatRoomSelfInfo(Authorization: str, room_id: str) -> dict:
"""获取聊天房间自己信息"""
url = f"{DOMAIN}/chat/{room_id}/members/me"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def JoinChatRoom(Authorization: str, room_id: str) -> dict:
"""加入聊天房间"""
url = f"{DOMAIN}/chat/{room_id}/members/me"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, normal_codes=[201])
def LeaveChatRoom(Authorization: str, room_id: str) -> dict:
"""退出聊天房间"""
url = f"{DOMAIN}/chat/{room_id}/members/me"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])
def GetChatRoomMembers(Authorization: str, room_id: str, offset: int = 0, take: int = 20, with_status: bool = False) -> dict:
"""获取聊天房间成员"""
params = {
"offset": offset,
"take": take,
"withStatus": with_status,
}
url = f"{DOMAIN}/chat/{room_id}/members"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers, params=params)
def PostChatMemberInvite(Authorization: str, room_id: str, related_user_id: str, role: int) -> dict:
"""发送聊天邀请"""
body = {
"related_user_id": related_user_id,
"role": role,
}
url = f"{DOMAIN}/chat/invites/{room_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def GetChatInvites(Authorization: str) -> dict:
"""获取聊天邀请"""
url = f"{DOMAIN}/chat/invites"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def AcceptChatInvite(Authorization: str, room_id: str) -> dict:
"""接受聊天邀请"""
url = f"{DOMAIN}/chat/invites/{room_id}/accept"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers)
def DeclineChatInvite(Authorization: str, room_id: str) -> dict:
"""拒绝聊天邀请"""
url = f"{DOMAIN}/chat/invites/{room_id}/decline"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers)
def SetChatNotify(Authorization: str, room_id: str, notify_level: int, break_until: Optional[str] = None) -> dict:
"""设置聊天通知级别"""
body = {
"notify_level": notify_level,
"break_until": break_until,
}
url = f"{DOMAIN}/chat/{room_id}/members/me/notify"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201])
def SetChatMemberRole(Authorization: str, room_id: str, member_id: str, role: int) -> dict:
"""设置聊天成员角色"""
body = {"role": role}
url = f"{DOMAIN}/chat/{room_id}/members/{member_id}/role"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201])
def KickChatMember(Authorization: str, room_id: str, member_id: str) -> dict:
"""踢出聊天成员"""
url = f"{DOMAIN}/chat/{room_id}/members/{member_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])

69
core/SNAPI/Poll.py Normal file
View File

@@ -0,0 +1,69 @@
from ProjectCfg import DOMAIN
from CallServer import _make_request
from typing import List,Any,Optional,Dict
###==========================投票功能============================
def GetPoll(poll_id: str, Authorization: str) -> dict:
"""获取投票"""
url = f"{DOMAIN}/polls/{poll_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PostPoll(Authorization: str, title: Optional[str] = None, description: Optional[str] = None, ended_at: Optional[str] = None, is_anonymous: Optional[bool] = None, questions: Optional[List[Dict[str, Any]]] = None) -> dict:
"""创建投票"""
body = {
"title": title,
"description": description,
"ended_at": ended_at,
"is_anonymous": is_anonymous,
"questions": questions,
}
url = f"{DOMAIN}/polls"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def DeletePoll(Authorization: str, poll_id: str) -> dict:
"""删除投票"""
url = f"{DOMAIN}/polls/{poll_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])
def AnswerPoll(Authorization: str, poll_id: str, answer: Dict[str, Any]) -> dict:
"""回答投票"""
body = {"answer": answer}
url = f"{DOMAIN}/polls/{poll_id}/answer"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def DeletePollAnswer(Authorization: str, poll_id: str) -> dict:
"""删除投票回答"""
url = f"{DOMAIN}/polls/{poll_id}/answer"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])
def GetPollFeedback(Authorization: str, poll_id: str, offset: int = 0, take: int = 20) -> dict:
"""获取投票反馈"""
url = f"{DOMAIN}/polls/{poll_id}/feedback"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {'offset': offset, 'take': take}
return _make_request('GET', url, headers, params=params)
def GetMyPolls(Authorization: str, pub: Optional[str] = None, active: bool = False, offset: int = 0, take: int = 20) -> dict:
"""获取我的投票"""
url = f"{DOMAIN}/polls/me"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {'offset': offset, 'take': take, 'pub': pub, 'active': active}
return _make_request('GET', url, headers, params=params)
def PatchPoll(Authorization: str, poll_id: str, title: Optional[str] = None, description: Optional[str] = None, ended_at: Optional[str] = None, is_anonymous: Optional[bool] = None, questions: Optional[List[Dict[str, Any]]] = None) -> dict:
"""修改投票"""
body = {
"title": title,
"description": description,
"ended_at": ended_at,
"is_anonymous": is_anonymous,
"questions": questions,
}
url = f"{DOMAIN}/polls/{poll_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201])

148
core/SNAPI/Post.py Normal file
View File

@@ -0,0 +1,148 @@
from ProjectCfg import DOMAIN
from CallServer import _make_request
from typing import List,Any,Optional,Dict
###==========================帖子管理============================
def GetFeaturedPosts(Authorization: str) -> dict:
"""获取精选帖子"""
url = f"{DOMAIN}/posts/featured"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetPosts(Authorization: str, offset: int = 0, take: int = 20, pub: Optional[str] = None, realm: Optional[str] = None, post_type: Optional[int] = None, categories: Optional[List[str]] = None, tags: Optional[List[str]] = None, query: Optional[str] = None, vector: bool = False, media: bool = False, shuffle: bool = False, replies: Optional[bool] = None, pinned: Optional[bool] = None) -> dict:
"""获取帖子列表"""
url = f"{DOMAIN}/posts"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {
"offset": offset,
"take": take,
"pub": pub,
"realm": realm,
"type": post_type,
"categories": categories,
"tags": tags,
"query": query,
"vector": vector,
"media": media,
"shuffle": shuffle,
"replies": replies,
"pinned": pinned,
}
return _make_request('GET', url, headers, params=params)
def PostPost(Authorization: str, title: str, description: str, slug: str, content: str, visibility: int, post_type: int, embed_view: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, categories: Optional[List[str]] = None, attachments: Optional[List[str]] = None, published_at: Optional[str] = None, replied_post_id: Optional[str] = None, forwarded_post_id: Optional[str] = None, realm_id: Optional[str] = None, poll_id: Optional[str] = None) -> dict:
"""创建帖子"""
body = {
"title": title,
"description": description,
"slug": slug,
"content": content,
"visibility": visibility,
"type": post_type,
"embed_view": embed_view,
"tags": tags,
"categories": categories,
"attachments": attachments,
"published_at": published_at,
"replied_post_id": replied_post_id,
"forwarded_post_id": forwarded_post_id,
"realm_id": realm_id,
"poll_id": poll_id,
}
url = f"{DOMAIN}/posts"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def GetPostById(Authorization: str, post_id: str) -> dict:
"""获取帖子详情"""
url = f"{DOMAIN}/posts/{post_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PatchPost(Authorization: str, post_id: str, title: Optional[str] = None, description: Optional[str] = None, slug: Optional[str] = None, content: Optional[str] = None, visibility: Optional[int] = None, post_type: Optional[int] = None, embed_view: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, categories: Optional[List[str]] = None, attachments: Optional[List[str]] = None, published_at: Optional[str] = None, replied_post_id: Optional[str] = None, forwarded_post_id: Optional[str] = None, realm_id: Optional[str] = None, poll_id: Optional[str] = None) -> dict:
"""修改帖子"""
body = {
"title": title,
"description": description,
"slug": slug,
"content": content,
"visibility": visibility,
"type": post_type,
"embed_view": embed_view,
"tags": tags,
"categories": categories,
"attachments": attachments,
"published_at": published_at,
"replied_post_id": replied_post_id,
"forwarded_post_id": forwarded_post_id,
"realm_id": realm_id,
"poll_id": poll_id,
}
url = f"{DOMAIN}/posts/{post_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers, request_body=body)
def DeletePost(Authorization: str, post_id: str) -> dict:
"""删除帖子"""
url = f"{DOMAIN}/posts/{post_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])
def GetPostReactions(Authorization: str, post_id: str, symbol: Optional[str] = None, offset: int = 0, take: int = 20) -> dict:
"""获取帖子反应"""
url = f"{DOMAIN}/posts/{post_id}/reactions"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"symbol": symbol, "offset": offset, "take": take}
return _make_request('GET', url, headers, params=params)
def PostPostReaction(Authorization: str, post_id: str, symbol: str, attitude: int) -> dict:
"""发送帖子反应"""
body = {"symbol": symbol, "attitude": attitude}
url = f"{DOMAIN}/posts/{post_id}/reactions"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def GetFeaturedPostReplies(Authorization: str, post_id: str) -> dict:
"""获取精选帖子回复"""
url = f"{DOMAIN}/posts/{post_id}/replies/featured"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetPinnedPostReplies(Authorization: str, post_id: str) -> dict:
"""获取置顶帖子回复"""
url = f"{DOMAIN}/posts/{post_id}/replies/pinned"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetPostReplies(Authorization: str, post_id: str, offset: int = 0, take: int = 20) -> dict:
"""获取帖子回复"""
url = f"{DOMAIN}/posts/{post_id}/replies"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"offset": offset, "take": take}
return _make_request('GET', url, headers, params=params)
def GetPostAwards(Authorization: str, post_id: str, offset: int = 0, take: int = 20) -> dict:
"""获取帖子奖励"""
url = f"{DOMAIN}/posts/{post_id}/awards"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"offset": offset, "take": take}
return _make_request('GET', url, headers, params=params)
def PostPostAward(Authorization: str, post_id: str, amount: float, attitude: int, message: Optional[str] = None) -> dict:
"""发送帖子奖励"""
body = {"amount": amount, "attitude": attitude, "message": message}
url = f"{DOMAIN}/posts/{post_id}/awards"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def PostPinPost(Authorization: str, post_id: str, mode: int) -> dict:
"""置顶帖子"""
body = {"mode": mode}
url = f"{DOMAIN}/posts/{post_id}/pin"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def DeletePinPost(Authorization: str, post_id: str) -> dict:
"""取消置顶帖子"""
url = f"{DOMAIN}/posts/{post_id}/pin"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])

View File

@@ -0,0 +1,42 @@
from ProjectCfg import DOMAIN
from CallServer import _make_request
from typing import List,Any,Optional,Dict
###==========================帖子分类管理============================
def GetPostCategories(Authorization: str, query: Optional[str] = None, offset: int = 0, take: int = 20, order: Optional[str] = None) -> dict:
"""获取帖子分类"""
url = f"{DOMAIN}/posts/categories"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"query": query, "offset": offset, "take": take, "order": order}
return _make_request('GET', url, headers, params=params)
def PostPostCategory(Authorization: str, slug: Optional[str] = None, name: Optional[str] = None) -> dict:
"""创建帖子分类"""
body = {"slug": slug, "name": name}
url = f"{DOMAIN}/posts/categories"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def GetPostCategoryBySlug(Authorization: str, slug: str) -> dict:
"""获取帖子分类详情"""
url = f"{DOMAIN}/posts/categories/{slug}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PostSubscribeCategory(Authorization: str, slug: str) -> dict:
"""订阅帖子分类"""
url = f"{DOMAIN}/posts/categories/{slug}/subscribe"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, normal_codes=[201])
def PostUnsubscribeCategory(Authorization: str, slug: str) -> dict:
"""取消订阅帖子分类"""
url = f"{DOMAIN}/posts/categories/{slug}/unsubscribe"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, normal_codes=[201])
def GetCategorySubscription(Authorization: str, slug: str) -> dict:
"""获取分类订阅状态"""
url = f"{DOMAIN}/posts/categories/{slug}/subscription"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)

42
core/SNAPI/PostTag.py Normal file
View File

@@ -0,0 +1,42 @@
from ProjectCfg import DOMAIN
from CallServer import _make_request
from typing import List,Any,Optional,Dict
###==========================帖子标签管理============================
def GetPostTags(Authorization: str, query: Optional[str] = None, offset: int = 0, take: int = 20, order: Optional[str] = None) -> dict:
"""获取帖子标签"""
url = f"{DOMAIN}/posts/tags"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"query": query, "offset": offset, "take": take, "order": order}
return _make_request('GET', url, headers, params=params)
def PostTag(Authorization: str, slug: Optional[str] = None, name: Optional[str] = None) -> dict:
"""创建帖子标签"""
body = {"slug": slug, "name": name}
url = f"{DOMAIN}/posts/tags"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def GetTagBySlug(Authorization: str, slug: str) -> dict:
"""获取标签详情"""
url = f"{DOMAIN}/posts/tags/{slug}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PostSubscribeTag(Authorization: str, slug: str) -> dict:
"""订阅标签"""
url = f"{DOMAIN}/posts/tags/{slug}/subscribe"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, normal_codes=[201])
def PostUnsubscribeTag(Authorization: str, slug: str) -> dict:
"""取消订阅标签"""
url = f"{DOMAIN}/posts/tags/{slug}/unsubscribe"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, normal_codes=[201])
def GetTagSubscription(Authorization: str, slug: str) -> dict:
"""获取标签订阅状态"""
url = f"{DOMAIN}/posts/tags/{slug}/subscription"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)

149
core/SNAPI/Publisher.py Normal file
View File

@@ -0,0 +1,149 @@
from ProjectCfg import DOMAIN
from CallServer import _make_request
from typing import List,Any,Optional,Dict
###==========================发布者管理============================
def GetPublisher(Authorization: str, name: str) -> dict:
"""获取发布者信息"""
url = f"{DOMAIN}/publishers/{name}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PatchPublisher(Authorization: str, name: str, nick: Optional[str] = None, bio: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None) -> dict:
"""修改发布者信息"""
body = {
"nick": nick,
"bio": bio,
"picture_id": picture_id,
"background_id": background_id,
}
url = f"{DOMAIN}/publishers/{name}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201])
def DeletePublisher(Authorization: str, name: str) -> dict:
"""删除发布者"""
url = f"{DOMAIN}/publishers/{name}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])
def GetPublisherStats(Authorization: str, name: str) -> dict:
"""获取发布者统计信息"""
url = f"{DOMAIN}/publishers/{name}/stats"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetPublishersOfAccountId(Authorization: str, account_id: str) -> dict:
"""获取指定账户的发布者"""
url = f"{DOMAIN}/publishers/of/{account_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetPublishers(Authorization: str) -> dict:
"""获取所有发布者"""
url = f"{DOMAIN}/publishers"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PostPublisherInvite(Authorization: str, name: str, related_user_id: str, role: int) -> dict:
"""发送发布者邀请"""
body = {"related_user_id": related_user_id, "role": role}
url = f"{DOMAIN}/publishers/invites/{name}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def GetPublisherInvites(Authorization: str, name: str) -> dict:
"""获取发布者邀请"""
url = f"{DOMAIN}/publishers/invites/{name}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def AcceptPublisherInvite(Authorization: str, name: str) -> dict:
"""接受发布者邀请"""
url = f"{DOMAIN}/publishers/invites/{name}/accept"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, normal_codes=[201])
def DeclinePublisherInvite(Authorization: str, name: str) -> dict:
"""拒绝发布者邀请"""
url = f"{DOMAIN}/publishers/invites/{name}/decline"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, normal_codes=[201])
def DeletePublisherMember(Authorization: str, name: str, member_id: str) -> dict:
"""删除发布者成员"""
url = f"{DOMAIN}/publishers/{name}/members/{member_id}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])
def PostIndividualPublisher(Authorization: str, name: str, nick: Optional[str] = None, bio: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None) -> dict:
"""创建个人发布者"""
body = {
"name": name,
"nick": nick,
"bio": bio,
"picture_id": picture_id,
"background_id": background_id,
}
url = f"{DOMAIN}/publishers/individual"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def PostOrganizationPublisher(Authorization: str, name: str, realm_slug: str) -> dict:
"""创建组织发布者"""
body = {"name": name, "realm_slug": realm_slug}
url = f"{DOMAIN}/publishers/organization/{realm_slug}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def GetPublisherMembers(Authorization: str, name: str, offset: int = 0, take: int = 20) -> dict:
"""获取发布者成员"""
url = f"{DOMAIN}/publishers/{name}/members"
headers = {'accept': 'application/json', 'Authorization': Authorization}
params = {"offset": offset, "take": take}
return _make_request('GET', url, headers, params=params)
def GetMyPublisherMember(Authorization: str, name: str) -> dict:
"""获取我的发布者成员信息"""
url = f"{DOMAIN}/publishers/{name}/members/me"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PostPublisherFeature(Authorization: str, name: str, flag: str, expired_at: Optional[str] = None) -> dict:
"""创建发布者特性"""
body = {"flag": flag, "expired_at": expired_at}
url = f"{DOMAIN}/publishers/{name}/features"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
def DeletePublisherFeature(Authorization: str, name: str, flag: str) -> dict:
"""删除发布者特性"""
url = f"{DOMAIN}/publishers/{name}/features/{flag}"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('DELETE', url, headers, normal_codes=[204])
def GetPublisherSubscriptions(Authorization: str) -> dict:
"""获取发布者订阅状态"""
url = f"{DOMAIN}/publishers/subscriptions"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def GetPublisherSubscription(Authorization: str, name: str) -> dict:
"""获取特定发布者的订阅状态"""
url = f"{DOMAIN}/api/publishers/{name}/subscription"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('GET', url, headers)
def PostPublisherSubscribe(Authorization: str, name: str, subscribeRequest: Optional[dict] = None,tier:int=0) -> dict:
"""订阅特定发布者"""
RequestsBody={
"tier":tier
}
url = f"{DOMAIN}/api/publishers/{name}/subscribe"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers, request_body=subscribeRequest)
def PostPublisherUnsubscribe(Authorization: str, name: str) -> dict:
"""取消订阅特定发布者"""
url = f"{DOMAIN}/api/publishers/{name}/unsubscribe"
headers = {'accept': 'application/json', 'Authorization': Authorization}
return _make_request('POST', url, headers)