上传文件至 core/SNAPI
This commit is contained in:
30
core/SNAPI/CallServer.py
Normal file
30
core/SNAPI/CallServer.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import requests
|
||||
import json
|
||||
from requests.exceptions import RequestException
|
||||
import platform
|
||||
|
||||
UA = f"SolianForPythonApp/0.000.001 ({platform.system()})"
|
||||
|
||||
def _make_request(method: str, url: str, headers: dict, params: dict = None, normal_codes: list = [200], request_body: dict = None) -> dict:
|
||||
"""内部辅助函数,用于发送HTTP请求并处理响应"""
|
||||
headers['User-Agent'] = UA
|
||||
try:
|
||||
if method == 'GET':
|
||||
response = requests.get(url, headers=headers, params=params)
|
||||
elif method == 'POST':
|
||||
response = requests.post(url, headers=headers, data=json.dumps(request_body))
|
||||
elif method == 'DELETE':
|
||||
response = requests.delete(url, headers=headers, params=params)
|
||||
elif method == 'PATCH':
|
||||
response = requests.patch(url, headers=headers, data=json.dumps(request_body))
|
||||
else:
|
||||
return {"error": "Unsupported HTTP method"}
|
||||
|
||||
if response.status_code not in normal_codes:
|
||||
return {"error": f"Unexpected status code: {response.status_code}"}
|
||||
|
||||
return response.json()
|
||||
except json.JSONDecodeError:
|
||||
return {"error": response.text}
|
||||
except RequestException as e:
|
||||
return {"error": str(e)}
|
149
core/SNAPI/Publisher.py
Normal file
149
core/SNAPI/Publisher.py
Normal file
@@ -0,0 +1,149 @@
|
||||
from ProjectCfg import DOMAIN
|
||||
from CallServer import _make_request
|
||||
from typing import List,Any,Optional,Dict
|
||||
###==========================发布者管理============================
|
||||
def GetPublisher(Authorization: str, name: str) -> dict:
|
||||
"""获取发布者信息"""
|
||||
url = f"{DOMAIN}/publishers/{name}"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('GET', url, headers)
|
||||
|
||||
def PatchPublisher(Authorization: str, name: str, nick: Optional[str] = None, bio: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None) -> dict:
|
||||
"""修改发布者信息"""
|
||||
body = {
|
||||
"nick": nick,
|
||||
"bio": bio,
|
||||
"picture_id": picture_id,
|
||||
"background_id": background_id,
|
||||
}
|
||||
url = f"{DOMAIN}/publishers/{name}"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('PATCH', url, headers, request_body=body, normal_codes=[201])
|
||||
|
||||
def DeletePublisher(Authorization: str, name: str) -> dict:
|
||||
"""删除发布者"""
|
||||
url = f"{DOMAIN}/publishers/{name}"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('DELETE', url, headers, normal_codes=[204])
|
||||
|
||||
def GetPublisherStats(Authorization: str, name: str) -> dict:
|
||||
"""获取发布者统计信息"""
|
||||
url = f"{DOMAIN}/publishers/{name}/stats"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('GET', url, headers)
|
||||
|
||||
def GetPublishersOfAccountId(Authorization: str, account_id: str) -> dict:
|
||||
"""获取指定账户的发布者"""
|
||||
url = f"{DOMAIN}/publishers/of/{account_id}"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('GET', url, headers)
|
||||
|
||||
def GetPublishers(Authorization: str) -> dict:
|
||||
"""获取所有发布者"""
|
||||
url = f"{DOMAIN}/publishers"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('GET', url, headers)
|
||||
|
||||
def PostPublisherInvite(Authorization: str, name: str, related_user_id: str, role: int) -> dict:
|
||||
"""发送发布者邀请"""
|
||||
body = {"related_user_id": related_user_id, "role": role}
|
||||
url = f"{DOMAIN}/publishers/invites/{name}"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
|
||||
|
||||
def GetPublisherInvites(Authorization: str, name: str) -> dict:
|
||||
"""获取发布者邀请"""
|
||||
url = f"{DOMAIN}/publishers/invites/{name}"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('GET', url, headers)
|
||||
|
||||
def AcceptPublisherInvite(Authorization: str, name: str) -> dict:
|
||||
"""接受发布者邀请"""
|
||||
url = f"{DOMAIN}/publishers/invites/{name}/accept"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('POST', url, headers, normal_codes=[201])
|
||||
|
||||
def DeclinePublisherInvite(Authorization: str, name: str) -> dict:
|
||||
"""拒绝发布者邀请"""
|
||||
url = f"{DOMAIN}/publishers/invites/{name}/decline"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('POST', url, headers, normal_codes=[201])
|
||||
|
||||
def DeletePublisherMember(Authorization: str, name: str, member_id: str) -> dict:
|
||||
"""删除发布者成员"""
|
||||
url = f"{DOMAIN}/publishers/{name}/members/{member_id}"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('DELETE', url, headers, normal_codes=[204])
|
||||
|
||||
def PostIndividualPublisher(Authorization: str, name: str, nick: Optional[str] = None, bio: Optional[str] = None, picture_id: Optional[str] = None, background_id: Optional[str] = None) -> dict:
|
||||
"""创建个人发布者"""
|
||||
body = {
|
||||
"name": name,
|
||||
"nick": nick,
|
||||
"bio": bio,
|
||||
"picture_id": picture_id,
|
||||
"background_id": background_id,
|
||||
}
|
||||
url = f"{DOMAIN}/publishers/individual"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
|
||||
|
||||
def PostOrganizationPublisher(Authorization: str, name: str, realm_slug: str) -> dict:
|
||||
"""创建组织发布者"""
|
||||
body = {"name": name, "realm_slug": realm_slug}
|
||||
url = f"{DOMAIN}/publishers/organization/{realm_slug}"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
|
||||
|
||||
def GetPublisherMembers(Authorization: str, name: str, offset: int = 0, take: int = 20) -> dict:
|
||||
"""获取发布者成员"""
|
||||
url = f"{DOMAIN}/publishers/{name}/members"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
params = {"offset": offset, "take": take}
|
||||
return _make_request('GET', url, headers, params=params)
|
||||
|
||||
def GetMyPublisherMember(Authorization: str, name: str) -> dict:
|
||||
"""获取我的发布者成员信息"""
|
||||
url = f"{DOMAIN}/publishers/{name}/members/me"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('GET', url, headers)
|
||||
|
||||
def PostPublisherFeature(Authorization: str, name: str, flag: str, expired_at: Optional[str] = None) -> dict:
|
||||
"""创建发布者特性"""
|
||||
body = {"flag": flag, "expired_at": expired_at}
|
||||
url = f"{DOMAIN}/publishers/{name}/features"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('POST', url, headers, request_body=body, normal_codes=[201])
|
||||
|
||||
def DeletePublisherFeature(Authorization: str, name: str, flag: str) -> dict:
|
||||
"""删除发布者特性"""
|
||||
url = f"{DOMAIN}/publishers/{name}/features/{flag}"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('DELETE', url, headers, normal_codes=[204])
|
||||
|
||||
def GetPublisherSubscriptions(Authorization: str) -> dict:
|
||||
"""获取发布者订阅状态"""
|
||||
url = f"{DOMAIN}/publishers/subscriptions"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('GET', url, headers)
|
||||
|
||||
def GetPublisherSubscription(Authorization: str, name: str) -> dict:
|
||||
"""获取特定发布者的订阅状态"""
|
||||
url = f"{DOMAIN}/api/publishers/{name}/subscription"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('GET', url, headers)
|
||||
|
||||
def PostPublisherSubscribe(Authorization: str, name: str, subscribeRequest: Optional[dict] = None,tier:int=0) -> dict:
|
||||
"""订阅特定发布者"""
|
||||
RequestsBody={
|
||||
"tier":tier
|
||||
}
|
||||
url = f"{DOMAIN}/api/publishers/{name}/subscribe"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('POST', url, headers, request_body=subscribeRequest)
|
||||
|
||||
def PostPublisherUnsubscribe(Authorization: str, name: str) -> dict:
|
||||
"""取消订阅特定发布者"""
|
||||
url = f"{DOMAIN}/api/publishers/{name}/unsubscribe"
|
||||
headers = {'accept': 'application/json', 'Authorization': Authorization}
|
||||
return _make_request('POST', url, headers)
|
Reference in New Issue
Block a user