diff --git a/ProjectCfg.py b/ProjectCfg.py index d8b299e..defdb96 100644 --- a/ProjectCfg.py +++ b/ProjectCfg.py @@ -1,3 +1,5 @@ +import platform + # 项目配置 PROFECT_NAME : str = 'SolianForDesktop' PROFECT_VERSION : str = '1.0(alpha)' @@ -8,3 +10,5 @@ WINDOW_HEIGHT : int = 500 PYTHON_VERSION_MIN : tuple[int,int] = (3, 7) PYTHON_VERSION_MAX : tuple[int,int] = (0, 0) LIB_LIST : list[str] = ['flask'] +DOMAIN : str = 'https://solian.app/api' +UA : str = f"SolianForPythonApp/0.000.001 ({platform.system()})" \ No newline at end of file diff --git a/core/CallServerAPIs.py b/core/CallServerAPIs.py index eb1f0ea..94a0927 100644 --- a/core/CallServerAPIs.py +++ b/core/CallServerAPIs.py @@ -1,234 +1,43 @@ -import requests -from requests.exceptions import RequestException -import json -import platform - -DOMAIN = "https://solian.app/api" -UA = f"SolianForPythonApp/0.000.001({platform.system()})" - -def _make_request(method: str, url: str, headers: dict, params: dict = None, NormalCode: int = [204],RequestsBody: dict | int = None) -> dict: - """内部辅助函数,用于发送HTTP请求并处理响应""" - headers['User-Agent'] = UA - try: - if method == 'GET': - response = requests.get(url, headers=headers, params=params) - elif method == 'POST': - response = requests.post(url, headers=headers, data=json.dumps(RequestsBody)) - elif method == 'DELETE': - response = requests.delete(url, headers=headers, params=params) - elif method == 'PATCH': - response = requests.patch(url, headers=headers, data=json.dumps(RequestsBody)) - else: - return {"error": "Unsupported HTTP method"} - - if response.status_code != 200 and response.status_code not in NormalCode: - return {"error": response.status_code} - - return response.json() - - except json.JSONDecodeError: - return {"error": response.text} - except RequestException as e: - return {"error": str(e)} +from ProjectCfg import DOMAIN +from SNAPI.CallServer import _make_request +from typing import List,Any,Optional,Dict +from SNAPI import * +###=========================活动API================================= def ActivityAPIs(cursor: str = '', filter: str = '', take: int = 20, debuginclude: str = '', Authorization: str = '') -> dict: """获取首页内容""" url = f"{DOMAIN}/activities" headers = {'accept': 'application/json', 'Authorization': Authorization} params = {"cursor": cursor, "filter": filter, "take": take, "debuginclude": debuginclude} - return _make_request('GET', url, headers, params) + return _make_request('GET', url, headers, params=params) -###=========================聊天API================================= -def ChatSummary(Authorization: str) -> dict: - """获取聊天摘要""" - url = f"{DOMAIN}/chat/summary" +###==========================领域的发现========================== +def GetDiscover(query: str = '', take: int = 20, offset: int = 0, Authorization: str = '') -> dict: + """获取发现""" + url = f"{DOMAIN}/discovery/realms" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {'query': query, 'take': take, 'offset': offset} + return _make_request('GET', url, headers, params=params) + +###==========================领域聊天========================== +def RealmChat(Authorization: str,slug:str ) -> dict: + """获取领域聊天""" + url = f"{DOMAIN}/realms/{slug}/chat" headers = {'accept': 'application/json', 'Authorization': Authorization} return _make_request('GET', url, headers) -def GetChatMessageAllInfo(Authorization: str, roomid: str, offset: int = 0, take: int = 20) -> dict: - """获取聊天消息""" - url = f"{DOMAIN}/chat/{roomid}/message" +###==========================尊贵的Solar Network 恒星计划订阅用户专属翻译功能======= :(呜呜呜,没钱订阅 + +def TranslationText(Authorization: str,text:str ,Tolang:str,FromLang:str) -> dict: + """翻译文本""" + url = f"{DOMAIN}/translation" headers = {'accept': 'application/json', 'Authorization': Authorization} - params = {"offset": offset, "take": take} - return _make_request('GET', url, headers, params) - -def SendChatMessage(Authorization: str, roomid: str,content: str,nonce: str,attachments_id: list,meta: dict,replied_message_id: str,forwarded_message_id: str) -> dict: - """发送聊天消息""" - RequestsBody={ - "content": content, - "nonce": nonce, - "attachments_id": attachments_id, - "meta": meta, - "replied_message_id": replied_message_id, - "forwarded_message_id": forwarded_message_id -} - url = f"{DOMAIN}/chat/{roomid}/message" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('POST', url, headers,RequestsBody=RequestsBody,NormalCode=[201]) - -def GetAMessageAllInfo(Authorization: str, roomid: str, messageid: str) -> dict: - """获取具体聊天消息所有信息""" - url = f"{DOMAIN}/chat/{roomid}/message/{messageid}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('GET', url, headers) - -def ModifyChatMessage(Authorization: str, roomid: str, messageid: str,content: str,nonce: str,attachments_id: list,meta: dict,replied_message_id: str,forwarded_message_id: str) -> dict: - """修改聊天消息""" - RequestsBody={ - "content": content, - "nonce": nonce, - "attachments_id": attachments_id, - "meta": meta, - "replied_message_id": replied_message_id, - "forwarded_message_id": forwarded_message_id -} - url = f"{DOMAIN}/chat/{roomid}/message/{messageid}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('PATCH', url, headers,RequestsBody=RequestsBody) - -def DeleteAMessageInfo(Authorization: str, roomid: str, messageid: str) -> dict: - """删除聊天消息基础信息。暂时作为保留""" - url = f"{DOMAIN}/chat/{roomid}/message/{messageid}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('DELETE', url, headers) - -def DeleteChatMessage(Authorization: str, roomid: str, messageid: str) -> dict: - """删除聊天消息""" - url = f"{DOMAIN}/chat/{roomid}/message/{messageid}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('DELETE', url, headers) - -def SyncChatMessage(Authorization: str, roomid: str, last_sync_timestamp: int) -> dict: - """同步聊天消息""" - url = f"{DOMAIN}/chat/{roomid}/sync" - RequestsBody={ - "last_sync_timestamp": last_sync_timestamp -} - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('POST', url, headers,RequestsBody=RequestsBody) - -###=========================聊天室API================================= - -def GetChatRoomInfo(Authorization: str, id: str) -> dict: - """获取聊天房间信息""" - url = f"{DOMAIN}/chat/{id}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('GET', url, headers) - -def ModifyChatRoomInfo(Authorization: str, id: str, name: str, description: str, picture_id: str, backgournd_id: str, realm_id: str, is_community: bool, is_public: bool) -> dict: - """修改聊天房间信息""" - RequestsBody={ - "name": name, - "description": description, - "picture_id": picture_id, - "background_id": backgournd_id, - "realm_id": realm_id, - "is_community": is_community, - "is_public": is_public, -} - url = f"{DOMAIN}/chat/{id}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('PATCH', url, headers,RequestsBody=RequestsBody) - -def DeleteChatRoom(Authorization: str, id: str) -> dict: - """删除聊天房间""" - url = f"{DOMAIN}/chat/{id}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('DELETE', url, headers,NormalCode=[204]) - -def GetChatList(Authorization: str) -> dict: - """获取聊天列表""" - url = f"{DOMAIN}/chat" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('GET', url, headers) - -def CreateChatWithRealm(Authorization: str, name: str, description: str, picture_id: str, backgournd_id: str, realm_id: str, is_community: bool, is_public: bool) -> dict: - """创建聊天房间""" - RequestsBody={ - "name": name, - "description": description, - "picture_id": picture_id, - "background_id": backgournd_id, - "realm_id": realm_id, - "is_community": is_community, - "is_public": is_public, -} - url = f"{DOMAIN}/chat" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('POST', url, headers,RequestsBody=RequestsBody,NormalCode=[201]) - -def GetChatToAccount(Authorization: str,accountid: str) -> dict: - """获取聊天对象用户信息""" - url = f"{DOMAIN}/chat/direct/{accountid}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('GET', url, headers) - -def GetChatRoomSelfInfo(Authorization: str,roomid:str) -> dict: - """获取聊天房间自己信息""" - url = f"{DOMAIN}/chat/{roomid}/members/me" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('GET', url, headers) - -def JoinChatRoom(Authorization: str,roomid:str) -> dict: - """加入聊天房间""" - url = f"{DOMAIN}/chat/{roomid}/members/me" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('POST', url, headers,NormalCode=[201]) - -def LeaveChatRoom(Authorization: str,roomid:str) -> dict: - """退出聊天房间""" - url = f"{DOMAIN}/chat/{roomid}/members/me" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('DELETE', url, headers,NormalCode=[204]) - -def GetChatRoomMembers(Authorization: str,roomid:str) -> dict: - """获取聊天房间成员""" - url = f"{DOMAIN}/chat/{roomid}/members" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('GET', url, headers) - -def SendChatInvites(Authorization: str,accountid: str,role: str) -> dict: - """发送聊天邀请""" - RequestsBody={ - "related_user_id": accountid, - "role": role -} - url = f"{DOMAIN}/chat/invites" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('POST', url, headers,RequestsBody=RequestsBody,NormalCode=[201]) - -def GetChatInvites(Authorization: str) -> dict: - """获取聊天邀请""" - url = f"{DOMAIN}/chat/invites" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('GET', url, headers) - -def AcceptChatInvite(Authorization: str,roomid:str) -> dict: - """接受聊天邀请""" - url = f"{DOMAIN}/chat/invites/{roomid}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('POST', url, headers) - -def DeclineChatInvite(Authorization: str,roomid:str) -> dict: - """拒绝聊天邀请""" - url = f"{DOMAIN}/chat/invites/{roomid}" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('DELETE', url, headers) - -def SetChatNotify(Authorization: str,roomid:str,notify_level: str,break_until: str) -> dict: - """设置聊天通知级别""" - RequestsBody={ - "notify_level": notify_level, - "break_until": break_until -} - url = f"{DOMAIN}/chat/{roomid}/members/me/notify" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('PATCH', url, headers,RequestsBody=RequestsBody,NormalCode=[201]) - -def SetChatMemberRole(Authorization: str,roomid:str,membersid: str,role: int) -> dict: - """设置聊天成员角色""" - url=f"{DOMAIN}/chat/{roomid}/members/{membersid}/role" - headers = {'accept': 'application/json', 'Authorization': Authorization} - return _make_request('PATCH', url, headers,RequestsBody=role,NormalCode=[201]) + params = {"to":Tolang,"from":FromLang} + return _make_request('POST', url, headers, params=params,request_body=text) +def GetServerVersion() -> dict: + """获取服务器版本""" + url = f"{DOMAIN}/version" + headers = {'accept': 'application/json'} + return _make_request('GET', url, headers) \ No newline at end of file diff --git a/core/SNAPI/CallServer.py b/core/SNAPI/CallServer.py new file mode 100644 index 0000000..99ab109 --- /dev/null +++ b/core/SNAPI/CallServer.py @@ -0,0 +1,30 @@ +import requests +import json +from requests.exceptions import RequestException +import platform + +UA = f"SolianForPythonApp/0.000.001 ({platform.system()})" + +def _make_request(method: str, url: str, headers: dict, params: dict = None, normal_codes: list = [200], request_body: dict = None) -> dict: + """内部辅助函数,用于发送HTTP请求并处理响应""" + headers['User-Agent'] = UA + try: + if method == 'GET': + response = requests.get(url, headers=headers, params=params) + elif method == 'POST': + response = requests.post(url, headers=headers, data=json.dumps(request_body)) + elif method == 'DELETE': + response = requests.delete(url, headers=headers, params=params) + elif method == 'PATCH': + response = requests.patch(url, headers=headers, data=json.dumps(request_body)) + else: + return {"error": "Unsupported HTTP method"} + + if response.status_code not in normal_codes: + return {"error": f"Unexpected status code: {response.status_code}"} + + return response.json() + except json.JSONDecodeError: + return {"error": response.text} + except RequestException as e: + return {"error": str(e)} \ No newline at end of file diff --git a/core/SNAPI/Chat.py b/core/SNAPI/Chat.py new file mode 100644 index 0000000..36757a3 --- /dev/null +++ b/core/SNAPI/Chat.py @@ -0,0 +1,199 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict + +###=========================聊天API================================= +def ChatSummary(Authorization: str) -> dict: + """获取聊天摘要""" + url = f"{DOMAIN}/chat/summary" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetChatMessages(Authorization: str, room_id: str, offset: int = 0, take: int = 20) -> dict: + """获取聊天消息""" + url = f"{DOMAIN}/chat/{room_id}/messages" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def PostChatMessage(Authorization: str, room_id: str, content: str, nonce: Optional[str] = None, attachments_id: Optional[List[str]] = None, meta: Optional[Dict[str, Any]] = None, replied_message_id: Optional[str] = None, forwarded_message_id: Optional[str] = None) -> dict: + """发送聊天消息""" + body = { + "content": content, + "nonce": nonce, + "attachments_id": attachments_id, + "meta": meta, + "replied_message_id": replied_message_id, + "forwarded_message_id": forwarded_message_id, + } + url = f"{DOMAIN}/chat/{room_id}/messages" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetChatMessageById(Authorization: str, room_id: str, message_id: str) -> dict: + """获取具体聊天消息所有信息""" + url = f"{DOMAIN}/chat/{room_id}/messages/{message_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PatchChatMessage(Authorization: str, room_id: str, message_id: str, content: Optional[str] = None, nonce: Optional[str] = None, attachments_id: Optional[List[str]] = None, meta: Optional[Dict[str, Any]] = None, replied_message_id: Optional[str] = None, forwarded_message_id: Optional[str] = None) -> dict: + """修改聊天消息""" + body = { + "content": content, + "nonce": nonce, + "attachments_id": attachments_id, + "meta": meta, + "replied_message_id": replied_message_id, + "forwarded_message_id": forwarded_message_id, + } + url = f"{DOMAIN}/chat/{room_id}/messages/{message_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body) + +def DeleteChatMessage(Authorization: str, room_id: str, message_id: str) -> dict: + """删除聊天消息""" + url = f"{DOMAIN}/chat/{room_id}/messages/{message_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def PostChatSync(Authorization: str, room_id: str, last_sync_timestamp: int) -> dict: + """同步聊天消息""" + body = {"last_sync_timestamp": last_sync_timestamp} + url = f"{DOMAIN}/chat/{room_id}/sync" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body) + +###=========================聊天室API================================= +def GetChatRoom(Authorization: str, id: str) -> dict: + """获取聊天房间信息""" + url = f"{DOMAIN}/chat/{id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PatchChatRoom(Authorization: str, id: str, name: Optional[str] = None, description: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None, realm_id: Optional[str] = None, is_community: Optional[bool] = None, is_public: Optional[bool] = None) -> dict: + """修改聊天房间信息""" + body = { + "name": name, + "description": description, + "picture_id": picture_id, + "background_id": background_id, + "realm_id": realm_id, + "is_community": is_community, + "is_public": is_public, + } + url = f"{DOMAIN}/chat/{id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body) + +def DeleteChatRoom(Authorization: str, id: str) -> dict: + """删除聊天房间""" + url = f"{DOMAIN}/chat/{id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetChatRooms(Authorization: str) -> dict: + """获取聊天列表""" + url = f"{DOMAIN}/chat" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostChatRoom(Authorization: str, name: str, description: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None, realm_id: Optional[str] = None, is_community: Optional[bool] = None, is_public: Optional[bool] = None) -> dict: + """创建聊天房间""" + body = { + "name": name, + "description": description, + "picture_id": picture_id, + "background_id": background_id, + "realm_id": realm_id, + "is_community": is_community, + "is_public": is_public, + } + url = f"{DOMAIN}/chat" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetDirectChat(Authorization: str, account_id: str) -> dict: + """获取聊天对象用户信息""" + url = f"{DOMAIN}/chat/direct/{account_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetChatRoomSelfInfo(Authorization: str, room_id: str) -> dict: + """获取聊天房间自己信息""" + url = f"{DOMAIN}/chat/{room_id}/members/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def JoinChatRoom(Authorization: str, room_id: str) -> dict: + """加入聊天房间""" + url = f"{DOMAIN}/chat/{room_id}/members/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def LeaveChatRoom(Authorization: str, room_id: str) -> dict: + """退出聊天房间""" + url = f"{DOMAIN}/chat/{room_id}/members/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetChatRoomMembers(Authorization: str, room_id: str, offset: int = 0, take: int = 20, with_status: bool = False) -> dict: + """获取聊天房间成员""" + params = { + "offset": offset, + "take": take, + "withStatus": with_status, + } + url = f"{DOMAIN}/chat/{room_id}/members" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers, params=params) + +def PostChatMemberInvite(Authorization: str, room_id: str, related_user_id: str, role: int) -> dict: + """发送聊天邀请""" + body = { + "related_user_id": related_user_id, + "role": role, + } + url = f"{DOMAIN}/chat/invites/{room_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetChatInvites(Authorization: str) -> dict: + """获取聊天邀请""" + url = f"{DOMAIN}/chat/invites" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def AcceptChatInvite(Authorization: str, room_id: str) -> dict: + """接受聊天邀请""" + url = f"{DOMAIN}/chat/invites/{room_id}/accept" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers) + +def DeclineChatInvite(Authorization: str, room_id: str) -> dict: + """拒绝聊天邀请""" + url = f"{DOMAIN}/chat/invites/{room_id}/decline" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers) + +def SetChatNotify(Authorization: str, room_id: str, notify_level: int, break_until: Optional[str] = None) -> dict: + """设置聊天通知级别""" + body = { + "notify_level": notify_level, + "break_until": break_until, + } + url = f"{DOMAIN}/chat/{room_id}/members/me/notify" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201]) + +def SetChatMemberRole(Authorization: str, room_id: str, member_id: str, role: int) -> dict: + """设置聊天成员角色""" + body = {"role": role} + url = f"{DOMAIN}/chat/{room_id}/members/{member_id}/role" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201]) + +def KickChatMember(Authorization: str, room_id: str, member_id: str) -> dict: + """踢出聊天成员""" + url = f"{DOMAIN}/chat/{room_id}/members/{member_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) diff --git a/core/SNAPI/Poll.py b/core/SNAPI/Poll.py new file mode 100644 index 0000000..fe2a5f3 --- /dev/null +++ b/core/SNAPI/Poll.py @@ -0,0 +1,69 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict + +###==========================投票功能============================ +def GetPoll(poll_id: str, Authorization: str) -> dict: + """获取投票""" + url = f"{DOMAIN}/polls/{poll_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostPoll(Authorization: str, title: Optional[str] = None, description: Optional[str] = None, ended_at: Optional[str] = None, is_anonymous: Optional[bool] = None, questions: Optional[List[Dict[str, Any]]] = None) -> dict: + """创建投票""" + body = { + "title": title, + "description": description, + "ended_at": ended_at, + "is_anonymous": is_anonymous, + "questions": questions, + } + url = f"{DOMAIN}/polls" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def DeletePoll(Authorization: str, poll_id: str) -> dict: + """删除投票""" + url = f"{DOMAIN}/polls/{poll_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def AnswerPoll(Authorization: str, poll_id: str, answer: Dict[str, Any]) -> dict: + """回答投票""" + body = {"answer": answer} + url = f"{DOMAIN}/polls/{poll_id}/answer" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def DeletePollAnswer(Authorization: str, poll_id: str) -> dict: + """删除投票回答""" + url = f"{DOMAIN}/polls/{poll_id}/answer" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetPollFeedback(Authorization: str, poll_id: str, offset: int = 0, take: int = 20) -> dict: + """获取投票反馈""" + url = f"{DOMAIN}/polls/{poll_id}/feedback" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {'offset': offset, 'take': take} + return _make_request('GET', url, headers, params=params) + +def GetMyPolls(Authorization: str, pub: Optional[str] = None, active: bool = False, offset: int = 0, take: int = 20) -> dict: + """获取我的投票""" + url = f"{DOMAIN}/polls/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {'offset': offset, 'take': take, 'pub': pub, 'active': active} + return _make_request('GET', url, headers, params=params) + +def PatchPoll(Authorization: str, poll_id: str, title: Optional[str] = None, description: Optional[str] = None, ended_at: Optional[str] = None, is_anonymous: Optional[bool] = None, questions: Optional[List[Dict[str, Any]]] = None) -> dict: + """修改投票""" + body = { + "title": title, + "description": description, + "ended_at": ended_at, + "is_anonymous": is_anonymous, + "questions": questions, + } + url = f"{DOMAIN}/polls/{poll_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201]) diff --git a/core/SNAPI/Post.py b/core/SNAPI/Post.py new file mode 100644 index 0000000..9e1a2e8 --- /dev/null +++ b/core/SNAPI/Post.py @@ -0,0 +1,148 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict +###==========================帖子管理============================ +def GetFeaturedPosts(Authorization: str) -> dict: + """获取精选帖子""" + url = f"{DOMAIN}/posts/featured" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPosts(Authorization: str, offset: int = 0, take: int = 20, pub: Optional[str] = None, realm: Optional[str] = None, post_type: Optional[int] = None, categories: Optional[List[str]] = None, tags: Optional[List[str]] = None, query: Optional[str] = None, vector: bool = False, media: bool = False, shuffle: bool = False, replies: Optional[bool] = None, pinned: Optional[bool] = None) -> dict: + """获取帖子列表""" + url = f"{DOMAIN}/posts" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = { + "offset": offset, + "take": take, + "pub": pub, + "realm": realm, + "type": post_type, + "categories": categories, + "tags": tags, + "query": query, + "vector": vector, + "media": media, + "shuffle": shuffle, + "replies": replies, + "pinned": pinned, + } + return _make_request('GET', url, headers, params=params) + +def PostPost(Authorization: str, title: str, description: str, slug: str, content: str, visibility: int, post_type: int, embed_view: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, categories: Optional[List[str]] = None, attachments: Optional[List[str]] = None, published_at: Optional[str] = None, replied_post_id: Optional[str] = None, forwarded_post_id: Optional[str] = None, realm_id: Optional[str] = None, poll_id: Optional[str] = None) -> dict: + """创建帖子""" + body = { + "title": title, + "description": description, + "slug": slug, + "content": content, + "visibility": visibility, + "type": post_type, + "embed_view": embed_view, + "tags": tags, + "categories": categories, + "attachments": attachments, + "published_at": published_at, + "replied_post_id": replied_post_id, + "forwarded_post_id": forwarded_post_id, + "realm_id": realm_id, + "poll_id": poll_id, + } + url = f"{DOMAIN}/posts" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetPostById(Authorization: str, post_id: str) -> dict: + """获取帖子详情""" + url = f"{DOMAIN}/posts/{post_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PatchPost(Authorization: str, post_id: str, title: Optional[str] = None, description: Optional[str] = None, slug: Optional[str] = None, content: Optional[str] = None, visibility: Optional[int] = None, post_type: Optional[int] = None, embed_view: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, categories: Optional[List[str]] = None, attachments: Optional[List[str]] = None, published_at: Optional[str] = None, replied_post_id: Optional[str] = None, forwarded_post_id: Optional[str] = None, realm_id: Optional[str] = None, poll_id: Optional[str] = None) -> dict: + """修改帖子""" + body = { + "title": title, + "description": description, + "slug": slug, + "content": content, + "visibility": visibility, + "type": post_type, + "embed_view": embed_view, + "tags": tags, + "categories": categories, + "attachments": attachments, + "published_at": published_at, + "replied_post_id": replied_post_id, + "forwarded_post_id": forwarded_post_id, + "realm_id": realm_id, + "poll_id": poll_id, + } + url = f"{DOMAIN}/posts/{post_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body) + +def DeletePost(Authorization: str, post_id: str) -> dict: + """删除帖子""" + url = f"{DOMAIN}/posts/{post_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetPostReactions(Authorization: str, post_id: str, symbol: Optional[str] = None, offset: int = 0, take: int = 20) -> dict: + """获取帖子反应""" + url = f"{DOMAIN}/posts/{post_id}/reactions" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"symbol": symbol, "offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def PostPostReaction(Authorization: str, post_id: str, symbol: str, attitude: int) -> dict: + """发送帖子反应""" + body = {"symbol": symbol, "attitude": attitude} + url = f"{DOMAIN}/posts/{post_id}/reactions" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetFeaturedPostReplies(Authorization: str, post_id: str) -> dict: + """获取精选帖子回复""" + url = f"{DOMAIN}/posts/{post_id}/replies/featured" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPinnedPostReplies(Authorization: str, post_id: str) -> dict: + """获取置顶帖子回复""" + url = f"{DOMAIN}/posts/{post_id}/replies/pinned" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPostReplies(Authorization: str, post_id: str, offset: int = 0, take: int = 20) -> dict: + """获取帖子回复""" + url = f"{DOMAIN}/posts/{post_id}/replies" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def GetPostAwards(Authorization: str, post_id: str, offset: int = 0, take: int = 20) -> dict: + """获取帖子奖励""" + url = f"{DOMAIN}/posts/{post_id}/awards" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def PostPostAward(Authorization: str, post_id: str, amount: float, attitude: int, message: Optional[str] = None) -> dict: + """发送帖子奖励""" + body = {"amount": amount, "attitude": attitude, "message": message} + url = f"{DOMAIN}/posts/{post_id}/awards" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def PostPinPost(Authorization: str, post_id: str, mode: int) -> dict: + """置顶帖子""" + body = {"mode": mode} + url = f"{DOMAIN}/posts/{post_id}/pin" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def DeletePinPost(Authorization: str, post_id: str) -> dict: + """取消置顶帖子""" + url = f"{DOMAIN}/posts/{post_id}/pin" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) diff --git a/core/SNAPI/PostCategory.py b/core/SNAPI/PostCategory.py new file mode 100644 index 0000000..9148425 --- /dev/null +++ b/core/SNAPI/PostCategory.py @@ -0,0 +1,42 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict + +###==========================帖子分类管理============================ +def GetPostCategories(Authorization: str, query: Optional[str] = None, offset: int = 0, take: int = 20, order: Optional[str] = None) -> dict: + """获取帖子分类""" + url = f"{DOMAIN}/posts/categories" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"query": query, "offset": offset, "take": take, "order": order} + return _make_request('GET', url, headers, params=params) + +def PostPostCategory(Authorization: str, slug: Optional[str] = None, name: Optional[str] = None) -> dict: + """创建帖子分类""" + body = {"slug": slug, "name": name} + url = f"{DOMAIN}/posts/categories" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetPostCategoryBySlug(Authorization: str, slug: str) -> dict: + """获取帖子分类详情""" + url = f"{DOMAIN}/posts/categories/{slug}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostSubscribeCategory(Authorization: str, slug: str) -> dict: + """订阅帖子分类""" + url = f"{DOMAIN}/posts/categories/{slug}/subscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def PostUnsubscribeCategory(Authorization: str, slug: str) -> dict: + """取消订阅帖子分类""" + url = f"{DOMAIN}/posts/categories/{slug}/unsubscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def GetCategorySubscription(Authorization: str, slug: str) -> dict: + """获取分类订阅状态""" + url = f"{DOMAIN}/posts/categories/{slug}/subscription" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) diff --git a/core/SNAPI/PostTag.py b/core/SNAPI/PostTag.py new file mode 100644 index 0000000..d021bd1 --- /dev/null +++ b/core/SNAPI/PostTag.py @@ -0,0 +1,42 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict + +###==========================帖子标签管理============================ +def GetPostTags(Authorization: str, query: Optional[str] = None, offset: int = 0, take: int = 20, order: Optional[str] = None) -> dict: + """获取帖子标签""" + url = f"{DOMAIN}/posts/tags" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"query": query, "offset": offset, "take": take, "order": order} + return _make_request('GET', url, headers, params=params) + +def PostTag(Authorization: str, slug: Optional[str] = None, name: Optional[str] = None) -> dict: + """创建帖子标签""" + body = {"slug": slug, "name": name} + url = f"{DOMAIN}/posts/tags" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetTagBySlug(Authorization: str, slug: str) -> dict: + """获取标签详情""" + url = f"{DOMAIN}/posts/tags/{slug}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostSubscribeTag(Authorization: str, slug: str) -> dict: + """订阅标签""" + url = f"{DOMAIN}/posts/tags/{slug}/subscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def PostUnsubscribeTag(Authorization: str, slug: str) -> dict: + """取消订阅标签""" + url = f"{DOMAIN}/posts/tags/{slug}/unsubscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def GetTagSubscription(Authorization: str, slug: str) -> dict: + """获取标签订阅状态""" + url = f"{DOMAIN}/posts/tags/{slug}/subscription" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) diff --git a/core/SNAPI/Publisher.py b/core/SNAPI/Publisher.py new file mode 100644 index 0000000..464269d --- /dev/null +++ b/core/SNAPI/Publisher.py @@ -0,0 +1,149 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict +###==========================发布者管理============================ +def GetPublisher(Authorization: str, name: str) -> dict: + """获取发布者信息""" + url = f"{DOMAIN}/publishers/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PatchPublisher(Authorization: str, name: str, nick: Optional[str] = None, bio: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None) -> dict: + """修改发布者信息""" + body = { + "nick": nick, + "bio": bio, + "picture_id": picture_id, + "background_id": background_id, + } + url = f"{DOMAIN}/publishers/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201]) + +def DeletePublisher(Authorization: str, name: str) -> dict: + """删除发布者""" + url = f"{DOMAIN}/publishers/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetPublisherStats(Authorization: str, name: str) -> dict: + """获取发布者统计信息""" + url = f"{DOMAIN}/publishers/{name}/stats" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPublishersOfAccountId(Authorization: str, account_id: str) -> dict: + """获取指定账户的发布者""" + url = f"{DOMAIN}/publishers/of/{account_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPublishers(Authorization: str) -> dict: + """获取所有发布者""" + url = f"{DOMAIN}/publishers" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostPublisherInvite(Authorization: str, name: str, related_user_id: str, role: int) -> dict: + """发送发布者邀请""" + body = {"related_user_id": related_user_id, "role": role} + url = f"{DOMAIN}/publishers/invites/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetPublisherInvites(Authorization: str, name: str) -> dict: + """获取发布者邀请""" + url = f"{DOMAIN}/publishers/invites/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def AcceptPublisherInvite(Authorization: str, name: str) -> dict: + """接受发布者邀请""" + url = f"{DOMAIN}/publishers/invites/{name}/accept" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def DeclinePublisherInvite(Authorization: str, name: str) -> dict: + """拒绝发布者邀请""" + url = f"{DOMAIN}/publishers/invites/{name}/decline" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def DeletePublisherMember(Authorization: str, name: str, member_id: str) -> dict: + """删除发布者成员""" + url = f"{DOMAIN}/publishers/{name}/members/{member_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def PostIndividualPublisher(Authorization: str, name: str, nick: Optional[str] = None, bio: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None) -> dict: + """创建个人发布者""" + body = { + "name": name, + "nick": nick, + "bio": bio, + "picture_id": picture_id, + "background_id": background_id, + } + url = f"{DOMAIN}/publishers/individual" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def PostOrganizationPublisher(Authorization: str, name: str, realm_slug: str) -> dict: + """创建组织发布者""" + body = {"name": name, "realm_slug": realm_slug} + url = f"{DOMAIN}/publishers/organization/{realm_slug}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetPublisherMembers(Authorization: str, name: str, offset: int = 0, take: int = 20) -> dict: + """获取发布者成员""" + url = f"{DOMAIN}/publishers/{name}/members" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def GetMyPublisherMember(Authorization: str, name: str) -> dict: + """获取我的发布者成员信息""" + url = f"{DOMAIN}/publishers/{name}/members/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostPublisherFeature(Authorization: str, name: str, flag: str, expired_at: Optional[str] = None) -> dict: + """创建发布者特性""" + body = {"flag": flag, "expired_at": expired_at} + url = f"{DOMAIN}/publishers/{name}/features" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def DeletePublisherFeature(Authorization: str, name: str, flag: str) -> dict: + """删除发布者特性""" + url = f"{DOMAIN}/publishers/{name}/features/{flag}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetPublisherSubscriptions(Authorization: str) -> dict: + """获取发布者订阅状态""" + url = f"{DOMAIN}/publishers/subscriptions" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPublisherSubscription(Authorization: str, name: str) -> dict: + """获取特定发布者的订阅状态""" + url = f"{DOMAIN}/api/publishers/{name}/subscription" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostPublisherSubscribe(Authorization: str, name: str, subscribeRequest: Optional[dict] = None,tier:int=0) -> dict: + """订阅特定发布者""" + RequestsBody={ + "tier":tier + } + url = f"{DOMAIN}/api/publishers/{name}/subscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=subscribeRequest) + +def PostPublisherUnsubscribe(Authorization: str, name: str) -> dict: + """取消订阅特定发布者""" + url = f"{DOMAIN}/api/publishers/{name}/unsubscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers)