import ory_kratos_client from ory_kratos_client.model.submit_self_service_recovery_flow_body \ import SubmitSelfServiceRecoveryFlowBody from ory_kratos_client.api import v0alpha2_api as kratos_api from config import KRATOS_ADMIN_URL from database import db from areas.apps import App, AppRole, AppsService from areas.roles import Role, RoleService from helpers import KratosApi from flask import current_app from helpers.error_handler import KratosError kratos_admin_api_configuration = \ ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL, discard_unknown_keys=True) KRATOS_ADMIN = \ kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(kratos_admin_api_configuration)) class UserService: @staticmethod def get_users(): res = KratosApi.get("/admin/identities").json() userList = [] for r in res: userList.append(UserService.__insertAppRoleToUser(r["id"], r)) return userList @staticmethod def get_user(id): res = KratosApi.get("/admin/identities/{}".format(id)).json() return UserService.__insertAppRoleToUser(id, res) @staticmethod def post_user(data): kratos_data = { "schema_id": "default", "traits": { "name": data["name"], "email": data["email"], }, } res = KratosApi.post("/admin/identities", kratos_data).json() if data["app_roles"]: app_roles = data["app_roles"] for ar in app_roles: app = App.query.filter_by(slug=ar["name"]).first() app_role = AppRole( user_id=res["id"], role_id=ar["role_id"] if "role_id" in ar else Role.NO_ACCESS_ROLE_ID, app_id=app.id, ) db.session.add(app_role) db.session.commit() else: all_apps = AppsService.get_all_apps() for app in all_apps: app_role = AppRole( user_id=res["id"], role_id=Role.NO_ACCESS_ROLE_ID, app_id=app.id, ) db.session.add(app_role) db.session.commit() UserService.__start_recovery_flow(data["email"]) return UserService.get_user(res["id"]) @staticmethod def __start_recovery_flow(email): """ Start a Kratos recovery flow for the user's email address. This sends out an email to the user that explains to them how they can set their password. Make sure the user exists inside Kratos before you use this function. :param email: Email to send recovery link to :type email: str """ api_response = KRATOS_ADMIN.initialize_self_service_recovery_flow_without_browser() flow = api_response['id'] # Submit the recovery flow to send an email to the new user. submit_self_service_recovery_flow_body = \ SubmitSelfServiceRecoveryFlowBody(method="link", email=email) api_response = KRATOS_ADMIN.submit_self_service_recovery_flow(flow, submit_self_service_recovery_flow_body= submit_self_service_recovery_flow_body) @staticmethod def put_user(id, user_editing_id, data): kratos_data = { "schema_id": "default", "traits": {"email": data["email"], "name": data["name"]}, } KratosApi.put("/admin/identities/{}".format(id), kratos_data) is_admin = RoleService.is_user_admin(user_editing_id) if is_admin and data["app_roles"]: app_roles = data["app_roles"] for ar in app_roles: app = App.query.filter_by(slug=ar["name"]).first() app_role = AppRole.query.filter_by( user_id=id, app_id=app.id).first() if app_role: app_role.role_id = ar["role_id"] if "role_id" in ar else None db.session.commit() else: appRole = AppRole( user_id=id, role_id=ar["role_id"] if "role_id" in ar else None, app_id=app.id, ) db.session.add(appRole) db.session.commit() return UserService.get_user(id) @staticmethod def delete_user(id): app_role = AppRole.query.filter_by(user_id=id).all() for ar in app_role: db.session.delete(ar) db.session.commit() @staticmethod def post_multiple_users(data): # check if data is array # for every item in array call Kratos created_users = [] existing_users = [] creation_failed_users = [] for user_data in data['users']: user_email = user_data["email"] if not user_email: return try: UserService.post_user(user_data) current_app.logger.info(f"Batch create user: {user_email}") created_users.append(user_email) except KratosError as err: status_code = err.args[1] if status_code == 409: existing_users.append(user_email) elif status_code == 400: creation_failed_users.append(user_email) current_app.logger.error( f"Exception calling Kratos: {err} on creating user: {user_email} {status_code}") except Exception as error: current_app.logger.error( f"Exception: {error} on creating user: {user_email}") creation_failed_users.append(user_email) success_response = {} existing_response = {} failed_response = {} if created_users: success_response = {"users": created_users, "message": f"{len(created_users)} users created"} if existing_users: existing_response = { "users": existing_users, "message": f"{len(existing_users)} users already exist: {', '.join(existing_users)}"} if creation_failed_users: failed_response = {"users": creation_failed_users, "message": f"{len(creation_failed_users)} users failed to create: {', '.join(creation_failed_users)}"} return {"success": success_response, "existing": existing_response, "failed": failed_response} @staticmethod def __insertAppRoleToUser(userId, userRes): apps = App.query.all() app_roles = [] for app in apps: tmp_app_role = AppRole.query.filter_by( user_id=userId, app_id=app.id ).first() app_roles.append( { "name": app.slug, "role_id": tmp_app_role.role_id if tmp_app_role else None, } ) userRes["traits"]["app_roles"] = app_roles return userRes