integrate/app/wekan/api.py

79 lines
2.3 KiB
Python

from optparse import Option
import string
import requests
from requests import Request
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
from app.wekan.models import User, UserBase
class Wekan:
    """Small client for the Wekan REST API.

    Logs in once on construction and sends the returned token as a
    ``Bearer`` authorization header on every subsequent request.
    """

    def __init__(self, base: str, api_user: str, password: str, *, timeout: Optional[float] = 30.0):
        """Log in to Wekan and prepare the authenticated request headers.

        Args:
            base: Root URL of the Wekan instance; request paths are appended
                as ``{base}/users/login`` and ``{base}/api/...``.
            api_user: Username sent to the login endpoint.
            password: Password sent to the login endpoint.
            timeout: Seconds to wait on each HTTP request. ``None`` disables
                the timeout (wait indefinitely).

        Raises:
            Exception: If the login request does not answer with HTTP 200.
                The exception args are the request URL and the response body.
        """
        self.base = base
        self.timeout = timeout
        r = requests.post(
            f"{base}/users/login",
            data={"username": api_user, "password": password},
            timeout=timeout,
        )
        if r.status_code != 200:
            raise Exception(r.url, r.text)
        t = r.json()
        self.token = {
            "token": t["token"],
            "id": t["id"],
            "expires": t["tokenExpires"],
        }
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f'Bearer {self.token["token"]}',
        }

    def get(self, endpoint: str) -> requests.Response:
        """Send an authenticated GET to ``{base}/api/{endpoint}``."""
        return requests.get(
            f"{self.base}/api/{endpoint}",
            headers=self.headers,
            timeout=self.timeout,
        )

    def post(self, endpoint: str, data: Dict) -> requests.Response:
        """Send an authenticated POST with ``data`` as the JSON body."""
        return requests.post(
            url=f"{self.base}/api/{endpoint}",
            json=data,
            headers=self.headers,
            timeout=self.timeout,
        )

    def delete(self, endpoint: str) -> requests.Response:
        """Send an authenticated DELETE to ``{base}/api/{endpoint}``."""
        return requests.delete(
            url=f"{self.base}/api/{endpoint}",
            headers=self.headers,
            timeout=self.timeout,
        )

    def get_user(self, _id: str) -> Optional[User]:
        """Fetch a single user.

        Args:
            _id: Identifier placed in the ``users/{_id}`` path. ``create_user``
                also passes a username here.

        Returns:
            A ``User`` built from the response, or ``None`` when the server
            answers with an empty JSON object.

        Raises:
            Exception: If the response payload contains an ``"error"`` key.
        """
        r = self.get(f"users/{_id}").json()
        if "error" in r:
            raise Exception(r)
        if r == {}:
            return None
        return User(**r)

    def get_all_users(self) -> List[UserBase]:
        """Return every user listed by the ``users`` endpoint.

        Raises:
            Exception: If the server does not answer with HTTP 200. The
                exception args are the URL, status code and response body.
        """
        r = self.get("users")
        if r.status_code != 200:
            raise Exception(r.url, r.status_code, r.text)
        return [UserBase(**u) for u in r.json()]

    def create_user(self, username: str, email: str, password: str) -> Optional[User]:
        """Create a user, or return the existing one if the username is taken.

        Args:
            username: Username for the new account.
            email: E-mail address for the new account.
            password: Currently not sent to the server (see note below).

        Returns:
            The ``User`` fetched after creation, or the already existing user
            when the server reports "Username already exists.".

        Raises:
            Exception: On any other error payload, or when the response has
                neither an ``"error"`` nor an ``"_id"`` key. The payload is
                attached to the exception.
        """
        # NOTE(review): the `password` argument is ignored and an empty
        # password is submitted instead. Kept as-is because it may be
        # deliberate (e.g. accounts authenticated elsewhere) -- confirm.
        data = {
            "username": username,
            "email": email,
            "password": "",
        }
        r = self.post("users/", data).json()
        if "error" in r:
            # .get(): an error payload without "reason" must surface the
            # real error below instead of a KeyError.
            if r.get("reason") == "Username already exists.":
                return self.get_user(username)
            raise Exception(r)
        if "_id" in r:
            return self.get_user(r["_id"])
        raise Exception(r)

    def delete_user(self, user: str):
        """Delete the user addressed by ``users/{user}``.

        Raises:
            Exception: If the response payload contains an ``"error"`` key.
        """
        r = self.delete(f"users/{user}").json()
        if "error" in r:
            raise Exception(r)