diff --git a/core/SNAPI/CallServer.py b/core/SNAPI/CallServer.py new file mode 100644 index 0000000..5653fbb --- /dev/null +++ b/core/SNAPI/CallServer.py @@ -0,0 +1,30 @@ +import requests +import json +from requests.exceptions import RequestException +import platform + +UA = f"SolianForPythonApp/0.000.001 ({platform.system()})" + +def _make_request(method: str, url: str, headers: dict, params: dict = None, normal_codes: list = [200], request_body: dict = None) -> dict: + """内部辅助函数,用于发送HTTP请求并处理响应""" + headers['User-Agent'] = UA + try: + if method == 'GET': + response = requests.get(url, headers=headers, params=params) + elif method == 'POST': + response = requests.post(url, headers=headers, data=json.dumps(request_body)) + elif method == 'DELETE': + response = requests.delete(url, headers=headers, params=params) + elif method == 'PATCH': + response = requests.patch(url, headers=headers, data=json.dumps(request_body)) + else: + return {"error": "Unsupported HTTP method"} + + if response.status_code not in normal_codes: + return {"error": f"Unexpected status code: {response.status_code}"} + + return response.json() + except json.JSONDecodeError: + return {"error": response.text} + except RequestException as e: + return {"error": str(e)} \ No newline at end of file diff --git a/core/SNAPI/Publisher.py b/core/SNAPI/Publisher.py new file mode 100644 index 0000000..54baa59 --- /dev/null +++ b/core/SNAPI/Publisher.py @@ -0,0 +1,149 @@ +from ProjectCfg import DOMAIN +from CallServer import _make_request +from typing import List,Any,Optional,Dict +###==========================发布者管理============================ +def GetPublisher(Authorization: str, name: str) -> dict: + """获取发布者信息""" + url = f"{DOMAIN}/publishers/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PatchPublisher(Authorization: str, name: str, nick: Optional[str] = None, bio: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None) -> dict: + """修改发布者信息""" + body = { + "nick": nick, + "bio": bio, + "picture_id": picture_id, + "background_id": background_id, + } + url = f"{DOMAIN}/publishers/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201]) + +def DeletePublisher(Authorization: str, name: str) -> dict: + """删除发布者""" + url = f"{DOMAIN}/publishers/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetPublisherStats(Authorization: str, name: str) -> dict: + """获取发布者统计信息""" + url = f"{DOMAIN}/publishers/{name}/stats" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPublishersOfAccountId(Authorization: str, account_id: str) -> dict: + """获取指定账户的发布者""" + url = f"{DOMAIN}/publishers/of/{account_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPublishers(Authorization: str) -> dict: + """获取所有发布者""" + url = f"{DOMAIN}/publishers" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostPublisherInvite(Authorization: str, name: str, related_user_id: str, role: int) -> dict: + """发送发布者邀请""" + body = {"related_user_id": related_user_id, "role": role} + url = f"{DOMAIN}/publishers/invites/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetPublisherInvites(Authorization: str, name: str) -> dict: + """获取发布者邀请""" + url = f"{DOMAIN}/publishers/invites/{name}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def AcceptPublisherInvite(Authorization: str, name: str) -> dict: + """接受发布者邀请""" + url = f"{DOMAIN}/publishers/invites/{name}/accept" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def DeclinePublisherInvite(Authorization: str, name: str) -> dict: + """拒绝发布者邀请""" + url = f"{DOMAIN}/publishers/invites/{name}/decline" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, normal_codes=[201]) + +def DeletePublisherMember(Authorization: str, name: str, member_id: str) -> dict: + """删除发布者成员""" + url = f"{DOMAIN}/publishers/{name}/members/{member_id}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def PostIndividualPublisher(Authorization: str, name: str, nick: Optional[str] = None, bio: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None) -> dict: + """创建个人发布者""" + body = { + "name": name, + "nick": nick, + "bio": bio, + "picture_id": picture_id, + "background_id": background_id, + } + url = f"{DOMAIN}/publishers/individual" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def PostOrganizationPublisher(Authorization: str, name: str, realm_slug: str) -> dict: + """创建组织发布者""" + body = {"name": name, "realm_slug": realm_slug} + url = f"{DOMAIN}/publishers/organization/{realm_slug}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def GetPublisherMembers(Authorization: str, name: str, offset: int = 0, take: int = 20) -> dict: + """获取发布者成员""" + url = f"{DOMAIN}/publishers/{name}/members" + headers = {'accept': 'application/json', 'Authorization': Authorization} + params = {"offset": offset, "take": take} + return _make_request('GET', url, headers, params=params) + +def GetMyPublisherMember(Authorization: str, name: str) -> dict: + """获取我的发布者成员信息""" + url = f"{DOMAIN}/publishers/{name}/members/me" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostPublisherFeature(Authorization: str, name: str, flag: str, expired_at: Optional[str] = None) -> dict: + """创建发布者特性""" + body = {"flag": flag, "expired_at": expired_at} + url = f"{DOMAIN}/publishers/{name}/features" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=body, normal_codes=[201]) + +def DeletePublisherFeature(Authorization: str, name: str, flag: str) -> dict: + """删除发布者特性""" + url = f"{DOMAIN}/publishers/{name}/features/{flag}" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('DELETE', url, headers, normal_codes=[204]) + +def GetPublisherSubscriptions(Authorization: str) -> dict: + """获取发布者订阅状态""" + url = f"{DOMAIN}/publishers/subscriptions" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def GetPublisherSubscription(Authorization: str, name: str) -> dict: + """获取特定发布者的订阅状态""" + url = f"{DOMAIN}/api/publishers/{name}/subscription" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('GET', url, headers) + +def PostPublisherSubscribe(Authorization: str, name: str, subscribeRequest: Optional[dict] = None,tier:int=0) -> dict: + """订阅特定发布者""" + RequestsBody={ + "tier":tier + } + url = f"{DOMAIN}/api/publishers/{name}/subscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers, request_body=subscribeRequest) + +def PostPublisherUnsubscribe(Authorization: str, name: str) -> dict: + """取消订阅特定发布者""" + url = f"{DOMAIN}/api/publishers/{name}/unsubscribe" + headers = {'accept': 'application/json', 'Authorization': Authorization} + return _make_request('POST', url, headers)