integrate/app/consumer/wekan/api.py

79 lines
2.4 KiB
Python

import logging
from optparse import Option
import string
import requests
from requests import Request
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
from .models import User, UserBase
from .settings import WekanSettings
class WekanApi:
    """Small client for the user endpoints of a Wekan server's REST API.

    Constructing an instance performs a login request right away using the
    credentials in the supplied settings; the bearer token from the response
    is stored and sent with every subsequent request.
    """

    # Seconds to wait on each HTTP call. ``requests`` applies no timeout
    # unless one is passed, so an unresponsive server would block forever.
    DEFAULT_TIMEOUT: float = 30.0

    def __init__(
        self,
        settings: WekanSettings,
        *,
        timeout: Optional[float] = DEFAULT_TIMEOUT,
    ) -> None:
        """Log in to Wekan and prepare the authenticated request headers.

        Args:
            settings: Provides ``baseurl``, ``user`` and ``password``.
            timeout: Per-request timeout in seconds; ``None`` disables it.

        Raises:
            Exception: If the login request does not answer with HTTP 200.
        """
        self.base = settings.baseurl
        self.timeout = timeout

        # The login endpoint is posted as form data (``data=``), unlike the
        # JSON bodies used for the ``/api`` endpoints below.
        login = requests.post(
            f"{self.base}/users/login",
            data={
                "username": settings.user,
                "password": settings.password,
            },
            timeout=self.timeout,
        )
        if login.status_code != 200:
            raise Exception("[WekanAPI] Failed to init")

        body = login.json()
        self.token = {
            "token": body["token"],
            "id": body["id"],
            "expires": body["tokenExpires"],
        }
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f'Bearer {self.token["token"]}',
        }

    def _url(self, endpoint: str) -> str:
        """Return the absolute URL of *endpoint* below ``<base>/api/``."""
        return f"{self.base}/api/{endpoint}"

    def get(self, endpoint: str) -> requests.Response:
        """Send an authenticated GET to *endpoint* and return the response."""
        return requests.get(
            self._url(endpoint), headers=self.headers, timeout=self.timeout
        )

    def post(self, endpoint: str, data: Dict[str, Any]) -> requests.Response:
        """Send an authenticated POST with *data* as the JSON body."""
        return requests.post(
            url=self._url(endpoint),
            json=data,
            headers=self.headers,
            timeout=self.timeout,
        )

    def delete(self, endpoint: str) -> requests.Response:
        """Send an authenticated DELETE to *endpoint* and return the response."""
        return requests.delete(
            url=self._url(endpoint), headers=self.headers, timeout=self.timeout
        )

    def get_user(self, _id: str) -> Optional[User]:
        """Fetch a single user.

        Args:
            _id: Identifier placed in the ``users/<_id>`` path. ``create_user``
                also passes a username here - presumably the endpoint accepts
                both; confirm against the Wekan API documentation.

        Returns:
            The parsed ``User``, or ``None`` when the server answers with an
            empty JSON object.

        Raises:
            Exception: If the response body contains an ``"error"`` key; the
                decoded body is the exception argument.
        """
        payload = self.get(f"users/{_id}").json()
        if "error" in payload:
            raise Exception(payload)
        if payload == {}:
            return None
        return User(**payload)

    def get_all_users(self) -> List[UserBase]:
        """Return every user reported by the ``users`` endpoint.

        Raises:
            Exception: If the server does not answer with HTTP 200.
        """
        response = self.get("users")
        if response.status_code != 200:
            raise Exception(
                f"[WekanAPI] Failed to list users (HTTP {response.status_code})"
            )
        return [UserBase(**entry) for entry in response.json()]

    def create_user(self, username: str, email: str, password: str) -> Optional[User]:
        """Create a user and return it as stored by the server.

        If the server reports that the username is already taken, the
        existing user is looked up and returned instead of failing.

        Raises:
            Exception: If the server reports any other error (the decoded
                body is the exception argument), or if the response carries
                neither an ``"error"`` nor an ``"_id"`` key.
        """
        payload = self.post(
            "users/",
            {
                "username": username,
                "email": email,
                "password": password,
            },
        ).json()

        if "error" in payload:
            # ``.get`` so an error body without "reason" surfaces the server's
            # payload rather than an unrelated KeyError.
            if payload.get("reason") == "Username already exists.":
                return self.get_user(username)
            raise Exception(payload)

        if "_id" in payload:
            # Re-fetch so the caller receives the full user record.
            return self.get_user(payload["_id"])

        raise Exception(f"[WekanAPI] Unexpected create_user response: {payload!r}")

    def delete_user(self, id: str) -> None:
        """Delete the user identified by *id*.

        Raises:
            Exception: If the response body contains an ``"error"`` key; the
                decoded body is the exception argument.
        """
        payload = self.delete(f"users/{id}").json()
        if "error" in payload:
            raise Exception(payload)