78 lines
2.3 KiB
Python
78 lines
2.3 KiB
Python
from optparse import Option
|
|
import string
|
|
import requests
|
|
from requests import Request
|
|
from pydantic import BaseModel, Field
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from app.wekan.models import User, UserBase
|
|
|
|
|
|
class Wekan:
|
|
|
|
def __init__(self, base: str, api_user: str, password: str):
|
|
self.base = base
|
|
r = requests.post(f'{base}/users/login', data={
|
|
"username": api_user,
|
|
"password": password
|
|
}, )
|
|
if not r.status_code == 200:
|
|
raise Exception(r.url, r.text)
|
|
|
|
t = r.json()
|
|
self.token = {
|
|
"token": t["token"],
|
|
"id": t["id"],
|
|
"expires": t["tokenExpires"],
|
|
}
|
|
|
|
self.headers = {
|
|
'Content-Type': 'application/json',
|
|
'Accept': 'application/json',
|
|
'Authorization': f'Bearer { self.token["token"] }',
|
|
}
|
|
|
|
def get(self, endpoint) -> Request:
|
|
return requests.get(f'{self.base}/api/{endpoint}', headers=self.headers)
|
|
|
|
def post(self, endpoint: str, data: Dict) -> Request:
|
|
return requests.post(url=f"{self.base}/api/{endpoint}", json=data, headers=self.headers)
|
|
|
|
def delete(self, endpoint: str) -> Request:
|
|
return requests.delete(url=f"{self.base}/api/{endpoint}", headers=self.headers)
|
|
|
|
def get_user(self, _id: string) -> Optional[User]:
|
|
r = self.get(f"users/{_id}").json()
|
|
if "error" in r:
|
|
raise Exception(r)
|
|
if r == {}:
|
|
return None
|
|
print(r)
|
|
return User(**r)
|
|
|
|
def get_all_users(self) -> List[UserBase]:
|
|
r = self.get("users")
|
|
if r.status_code == 200:
|
|
return [UserBase(**u) for u in r.json()]
|
|
raise Exception()
|
|
|
|
def create_user(self, username: str, email: str, password: str) -> Optional[User]:
|
|
data = {
|
|
"username": username,
|
|
"email": email,
|
|
"password": ""
|
|
}
|
|
r = self.post("users/", data).json()
|
|
if "error" in r:
|
|
if r["reason"] == "Username already exists.":
|
|
return self.get_user(username)
|
|
raise Exception(r)
|
|
if "_id" in r:
|
|
return self.get_user(r["_id"])
|
|
raise Exception()
|
|
|
|
def delete_user(self, user: str):
|
|
r = self.delete(f"users/{user}").json()
|
|
print(r)
|
|
if "error" in r:
|
|
raise Exception(r)
|