import logging
import string
from optparse import Option
from typing import Any, Dict, List, Optional

import requests
from pydantic import BaseModel, Field
from requests import Request

from app.wekan.models import User, UserBase

logger = logging.getLogger(__name__)


class Wekan:
    """Small HTTP client that logs in once and then issues authenticated API calls.

    On construction the client posts the credentials to ``{base}/users/login``
    and stores the returned token. Every later request goes to
    ``{base}/api/{endpoint}`` with that token as a ``Bearer`` header.
    """

    def __init__(
        self,
        base: str,
        api_user: str,
        password: str,
        timeout: float = 30.0,
    ):
        """Log in and prepare the headers used by all later requests.

        Args:
            base: Base URL of the server, used verbatim as a URL prefix.
            api_user: Username sent to the login endpoint.
            password: Password sent to the login endpoint.
            timeout: Seconds to wait on each HTTP request before giving up.

        Raises:
            Exception: If the login endpoint does not answer with HTTP 200;
                the arguments are the request URL and the response body.
        """
        self.base = base
        self.timeout = timeout

        # The login call sends form-encoded data (``data=``), unlike the
        # JSON bodies used by ``post`` below.
        r = requests.post(
            f'{base}/users/login',
            data={
                "username": api_user,
                "password": password,
            },
            timeout=timeout,
        )
        if r.status_code != 200:
            raise Exception(r.url, r.text)

        t = r.json()
        self.token = {
            "token": t["token"],
            "id": t["id"],
            "expires": t["tokenExpires"],
        }
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Authorization': f'Bearer {self.token["token"]}',
        }

    def get(self, endpoint) -> requests.Response:
        """Send an authenticated GET to ``{base}/api/{endpoint}``."""
        return requests.get(
            f'{self.base}/api/{endpoint}',
            headers=self.headers,
            timeout=self.timeout,
        )

    def post(self, endpoint: str, data: Dict) -> requests.Response:
        """Send an authenticated POST with ``data`` as the JSON body."""
        return requests.post(
            url=f"{self.base}/api/{endpoint}",
            json=data,
            headers=self.headers,
            timeout=self.timeout,
        )

    def delete(self, endpoint: str) -> requests.Response:
        """Send an authenticated DELETE to ``{base}/api/{endpoint}``."""
        return requests.delete(
            url=f"{self.base}/api/{endpoint}",
            headers=self.headers,
            timeout=self.timeout,
        )

    def get_user(self, _id: str) -> Optional[User]:
        """Fetch a single user from ``users/{_id}``.

        Returns:
            A ``User`` built from the JSON payload, or ``None`` when the
            server answers with an empty JSON object.

        Raises:
            Exception: If the JSON payload contains an ``"error"`` key; the
                payload is passed as the exception argument.
        """
        r = self.get(f"users/{_id}").json()
        if "error" in r:
            raise Exception(r)
        if r == {}:
            return None
        # Log only the identifier: the payload itself is a user record.
        logger.debug("Fetched user %s", _id)
        return User(**r)

    def get_all_users(self) -> List[UserBase]:
        """Fetch the ``users`` collection.

        Returns:
            One ``UserBase`` per entry in the JSON response.

        Raises:
            Exception: If the response status is not 200; the status code
                and response body are passed as the exception arguments.
        """
        r = self.get("users")
        if r.status_code == 200:
            return [UserBase(**u) for u in r.json()]
        raise Exception(r.status_code, r.text)

    def create_user(self, username: str, email: str, password: str) -> Optional[User]:
        """Create a user, or return the existing one if the username is taken.

        Returns:
            The user as returned by ``get_user``: looked up by the new
            ``_id`` on success, or by ``username`` when the server reports
            ``"Username already exists."``.

        Raises:
            Exception: If the server reports any other error, or if the
                response carries neither ``"error"`` nor ``"_id"``.
        """
        # NOTE(review): the ``password`` argument is accepted but an empty
        # string is what gets sent. Confirm whether that is intentional
        # before forwarding the argument.
        data = {
            "username": username,
            "email": email,
            "password": "",
        }
        r = self.post("users/", data).json()
        if "error" in r:
            # ``.get`` so an error payload without "reason" surfaces as the
            # API error below instead of a KeyError.
            if r.get("reason") == "Username already exists.":
                return self.get_user(username)
            raise Exception(r)
        if "_id" in r:
            return self.get_user(r["_id"])
        raise Exception("Unexpected response when creating user", r)

    def delete_user(self, user: str) -> None:
        """Delete the user addressed by ``users/{user}``.

        Raises:
            Exception: If the JSON payload contains an ``"error"`` key; the
                payload is passed as the exception argument.
        """
        r = self.delete(f"users/{user}").json()
        logger.debug("Delete response for user %s: %s", user, r)
        if "error" in r:
            raise Exception(r)