from database import db from areas.apps.models import App, AppRole from areas.roles.role_service import RoleService from helpers import KratosApi class UserService: @staticmethod def get_users(): res = KratosApi.get("/admin/identities").json() userList = [] for r in res: userList.append(UserService.__insertAppRoleToUser(r["id"], r)) return userList @staticmethod def get_user(id): res = KratosApi.get("/admin/identities/{}".format(id)).json() return UserService.__insertAppRoleToUser(id, res) @staticmethod def post_user(data): kratos_data = { "schema_id": "default", "traits": {"email": data["email"], "name": data["name"]}, } res = KratosApi.post("/admin/identities", kratos_data).json() if data["app_roles"]: app_roles = data["app_roles"] for ar in app_roles: app = App.query.filter_by(slug=ar["name"]).first() app_role = AppRole( user_id=res["id"], role_id=ar["role_id"] if "role_id" in ar else None, app_id=app.id, ) db.session.add(app_role) db.session.commit() return UserService.get_user(res["id"]) @staticmethod def put_user(id, data): kratos_data = { "schema_id": "default", "traits": {"email": data["email"], "name": data["name"]}, } KratosApi.put("/admin/identities/{}".format(id), kratos_data) if data["app_roles"]: app_roles = data["app_roles"] for ar in app_roles: app = App.query.filter_by(slug=ar["name"]).first() app_role = AppRole.query.filter_by(user_id=id, app_id=app.id).first() if app_role: app_role.role_id = ar["role_id"] if "role_id" in ar else None db.session.commit() else: appRole = AppRole( user_id=id, role_id=ar["role_id"] if "role_id" in ar else None, app_id=app.id, ) db.session.add(appRole) db.session.commit() return UserService.get_user(id) @staticmethod def put_personal_info(id, data): kratos_data = { "schema_id": "default", "traits": {"email": data["email"], "name": data["name"]}, } KratosApi.put("/admin/identities/{}".format(id), kratos_data) is_admin = RoleService.is_user_admin(id) if is_admin and data["app_roles"]: app_roles = data["app_roles"] for ar in app_roles: app = App.query.filter_by(slug=ar["name"]).first() app_role = AppRole.query.filter_by(user_id=id, app_id=app.id).first() if app_role: app_role.role_id = ar["role_id"] if "role_id" in ar else None db.session.commit() else: appRole = AppRole( user_id=id, role_id=ar["role_id"] if "role_id" in ar else None, app_id=app.id, ) db.session.add(appRole) db.session.commit() return UserService.get_user(id) @staticmethod def delete_user(id): app_role = AppRole.query.filter_by(user_id=id).all() for ar in app_role: db.session.delete(ar) db.session.commit() @staticmethod def __insertAppRoleToUser(userId, userRes): apps = App.query.all() app_roles = [] for app in apps: tmp_app_role = AppRole.query.filter_by( user_id=userId, app_id=app.id ).first() app_roles.append( { "name": app.slug, "role_id": tmp_app_role.role_id if tmp_app_role else None, } ) userRes["traits"]["app_roles"] = app_roles return userRes