import logging from optparse import Option import string import requests from requests import Request from pydantic import BaseModel, Field from typing import Any, Dict, List, Optional from .models import User, UserBase from .settings import WekanSettings class WekanApi: def __init__(self, settings: WekanSettings): self.base = settings.baseurl r = requests.post(f'{self.base}/users/login', data={ "username": settings.user, "password": settings.password }, ) if not r.status_code == 200: raise Exception("[WekanAPI] Failed to init") t = r.json() self.token = { "token": t["token"], "id": t["id"], "expires": t["tokenExpires"], } self.headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', 'Authorization': f'Bearer { self.token["token"] }', } def get(self, endpoint) -> Request: return requests.get(f'{self.base}/api/{endpoint}', headers=self.headers) def post(self, endpoint: str, data: Dict) -> Request: return requests.post(url=f"{self.base}/api/{endpoint}", json=data, headers=self.headers) def delete(self, endpoint: str) -> Request: return requests.delete(url=f"{self.base}/api/{endpoint}", headers=self.headers) def get_user(self, _id: string) -> Optional[User]: r = self.get(f"users/{_id}").json() if "error" in r: raise Exception(r) if r == {}: return None return User(**r) def get_all_users(self) -> List[UserBase]: r = self.get("users") if r.status_code == 200: return [UserBase(**u) for u in r.json()] raise Exception() def create_user(self, username: str, email: str, password: str) -> Optional[User]: data = { "username": username, "email": email, "password": password } r = self.post("users/", data).json() if "error" in r: if r["reason"] == "Username already exists.": return self.get_user(username) raise Exception(r) if "_id" in r: return self.get_user(r["_id"]) raise Exception() def delete_user(self, id: str): r = self.delete(f"users/{id}").json() if "error" in r: raise Exception(r)