from database import db from areas.apps import App, AppRole, AppsService from areas.roles import Role, RoleService from helpers import KratosApi from flask import current_app from helpers.error_handler import KratosError class UserService: @staticmethod def get_users(): res = KratosApi.get("/admin/identities").json() userList = [] for r in res: userList.append(UserService.__insertAppRoleToUser(r["id"], r)) return userList @staticmethod def get_user(id): res = KratosApi.get("/admin/identities/{}".format(id)).json() return UserService.__insertAppRoleToUser(id, res) @staticmethod def post_user(data): kratos_data = { "schema_id": "default", "traits": {"email": data["email"], "name": data["name"]}, } res = KratosApi.post("/admin/identities", kratos_data).json() if data["app_roles"]: app_roles = data["app_roles"] for ar in app_roles: app = App.query.filter_by(slug=ar["name"]).first() app_role = AppRole( user_id=res["id"], role_id=ar["role_id"] if "role_id" in ar else Role.NO_ACCESS_ROLE_ID, app_id=app.id, ) db.session.add(app_role) db.session.commit() else: all_apps = AppsService.get_all_apps() for app in all_apps: app_role = AppRole( user_id=res["id"], role_id=Role.NO_ACCESS_ROLE_ID, app_id=app.id, ) db.session.add(app_role) db.session.commit() return UserService.get_user(res["id"]) @staticmethod def put_user(id, user_editing_id, data): kratos_data = { "schema_id": "default", "traits": {"email": data["email"], "name": data["name"]}, } KratosApi.put("/admin/identities/{}".format(id), kratos_data) is_admin = RoleService.is_user_admin(user_editing_id) if is_admin and data["app_roles"]: app_roles = data["app_roles"] for ar in app_roles: app = App.query.filter_by(slug=ar["name"]).first() app_role = AppRole.query.filter_by( user_id=id, app_id=app.id).first() if app_role: app_role.role_id = ar["role_id"] if "role_id" in ar else None db.session.commit() else: appRole = AppRole( user_id=id, role_id=ar["role_id"] if "role_id" in ar else None, app_id=app.id, ) db.session.add(appRole) db.session.commit() return UserService.get_user(id) @staticmethod def delete_user(id): app_role = AppRole.query.filter_by(user_id=id).all() for ar in app_role: db.session.delete(ar) db.session.commit() @staticmethod def post_multiple_users(data): # check if data is array # for every item in array call Kratos created_users = [] existing_users = [] creation_failed_users = [] for user_data in data['users']: user_email = user_data["email"] if not user_email: return try: UserService.post_user(user_data) current_app.logger.info(f"Batch create user: {user_email}") created_users.append(user_email) except KratosError as err: status_code = err.args[1] if status_code == 409: existing_users.append(user_email) elif status_code == 400: creation_failed_users.append(user_email) current_app.logger.error( f"Exception calling Kratos: {err} on creating user: {user_email} {status_code}") except Exception as error: current_app.logger.error( f"Exception: {error} on creating user: {user_email}") creation_failed_users.append(user_email) success_response = {} existing_response = {} failed_response = {} if created_users: success_response = {"users": created_users, "message": f"{len(created_users)} users created"} if existing_users: existing_response = { "users": existing_users, "message": f"{len(existing_users)} users already exist: {', '.join(existing_users)}"} if creation_failed_users: failed_response = {"users": creation_failed_users, "message": f"{len(creation_failed_users)} users failed to create: {', '.join(creation_failed_users)}"} return {"success": success_response, "existing": existing_response, "failed": failed_response} @staticmethod def __insertAppRoleToUser(userId, userRes): apps = App.query.all() app_roles = [] for app in apps: tmp_app_role = AppRole.query.filter_by( user_id=userId, app_id=app.id ).first() app_roles.append( { "name": app.slug, "role_id": tmp_app_role.role_id if tmp_app_role else None, } ) userRes["traits"]["app_roles"] = app_roles return userRes