diff --git a/core/SNAPI/Chat.py b/core/SNAPI/Chat.py new file mode 100644 index 0000000..39dec33 --- /dev/null +++ b/core/SNAPI/Chat.py @@ -0,0 +1,199 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict + +###=========================聊天API================================= +def ChatSummary(Authorization: str) -> dict: + """获取聊天摘要""" + url = f"{DOMAIN}/chat/summary" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetChatMessages(Authorization: str, room_id: str, offset: int = 0, take: int = 20) -> dict: + """获取聊天消息""" + url = f"{DOMAIN}/chat/{room_id}/messages" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def PostChatMessage(Authorization: str, room_id: str, content: str, nonce: Optional[str] = None, attachments_id: Optional[List[str]] = None, meta: Optional[Dict[str, Any]] = None, replied_message_id: Optional[str] = None, forwarded_message_id: Optional[str] = None) -> dict: + """发送聊天消息""" + body = { + "content": content, + "nonce": nonce, + "attachments_id": attachments_id, + "meta": meta, + "replied_message_id": replied_message_id, + "forwarded_message_id": forwarded_message_id, + } + url = f"{DOMAIN}/chat/{room_id}/messages" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetChatMessageById(Authorization: str, room_id: str, message_id: str) -> dict: + """获取具体聊天消息所有信息""" + url = f"{DOMAIN}/chat/{room_id}/messages/{message_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PatchChatMessage(Authorization: str, room_id: str, message_id: str, content: Optional[str] = None, nonce: Optional[str] = None, attachments_id: Optional[List[str]] = None, meta: Optional[Dict[str, Any]] = None, replied_message_id: Optional[str] = None, forwarded_message_id: Optional[str] = None) -> dict: + """修改聊天消息""" + body = { + "content": content, + "nonce": nonce, + "attachments_id": attachments_id, + "meta": meta, + "replied_message_id": replied_message_id, + "forwarded_message_id": forwarded_message_id, + } + url = f"{DOMAIN}/chat/{room_id}/messages/{message_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body) + +def DeleteChatMessage(Authorization: str, room_id: str, message_id: str) -> dict: + """删除聊天消息""" + url = f"{DOMAIN}/chat/{room_id}/messages/{message_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def PostChatSync(Authorization: str, room_id: str, last_sync_timestamp: int) -> dict: + """同步聊天消息""" + body = {"last_sync_timestamp": last_sync_timestamp} + url = f"{DOMAIN}/chat/{room_id}/sync" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body) + +###=========================聊天室API================================= +def GetChatRoom(Authorization: str, id: str) -> dict: + """获取聊天房间信息""" + url = f"{DOMAIN}/chat/{id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PatchChatRoom(Authorization: str, id: str, name: Optional[str] = None, description: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None, realm_id: Optional[str] = None, is_community: Optional[bool] = None, is_public: Optional[bool] = None) -> dict: + """修改聊天房间信息""" + body = { + "name": name, + "description": description, + "picture_id": picture_id, + "background_id": background_id, + "realm_id": realm_id, + "is_community": is_community, + "is_public": is_public, + } + url = f"{DOMAIN}/chat/{id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body) + +def DeleteChatRoom(Authorization: str, id: str) -> dict: + """删除聊天房间""" + url = f"{DOMAIN}/chat/{id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetChatRooms(Authorization: str) -> dict: + """获取聊天列表""" + url = f"{DOMAIN}/chat" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostChatRoom(Authorization: str, name: str, description: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None, realm_id: Optional[str] = None, is_community: Optional[bool] = None, is_public: Optional[bool] = None) -> dict: + """创建聊天房间""" + body = { + "name": name, + "description": description, + "picture_id": picture_id, + "background_id": background_id, + "realm_id": realm_id, + "is_community": is_community, + "is_public": is_public, + } + url = f"{DOMAIN}/chat" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetDirectChat(Authorization: str, account_id: str) -> dict: + """获取聊天对象用户信息""" + url = f"{DOMAIN}/chat/direct/{account_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetChatRoomSelfInfo(Authorization: str, room_id: str) -> dict: + """获取聊天房间自己信息""" + url = f"{DOMAIN}/chat/{room_id}/members/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def JoinChatRoom(Authorization: str, room_id: str) -> dict: + """加入聊天房间""" + url = f"{DOMAIN}/chat/{room_id}/members/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def LeaveChatRoom(Authorization: str, room_id: str) -> dict: + """退出聊天房间""" + url = f"{DOMAIN}/chat/{room_id}/members/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetChatRoomMembers(Authorization: str, room_id: str, offset: int = 0, take: int = 20, with_status: bool = False) -> dict: + """获取聊天房间成员""" + params = { + "offset": offset, + "take": take, + "withStatus": with_status, + } + url = f"{DOMAIN}/chat/{room_id}/members" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers, params=params) + +def PostChatMemberInvite(Authorization: str, room_id: str, related_user_id: str, role: int) -> dict: + """发送聊天邀请""" + body = { + "related_user_id": related_user_id, + "role": role, + } + url = f"{DOMAIN}/chat/invites/{room_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetChatInvites(Authorization: str) -> dict: + """获取聊天邀请""" + url = f"{DOMAIN}/chat/invites" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def AcceptChatInvite(Authorization: str, room_id: str) -> dict: + """接受聊天邀请""" + url = f"{DOMAIN}/chat/invites/{room_id}/accept" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers) + +def DeclineChatInvite(Authorization: str, room_id: str) -> dict: + """拒绝聊天邀请""" + url = f"{DOMAIN}/chat/invites/{room_id}/decline" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers) + +def SetChatNotify(Authorization: str, room_id: str, notify_level: int, break_until: Optional[str] = None) -> dict: + """设置聊天通知级别""" + body = { + "notify_level": notify_level, + "break_until": break_until, + } + url = f"{DOMAIN}/chat/{room_id}/members/me/notify" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201]) + +def SetChatMemberRole(Authorization: str, room_id: str, member_id: str, role: int) -> dict: + """设置聊天成员角色""" + body = {"role": role} + url = f"{DOMAIN}/chat/{room_id}/members/{member_id}/role" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201]) + +def KickChatMember(Authorization: str, room_id: str, member_id: str) -> dict: + """踢出聊天成员""" + url = f"{DOMAIN}/chat/{room_id}/members/{member_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) diff --git a/core/SNAPI/Poll.py b/core/SNAPI/Poll.py new file mode 100644 index 0000000..a36898b --- /dev/null +++ b/core/SNAPI/Poll.py @@ -0,0 +1,69 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict + +###==========================投票功能============================ +def GetPoll(poll_id: str, Authorization: str) -> dict: + """获取投票""" + url = f"{DOMAIN}/polls/{poll_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostPoll(Authorization: str, title: Optional[str] = None, description: Optional[str] = None, ended_at: Optional[str] = None, is_anonymous: Optional[bool] = None, questions: Optional[List[Dict[str, Any]]] = None) -> dict: + """创建投票""" + body = { + "title": title, + "description": description, + "ended_at": ended_at, + "is_anonymous": is_anonymous, + "questions": questions, + } + url = f"{DOMAIN}/polls" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def DeletePoll(Authorization: str, poll_id: str) -> dict: + """删除投票""" + url = f"{DOMAIN}/polls/{poll_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def AnswerPoll(Authorization: str, poll_id: str, answer: Dict[str, Any]) -> dict: + """回答投票""" + body = {"answer": answer} + url = f"{DOMAIN}/polls/{poll_id}/answer" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def DeletePollAnswer(Authorization: str, poll_id: str) -> dict: + """删除投票回答""" + url = f"{DOMAIN}/polls/{poll_id}/answer" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetPollFeedback(Authorization: str, poll_id: str, offset: int = 0, take: int = 20) -> dict: + """获取投票反馈""" + url = f"{DOMAIN}/polls/{poll_id}/feedback" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {'offset': offset, 'take': take} + return _make_request('GET', url, headers, params=params) + +def GetMyPolls(Authorization: str, pub: Optional[str] = None, active: bool = False, offset: int = 0, take: int = 20) -> dict: + """获取我的投票""" + url = f"{DOMAIN}/polls/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {'offset': offset, 'take': take, 'pub': pub, 'active': active} + return _make_request('GET', url, headers, params=params) + +def PatchPoll(Authorization: str, poll_id: str, title: Optional[str] = None, description: Optional[str] = None, ended_at: Optional[str] = None, is_anonymous: Optional[bool] = None, questions: Optional[List[Dict[str, Any]]] = None) -> dict: + """修改投票""" + body = { + "title": title, + "description": description, + "ended_at": ended_at, + "is_anonymous": is_anonymous, + "questions": questions, + } + url = f"{DOMAIN}/polls/{poll_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201]) diff --git a/core/SNAPI/Post.py b/core/SNAPI/Post.py new file mode 100644 index 0000000..a75bd8a --- /dev/null +++ b/core/SNAPI/Post.py @@ -0,0 +1,148 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict +###==========================帖子管理============================ +def GetFeaturedPosts(Authorization: str) -> dict: + """获取精选帖子""" + url = f"{DOMAIN}/posts/featured" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPosts(Authorization: str, offset: int = 0, take: int = 20, pub: Optional[str] = None, realm: Optional[str] = None, post_type: Optional[int] = None, categories: Optional[List[str]] = None, tags: Optional[List[str]] = None, query: Optional[str] = None, vector: bool = False, media: bool = False, shuffle: bool = False, replies: Optional[bool] = None, pinned: Optional[bool] = None) -> dict: + """获取帖子列表""" + url = f"{DOMAIN}/posts" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = { + "offset": offset, + "take": take, + "pub": pub, + "realm": realm, + "type": post_type, + "categories": categories, + "tags": tags, + "query": query, + "vector": vector, + "media": media, + "shuffle": shuffle, + "replies": replies, + "pinned": pinned, + } + return _make_request('GET', url, headers, params=params) + +def PostPost(Authorization: str, title: str, description: str, slug: str, content: str, visibility: int, post_type: int, embed_view: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, categories: Optional[List[str]] = None, attachments: Optional[List[str]] = None, published_at: Optional[str] = None, replied_post_id: Optional[str] = None, forwarded_post_id: Optional[str] = None, realm_id: Optional[str] = None, poll_id: Optional[str] = None) -> dict: + """创建帖子""" + body = { + "title": title, + "description": description, + "slug": slug, + "content": content, + "visibility": visibility, + "type": post_type, + "embed_view": embed_view, + "tags": tags, + "categories": categories, + "attachments": attachments, + "published_at": published_at, + "replied_post_id": replied_post_id, + "forwarded_post_id": forwarded_post_id, + "realm_id": realm_id, + "poll_id": poll_id, + } + url = f"{DOMAIN}/posts" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetPostById(Authorization: str, post_id: str) -> dict: + """获取帖子详情""" + url = f"{DOMAIN}/posts/{post_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PatchPost(Authorization: str, post_id: str, title: Optional[str] = None, description: Optional[str] = None, slug: Optional[str] = None, content: Optional[str] = None, visibility: Optional[int] = None, post_type: Optional[int] = None, embed_view: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, categories: Optional[List[str]] = None, attachments: Optional[List[str]] = None, published_at: Optional[str] = None, replied_post_id: Optional[str] = None, forwarded_post_id: Optional[str] = None, realm_id: Optional[str] = None, poll_id: Optional[str] = None) -> dict: + """修改帖子""" + body = { + "title": title, + "description": description, + "slug": slug, + "content": content, + "visibility": visibility, + "type": post_type, + "embed_view": embed_view, + "tags": tags, + "categories": categories, + "attachments": attachments, + "published_at": published_at, + "replied_post_id": replied_post_id, + "forwarded_post_id": forwarded_post_id, + "realm_id": realm_id, + "poll_id": poll_id, + } + url = f"{DOMAIN}/posts/{post_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body) + +def DeletePost(Authorization: str, post_id: str) -> dict: + """删除帖子""" + url = f"{DOMAIN}/posts/{post_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetPostReactions(Authorization: str, post_id: str, symbol: Optional[str] = None, offset: int = 0, take: int = 20) -> dict: + """获取帖子反应""" + url = f"{DOMAIN}/posts/{post_id}/reactions" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"symbol": symbol, "offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def PostPostReaction(Authorization: str, post_id: str, symbol: str, attitude: int) -> dict: + """发送帖子反应""" + body = {"symbol": symbol, "attitude": attitude} + url = f"{DOMAIN}/posts/{post_id}/reactions" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetFeaturedPostReplies(Authorization: str, post_id: str) -> dict: + """获取精选帖子回复""" + url = f"{DOMAIN}/posts/{post_id}/replies/featured" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPinnedPostReplies(Authorization: str, post_id: str) -> dict: + """获取置顶帖子回复""" + url = f"{DOMAIN}/posts/{post_id}/replies/pinned" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPostReplies(Authorization: str, post_id: str, offset: int = 0, take: int = 20) -> dict: + """获取帖子回复""" + url = f"{DOMAIN}/posts/{post_id}/replies" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def GetPostAwards(Authorization: str, post_id: str, offset: int = 0, take: int = 20) -> dict: + """获取帖子奖励""" + url = f"{DOMAIN}/posts/{post_id}/awards" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def PostPostAward(Authorization: str, post_id: str, amount: float, attitude: int, message: Optional[str] = None) -> dict: + """发送帖子奖励""" + body = {"amount": amount, "attitude": attitude, "message": message} + url = f"{DOMAIN}/posts/{post_id}/awards" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def PostPinPost(Authorization: str, post_id: str, mode: int) -> dict: + """置顶帖子""" + body = {"mode": mode} + url = f"{DOMAIN}/posts/{post_id}/pin" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def DeletePinPost(Authorization: str, post_id: str) -> dict: + """取消置顶帖子""" + url = f"{DOMAIN}/posts/{post_id}/pin" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) diff --git a/core/SNAPI/PostCategory.py b/core/SNAPI/PostCategory.py new file mode 100644 index 0000000..9eadc2d --- /dev/null +++ b/core/SNAPI/PostCategory.py @@ -0,0 +1,42 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict + +###==========================帖子分类管理============================ +def GetPostCategories(Authorization: str, query: Optional[str] = None, offset: int = 0, take: int = 20, order: Optional[str] = None) -> dict: + """获取帖子分类""" + url = f"{DOMAIN}/posts/categories" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"query": query, "offset": offset, "take": take, "order": order} + return _make_request('GET', url, headers, params=params) + +def PostPostCategory(Authorization: str, slug: Optional[str] = None, name: Optional[str] = None) -> dict: + """创建帖子分类""" + body = {"slug": slug, "name": name} + url = f"{DOMAIN}/posts/categories" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetPostCategoryBySlug(Authorization: str, slug: str) -> dict: + """获取帖子分类详情""" + url = f"{DOMAIN}/posts/categories/{slug}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostSubscribeCategory(Authorization: str, slug: str) -> dict: + """订阅帖子分类""" + url = f"{DOMAIN}/posts/categories/{slug}/subscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def PostUnsubscribeCategory(Authorization: str, slug: str) -> dict: + """取消订阅帖子分类""" + url = f"{DOMAIN}/posts/categories/{slug}/unsubscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def GetCategorySubscription(Authorization: str, slug: str) -> dict: + """获取分类订阅状态""" + url = f"{DOMAIN}/posts/categories/{slug}/subscription" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) diff --git a/core/SNAPI/PostTag.py b/core/SNAPI/PostTag.py new file mode 100644 index 0000000..bb7fe2d --- /dev/null +++ b/core/SNAPI/PostTag.py @@ -0,0 +1,42 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict + +###==========================帖子标签管理============================ +def GetPostTags(Authorization: str, query: Optional[str] = None, offset: int = 0, take: int = 20, order: Optional[str] = None) -> dict: + """获取帖子标签""" + url = f"{DOMAIN}/posts/tags" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"query": query, "offset": offset, "take": take, "order": order} + return _make_request('GET', url, headers, params=params) + +def PostTag(Authorization: str, slug: Optional[str] = None, name: Optional[str] = None) -> dict: + """创建帖子标签""" + body = {"slug": slug, "name": name} + url = f"{DOMAIN}/posts/tags" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetTagBySlug(Authorization: str, slug: str) -> dict: + """获取标签详情""" + url = f"{DOMAIN}/posts/tags/{slug}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostSubscribeTag(Authorization: str, slug: str) -> dict: + """订阅标签""" + url = f"{DOMAIN}/posts/tags/{slug}/subscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def PostUnsubscribeTag(Authorization: str, slug: str) -> dict: + """取消订阅标签""" + url = f"{DOMAIN}/posts/tags/{slug}/unsubscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def GetTagSubscription(Authorization: str, slug: str) -> dict: + """获取标签订阅状态""" + url = f"{DOMAIN}/posts/tags/{slug}/subscription" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers)