"""Thin HTTP client helpers for the Solian API (activities and chat endpoints)."""

import json  # retained from the original import list; not referenced by the code below
from typing import Optional

import requests
from requests.exceptions import RequestException

DOMAIN = "https://solian.app/api"

# Seconds to wait for the server before giving up. Without a timeout,
# `requests` blocks indefinitely on an unresponsive host.
_DEFAULT_TIMEOUT = 30

# Methods whose payload is sent as query parameters vs. as a request body.
_QUERY_METHODS = frozenset({'GET', 'DELETE'})
_BODY_METHODS = frozenset({'POST', 'PATCH'})


def _auth_headers(Authorization: str) -> dict:
    """Build the header set shared by every endpoint wrapper in this module."""
    return {'accept': 'application/json', 'Authorization': Authorization}


def _make_request(method: str, url: str, headers: dict,
                  params: Optional[dict] = None, data: Optional[dict] = None,
                  timeout: float = _DEFAULT_TIMEOUT) -> dict:
    """Send an HTTP request and return the decoded JSON response.

    Args:
        method: One of 'GET', 'POST', 'DELETE' or 'PATCH' (case-sensitive).
        url: Absolute URL to call.
        headers: HTTP headers to send.
        params: Query parameters; only sent for GET and DELETE.
        data: Request body passed to `requests` as `data=`; only sent for
            POST and PATCH.
        timeout: Seconds to wait for the server before aborting.

    Returns:
        The decoded JSON body on an HTTP 200 response. On any failure a dict
        with a single "error" key is returned instead of raising:
        the unsupported-method message, the non-200 status code, the raw
        response text when the body is not valid JSON, or the string form of
        the `requests` exception (connection error, timeout, ...).
    """
    if method in _QUERY_METHODS:
        payload = {"params": params}
    elif method in _BODY_METHODS:
        payload = {"data": data}
    else:
        return {"error": "Unsupported HTTP method"}

    try:
        response = requests.request(
            method, url, headers=headers, timeout=timeout, **payload
        )
    except RequestException as e:
        return {"error": str(e)}

    # NOTE(review): any status other than 200 (including 201/204) is reported
    # as an error; confirm the API never answers success with another code.
    if response.status_code != 200:
        return {"error": response.status_code}

    try:
        return response.json()
    except ValueError:
        # Every JSON decode error `requests` can raise (its own
        # JSONDecodeError, json.JSONDecodeError, simplejson's) is a ValueError.
        return {"error": response.text}


def ActivityAPIs(cursor: str = '', filter: str = '', take: int = 20,
                 debuginclude: str = '', Authorization: str = '') -> dict:
    """Fetch the home page content (GET /activities).

    `cursor`, `filter`, `take` and `debuginclude` are forwarded unchanged as
    query parameters; `Authorization` is sent as the Authorization header.
    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/activities"
    params = {"cursor": cursor, "filter": filter, "take": take, "debuginclude": debuginclude}
    return _make_request('GET', url, _auth_headers(Authorization), params)


def ChatSummary(Authorization: str) -> dict:
    """Fetch the chat summary (GET /chat/summary).

    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/chat/summary"
    return _make_request('GET', url, _auth_headers(Authorization))


def GetChatMessageAllInfo(Authorization: str, roomid: str, offset: int = 0, take: int = 20) -> dict:
    """Fetch chat messages of a room (GET /chat/{roomid}/message).

    `offset` and `take` are forwarded as query parameters.
    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/chat/{roomid}/message"
    params = {"offset": offset, "take": take}
    return _make_request('GET', url, _auth_headers(Authorization), params)


def GetChatMessageBaseInfo(Authorization: str, roomid: str) -> dict:
    """Fetch basic chat message info of a room (POST /chat/{roomid}/message/base).

    The POST is sent without a body.
    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/chat/{roomid}/message/base"
    return _make_request('POST', url, _auth_headers(Authorization))


def GetAMessageAllInfo(Authorization: str, roomid: str, messageid: str) -> dict:
    """Fetch all info of one chat message (GET /chat/{roomid}/message/{messageid}).

    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/chat/{roomid}/message/{messageid}"
    return _make_request('GET', url, _auth_headers(Authorization))


def GetAMessageInfo(Authorization: str, roomid: str, messageid: str) -> dict:
    """Fetch basic info of one chat message. Kept as a reserved endpoint for now.

    Calls POST /chat/{roomid}/message/{messageid}/info without a body.
    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/chat/{roomid}/message/{messageid}/info"
    return _make_request('POST', url, _auth_headers(Authorization))


def DelMessage(Authorization: str, roomid: str, messageid: str) -> dict:
    """Delete a chat message (DELETE /chat/{roomid}/message/{messageid}).

    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/chat/{roomid}/message/{messageid}"
    return _make_request('DELETE', url, _auth_headers(Authorization))


def GetChatRoomInfo(Authorization: str, id: str) -> dict:
    """Fetch chat room info (GET /chat/{id}).

    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/chat/{id}"
    return _make_request('GET', url, _auth_headers(Authorization))


def ModifyChatRoomInfo(Authorization: str, id: str, data: dict) -> dict:
    """Modify chat room info (PATCH /chat/{id}).

    `data` is handed to `requests` as `data=`, so a dict is sent form-encoded.
    NOTE(review): if the server expects a JSON body this should use `json=`
    instead; confirm against the API.
    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/chat/{id}"
    return _make_request('PATCH', url, _auth_headers(Authorization), data=data)


def DeleteChatRoom(Authorization: str, id: str) -> dict:
    """Delete a chat room (DELETE /chat/{id}).

    Returns the decoded JSON response, or an {"error": ...} dict on failure.
    """
    url = f"{DOMAIN}/chat/{id}"
    return _make_request('DELETE', url, _auth_headers(Authorization))