integrate/app/consumer/wekan/api.py

79 lines
2.4 KiB
Python
Raw Normal View History

2022-04-29 14:13:54 +02:00
import logging
2022-03-03 16:36:08 +01:00
from optparse import Option
import string
import requests
from requests import Request
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
2022-04-29 14:13:54 +02:00
from .models import User, UserBase
from .settings import WekanSettings
2022-03-03 16:36:08 +01:00
2022-04-29 14:13:54 +02:00
class WekanApi:
    """Minimal client for the Wekan REST API.

    On construction the client logs in against ``{baseurl}/users/login`` with
    the credentials from the settings object and keeps the returned bearer
    token. Every later call goes to ``{baseurl}/api/<endpoint>`` with that
    token in the ``Authorization`` header.

    All failures are reported by raising ``Exception`` (the type the original
    callers catch); error payloads returned by Wekan are passed through as the
    exception argument.
    """

    # requests waits forever unless told otherwise; bound every call so a
    # stalled Wekan instance cannot hang the caller.
    DEFAULT_TIMEOUT = 30.0

    def __init__(self, settings: WekanSettings, timeout: float = DEFAULT_TIMEOUT):
        """Log in to Wekan and prepare the authenticated request headers.

        Args:
            settings: Provides ``baseurl``, ``user`` and ``password``.
            timeout: Seconds to wait for each HTTP request before
                ``requests`` raises a timeout error.

        Raises:
            Exception: If the login request does not answer with HTTP 200.
        """
        self.base = settings.baseurl
        self.timeout = timeout

        login = requests.post(
            f"{self.base}/users/login",
            data={
                "username": settings.user,
                "password": settings.password,
            },
            timeout=self.timeout,
        )
        if login.status_code != 200:
            raise Exception(
                f"[WekanAPI] Failed to init (HTTP {login.status_code})"
            )

        session = login.json()
        self.token = {
            "token": session["token"],
            "id": session["id"],
            "expires": session["tokenExpires"],
        }
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Bearer {self.token['token']}",
        }

    def _url(self, endpoint: str) -> str:
        """Return the absolute API URL for *endpoint*."""
        return f"{self.base}/api/{endpoint}"

    def get(self, endpoint: str) -> "requests.Response":
        """Send an authenticated GET to ``/api/<endpoint>`` and return the response."""
        return requests.get(
            self._url(endpoint), headers=self.headers, timeout=self.timeout
        )

    def post(self, endpoint: str, data: Dict) -> "requests.Response":
        """Send an authenticated POST with *data* as the JSON body."""
        return requests.post(
            url=self._url(endpoint),
            json=data,
            headers=self.headers,
            timeout=self.timeout,
        )

    def delete(self, endpoint: str) -> "requests.Response":
        """Send an authenticated DELETE to ``/api/<endpoint>`` and return the response."""
        return requests.delete(
            url=self._url(endpoint), headers=self.headers, timeout=self.timeout
        )

    def get_user(self, _id: str) -> Optional[User]:
        """Fetch a single user from ``users/<_id>``.

        Returns:
            The user, or ``None`` when Wekan answers with an empty body.

        Raises:
            Exception: If the response payload contains an ``"error"`` key;
                the payload is the exception argument.
        """
        payload = self.get(f"users/{_id}").json()
        # An empty object (or JSON null) means "no such user"; check this
        # first so a null body does not blow up in the membership test below.
        if not payload:
            return None
        if "error" in payload:
            raise Exception(payload)
        return User(**payload)

    def get_all_users(self) -> List[UserBase]:
        """Return every user listed by the ``users`` endpoint.

        Raises:
            Exception: If the request does not answer with HTTP 200.
        """
        response = self.get("users")
        if response.status_code != 200:
            raise Exception(
                f"[WekanAPI] Failed to list users (HTTP {response.status_code})"
            )
        return [UserBase(**entry) for entry in response.json()]

    def create_user(self, username: str, email: str, password: str) -> Optional[User]:
        """Create a user and return it as fetched back from Wekan.

        If Wekan reports that the username already exists, the existing user
        is looked up via ``get_user(username)`` instead of failing.
        NOTE(review): this relies on ``users/<name>`` accepting a username in
        place of an id — confirm against the Wekan version in use.

        Raises:
            Exception: On any other error payload, or when the response
                contains neither ``"error"`` nor ``"_id"``.
        """
        payload = self.post(
            "users/",
            {
                "username": username,
                "email": email,
                "password": password,
            },
        ).json()

        if "error" in payload:
            # .get(): an error payload without "reason" must surface as the
            # API error itself, not as a KeyError raised here.
            if payload.get("reason") == "Username already exists.":
                return self.get_user(username)
            raise Exception(payload)
        if "_id" in payload:
            return self.get_user(payload["_id"])
        raise Exception(
            f"[WekanAPI] Unexpected response while creating user: {payload!r}"
        )

    def delete_user(self, id: str):
        """Delete the user addressed by ``users/<id>``.

        The parameter keeps its original name ``id`` (shadowing the builtin)
        so keyword callers keep working.

        Raises:
            Exception: If the response payload contains an ``"error"`` key.
        """
        payload = self.delete(f"users/{id}").json()
        if "error" in payload:
            raise Exception(payload)