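"""User management service backed by Ory Kratos identities and local app roles."""

import ory_kratos_client
from ory_kratos_client.model.submit_self_service_recovery_flow_body \
    import SubmitSelfServiceRecoveryFlowBody
from ory_kratos_client.api import v0alpha2_api as kratos_api

from config import KRATOS_ADMIN_URL
from database import db
from areas.apps.models import App, AppRole
from areas.roles.role_service import RoleService
from helpers import KratosApi

kratos_admin_api_configuration = \
    ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL, discard_unknown_keys=True)
KRATOS_ADMIN = \
    kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(kratos_admin_api_configuration))


class UserService:
    """Create, read, update and delete users.

    Identity data (name, email) is stored in Kratos and reached through the
    admin endpoints; per-app roles are stored locally as ``AppRole`` rows.

    NOTE(review): none of these methods checks who is calling, except the
    admin gate on role changes in ``put_user``. Authorization is presumably
    enforced by the route layer -- confirm, since every call here goes to the
    Kratos *admin* API.
    """

    @staticmethod
    def get_users():
        """Return all Kratos identities, each extended with its app roles.

        NOTE(review): a single GET is issued; if Kratos paginates
        ``/admin/identities``, only the first page is returned -- TODO confirm.
        """
        identities = KratosApi.get("/admin/identities").json()
        return [
            UserService.__insertAppRoleToUser(identity["id"], identity)
            for identity in identities
        ]

    @staticmethod
    def get_user(id):
        """Return the Kratos identity ``id`` extended with its app roles."""
        res = KratosApi.get("/admin/identities/{}".format(id)).json()
        return UserService.__insertAppRoleToUser(id, res)

    @staticmethod
    def post_user(data):
        """Create a Kratos identity, store its app roles and send a recovery mail.

        :param data: Dict with ``name``, ``email`` and optionally ``app_roles``
            (a list of ``{"name": <app slug>, "role_id": <id>}`` entries)
        :type data: dict
        :raises ValueError: if an entry in ``app_roles`` names an unknown app
        :return: The created identity including its ``app_roles`` trait
        """
        # Resolve the requested roles before touching Kratos: an unknown app
        # slug used to fail with an AttributeError *after* the identity had
        # been created, leaving an account without roles or recovery mail.
        role_requests = UserService.__resolve_app_roles(data.get("app_roles"))

        kratos_data = {
            "schema_id": "default",
            "traits": {
                "name": data["name"],
                "email": data["email"],
            },
        }
        # NOTE(review): the response is used as-is; if KratosApi.post does not
        # raise on an error response, res["id"] below fails with a KeyError.
        res = KratosApi.post("/admin/identities", kratos_data).json()

        if role_requests:
            for app, role_id in role_requests:
                db.session.add(
                    AppRole(user_id=res["id"], role_id=role_id, app_id=app.id)
                )
            db.session.commit()

        UserService.__start_recovery_flow(data["email"])

        return UserService.get_user(res["id"])

    @staticmethod
    def __start_recovery_flow(email):
        """
        Start a Kratos recovery flow for the user's email address.

        This sends out an email to the user that explains to them how they can
        set their password. Make sure the user exists inside Kratos before you
        use this function.

        :param email: Email to send recovery link to
        :type email: str
        """
        api_response = KRATOS_ADMIN.initialize_self_service_recovery_flow_without_browser()
        flow = api_response['id']
        # Submit the recovery flow to send an email to the new user.
        submit_self_service_recovery_flow_body = \
            SubmitSelfServiceRecoveryFlowBody(method="link", email=email)
        KRATOS_ADMIN.submit_self_service_recovery_flow(
            flow,
            submit_self_service_recovery_flow_body=submit_self_service_recovery_flow_body,
        )

    @staticmethod
    def put_user(id, user_editing_id, data):
        """Update a user's Kratos traits and, for admin editors, the app roles.

        :param id: Kratos identity id of the user being edited
        :param user_editing_id: Kratos identity id of the user making the change
        :param data: Dict with ``name``, ``email`` and optionally ``app_roles``
        :type data: dict
        :raises ValueError: if an admin submits a role for an unknown app
        :return: The updated identity including its ``app_roles`` trait
        """
        is_admin = RoleService.is_user_admin(user_editing_id)
        # Role changes are only honoured for admins; resolve them up front so a
        # bad app slug is rejected before the identity is modified.
        role_requests = (
            UserService.__resolve_app_roles(data.get("app_roles")) if is_admin else []
        )

        kratos_data = {
            "schema_id": "default",
            "traits": {"email": data["email"], "name": data["name"]},
        }
        # NOTE(review): the traits update below is not gated on is_admin or on
        # id == user_editing_id. Because the email address receives recovery
        # links, confirm the caller restricts non-admins to their own identity.
        KratosApi.put("/admin/identities/{}".format(id), kratos_data)

        for app, role_id in role_requests:
            app_role = AppRole.query.filter_by(user_id=id, app_id=app.id).first()
            if app_role:
                app_role.role_id = role_id
            else:
                db.session.add(AppRole(user_id=id, role_id=role_id, app_id=app.id))
            db.session.commit()

        return UserService.get_user(id)

    @staticmethod
    def delete_user(id):
        """Delete all ``AppRole`` rows of user ``id``.

        NOTE(review): only the local role rows are removed here; the Kratos
        identity is not deleted by this method. Unless the caller removes it
        separately, the account can presumably still authenticate -- confirm.
        """
        for app_role in AppRole.query.filter_by(user_id=id).all():
            db.session.delete(app_role)
        db.session.commit()

    @staticmethod
    def __resolve_app_roles(app_roles):
        """Map submitted role entries to ``(App, role_id)`` pairs.

        :param app_roles: List of ``{"name": <app slug>, "role_id": <id>}``
            dicts; ``None`` or an empty list yields an empty result
        :raises ValueError: if a slug does not match any ``App``
        :return: List of ``(App, role_id)`` tuples; ``role_id`` is ``None``
            when the entry carries no ``role_id`` key
        """
        resolved = []
        for entry in app_roles or []:
            app = App.query.filter_by(slug=entry["name"]).first()
            if app is None:
                raise ValueError("Unknown app slug: {!r}".format(entry["name"]))
            resolved.append((app, entry.get("role_id")))
        return resolved

    @staticmethod
    def __insertAppRoleToUser(userId, userRes):
        """Attach an ``app_roles`` list to ``userRes["traits"]`` and return it.

        One entry is produced per known app; ``role_id`` is ``None`` when the
        user has no ``AppRole`` row for that app. ``userRes`` is modified in
        place.
        """
        app_roles = []
        for app in App.query.all():
            tmp_app_role = AppRole.query.filter_by(
                user_id=userId, app_id=app.id
            ).first()
            app_roles.append(
                {
                    "name": app.slug,
                    "role_id": tmp_app_role.role_id if tmp_app_role else None,
                }
            )

        userRes["traits"]["app_roles"] = app_roles
        return userRes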