dashboard/areas/users/user_service.py
2022-07-17 19:59:08 +02:00

133 lines
4.3 KiB
Python

from database import db
from areas.apps import App, AppRole, AppsService
from helpers import KratosApi
from flask import current_app
class UserService:
no_access_role_id = 3
@staticmethod
def get_users():
res = KratosApi.get("/admin/identities").json()
userList = []
for r in res:
userList.append(UserService.__insertAppRoleToUser(r["id"], r))
return userList
@staticmethod
def get_user(id):
res = KratosApi.get("/admin/identities/{}".format(id)).json()
return UserService.__insertAppRoleToUser(id, res)
@staticmethod
def post_user(data):
kratos_data = {
"schema_id": "default",
"traits": {"email": data["email"], "name": data["name"]},
}
res = KratosApi.post("/admin/identities", kratos_data).json()
if data["app_roles"]:
app_roles = data["app_roles"]
for ar in app_roles:
app = App.query.filter_by(slug=ar["name"]).first()
app_role = AppRole(
user_id=res["id"],
role_id=ar["role_id"] if "role_id" in ar else UserService.no_access_role_id,
app_id=app.id,
)
db.session.add(app_role)
db.session.commit()
else:
all_apps = AppsService.get_all_apps()
for app in all_apps:
app_role = AppRole(
user_id=res["id"],
role_id=UserService.no_access_role_id,
app_id=app.id,
)
db.session.add(app_role)
db.session.commit()
return UserService.get_user(res["id"])
@staticmethod
def put_user(id, data):
kratos_data = {
"schema_id": "default",
"traits": {"email": data["email"], "name": data["name"]},
}
KratosApi.put("/admin/identities/{}".format(id), kratos_data)
if data["app_roles"]:
app_roles = data["app_roles"]
for ar in app_roles:
app = App.query.filter_by(slug=ar["name"]).first()
app_role = AppRole.query.filter_by(
user_id=id, app_id=app.id).first()
if app_role:
app_role.role_id = ar["role_id"] if "role_id" in ar else None
db.session.commit()
else:
appRole = AppRole(
user_id=id,
role_id=ar["role_id"] if "role_id" in ar else None,
app_id=app.id,
)
db.session.add(appRole)
db.session.commit()
return UserService.get_user(id)
@staticmethod
def delete_user(id):
app_role = AppRole.query.filter_by(user_id=id).all()
for ar in app_role:
db.session.delete(ar)
db.session.commit()
@staticmethod
def post_multiple_users(data):
# check if data is array
# for every item in array call Kratos - check if there can be batch create on Kratos
# - if yes, what happens with the batch if there is at least one existing email
created_users = []
not_created_users = []
for user_data in data['users']:
user_mail = user_data["email"]
if not user_mail:
return
try:
user = UserService.post_user(user_data)
current_app.logger.info(f"Batch create user: {user_mail}")
created_users.append(user)
except Exception as error:
current_app.logger.error(f"Exception calling Kratos: {error} on creating user: {user_mail}")
not_created_users.append(user_mail)
return {"created_users": created_users, "not_created_users": not_created_users}
@staticmethod
def __insertAppRoleToUser(userId, userRes):
apps = App.query.all()
app_roles = []
for app in apps:
tmp_app_role = AppRole.query.filter_by(
user_id=userId, app_id=app.id
).first()
app_roles.append(
{
"name": app.slug,
"role_id": tmp_app_role.role_id if tmp_app_role else None,
}
)
userRes["traits"]["app_roles"] = app_roles
return userRes