# Source: dashboard/backend/areas/users/user_service.py
# (captured from a web blame view: 196 lines, 6.9 KiB, Python)

import ory_kratos_client
from ory_kratos_client.model.submit_self_service_recovery_flow_body \
import SubmitSelfServiceRecoveryFlowBody
from ory_kratos_client.api import v0alpha2_api as kratos_api
from config import KRATOS_ADMIN_URL
from database import db
from areas.apps import App, AppRole, AppsService
from areas.roles import Role, RoleService
from helpers import KratosApi
# (blame timestamp) 2022-07-11 15:11:18 +02:00
from flask import current_app
from helpers.error_handler import KratosError
# Module-level Kratos admin client, built once at import time.
# The configuration targets KRATOS_ADMIN_URL and is created with
# discard_unknown_keys=True.
kratos_admin_api_configuration = ory_kratos_client.Configuration(
    host=KRATOS_ADMIN_URL,
    discard_unknown_keys=True,
)
KRATOS_ADMIN = kratos_api.V0alpha2Api(
    ory_kratos_client.ApiClient(kratos_admin_api_configuration)
)
# (blame timestamp) 2022-07-11 15:11:18 +02:00
class UserService:
    """
    User management operations backed by Kratos identities and local app roles.

    Identities are read and written through ``KratosApi`` / ``KRATOS_ADMIN``;
    the per-application role of each user is stored locally as ``AppRole``
    rows and attached to the identity under ``traits.app_roles``.
    """

    @staticmethod
    def get_users():
        """
        Fetch all identities and attach each user's app roles.

        :return: List of identity dicts, each with ``traits.app_roles`` filled in
        :rtype: list
        """
        identities = KratosApi.get("/admin/identities").json()
        return [
            UserService.__insertAppRoleToUser(identity["id"], identity)
            for identity in identities
        ]

    @staticmethod
    def get_user(id):
        """
        Fetch a single identity and attach the user's app roles.

        :param id: Kratos identity id
        :return: Identity dict with ``traits.app_roles`` filled in
        :rtype: dict
        """
        identity = KratosApi.get("/admin/identities/{}".format(id)).json()
        return UserService.__insertAppRoleToUser(id, identity)

    @staticmethod
    def post_user(data):
        """
        Create an identity, store its app roles and start a recovery flow.

        When ``data`` carries a non-empty ``app_roles`` list, one ``AppRole``
        is stored per entry (entries without ``role_id`` get
        ``Role.NO_ACCESS_ROLE_ID``). Otherwise a ``Role.NO_ACCESS_ROLE_ID``
        role is stored for every app returned by ``AppsService.get_all_apps()``.

        :param data: Dict with ``name``, ``email`` and optionally ``app_roles``
        :type data: dict
        :return: The created identity, as returned by :meth:`get_user`
        :rtype: dict
        """
        kratos_data = {
            "schema_id": "default",
            "traits": {
                "name": data["name"],
                "email": data["email"],
            },
        }
        res = KratosApi.post("/admin/identities", kratos_data).json()
        user_id = res["id"]

        # ``app_roles`` is optional: a missing key must not raise after the
        # identity has already been created in Kratos.
        requested_roles = data.get("app_roles")
        if requested_roles:
            for ar in requested_roles:
                # NOTE(review): ``first()`` returns None for an unknown slug,
                # which makes ``app.id`` raise AttributeError - confirm whether
                # callers validate slugs beforehand.
                app = App.query.filter_by(slug=ar["name"]).first()
                db.session.add(
                    AppRole(
                        user_id=user_id,
                        role_id=ar.get("role_id", Role.NO_ACCESS_ROLE_ID),
                        app_id=app.id,
                    )
                )
        else:
            for app in AppsService.get_all_apps():
                db.session.add(
                    AppRole(
                        user_id=user_id,
                        role_id=Role.NO_ACCESS_ROLE_ID,
                        app_id=app.id,
                    )
                )
        # One commit for the whole set of roles instead of one per row.
        db.session.commit()

        UserService.__start_recovery_flow(data["email"])
        return UserService.get_user(user_id)

    @staticmethod
    def __start_recovery_flow(email):
        """
        Start a Kratos recovery flow for the user's email address.

        This sends out an email to the user that explains to them how they can
        set their password. Make sure the user exists inside Kratos before you
        use this function.

        :param email: Email to send recovery link to
        :type email: str
        """
        api_response = KRATOS_ADMIN.initialize_self_service_recovery_flow_without_browser()
        flow = api_response['id']
        # Submit the recovery flow to send an email to the new user.
        submit_self_service_recovery_flow_body = \
            SubmitSelfServiceRecoveryFlowBody(method="link", email=email)
        KRATOS_ADMIN.submit_self_service_recovery_flow(
            flow,
            submit_self_service_recovery_flow_body=submit_self_service_recovery_flow_body,
        )

    @staticmethod
    def put_user(id, user_editing_id, data):
        """
        Update an identity's traits and, for admin editors, its app roles.

        App roles are only touched when ``user_editing_id`` is an admin
        according to ``RoleService.is_user_admin`` and ``data`` carries a
        non-empty ``app_roles`` list. Entries without ``role_id`` set the
        role to ``None``.

        :param id: Kratos identity id of the user being edited
        :param user_editing_id: Identity id of the user performing the edit
        :param data: Dict with ``name``, ``email`` and optionally ``app_roles``
        :type data: dict
        :return: The updated identity, as returned by :meth:`get_user`
        :rtype: dict
        """
        kratos_data = {
            "schema_id": "default",
            "traits": {"email": data["email"], "name": data["name"]},
        }
        KratosApi.put("/admin/identities/{}".format(id), kratos_data)

        is_admin = RoleService.is_user_admin(user_editing_id)
        # ``app_roles`` is optional; a missing key means "leave roles alone".
        requested_roles = data.get("app_roles")

        if is_admin and requested_roles:
            for ar in requested_roles:
                # NOTE(review): ``first()`` returns None for an unknown slug,
                # which makes ``app.id`` raise AttributeError - confirm whether
                # callers validate slugs beforehand.
                app = App.query.filter_by(slug=ar["name"]).first()
                new_role_id = ar.get("role_id")
                app_role = AppRole.query.filter_by(
                    user_id=id, app_id=app.id).first()

                if app_role:
                    app_role.role_id = new_role_id
                else:
                    db.session.add(
                        AppRole(
                            user_id=id,
                            role_id=new_role_id,
                            app_id=app.id,
                        )
                    )
            # One commit for all role changes instead of one per entry.
            db.session.commit()

        return UserService.get_user(id)

    @staticmethod
    def delete_user(id):
        """
        Delete all locally stored app roles of a user.

        :param id: Kratos identity id whose ``AppRole`` rows are removed
        """
        # NOTE(review): only the local AppRole rows are removed here; nothing
        # in this method calls Kratos - confirm the identity itself is deleted
        # by the caller.
        for app_role in AppRole.query.filter_by(user_id=id).all():
            db.session.delete(app_role)
        db.session.commit()

    @staticmethod
    def post_multiple_users(data):
        """
        Create several users in one call and report the outcome per group.

        Every entry of ``data['users']`` is passed to :meth:`post_user`.
        Entries without an email address are skipped. A ``KratosError`` with
        status 409 marks the user as already existing; every other error
        marks the user as failed.

        :param data: Dict whose ``users`` key holds a list of user dicts
        :type data: dict
        :return: Dict with ``success``, ``existing`` and ``failed`` entries;
            each is either empty or holds ``users`` and ``message``
        :rtype: dict
        """
        created_users = []
        existing_users = []
        creation_failed_users = []

        for user_data in data['users']:
            user_email = user_data.get("email")
            if not user_email:
                # Skip unusable entries instead of aborting the whole batch
                # and losing the report for users that were already created.
                continue
            try:
                UserService.post_user(user_data)
                current_app.logger.info(f"Batch create user: {user_email}")
                created_users.append(user_email)
            except KratosError as err:
                # The status code is expected as the second exception
                # argument; guard so the handler itself cannot raise.
                status_code = err.args[1] if len(err.args) > 1 else None
                if status_code == 409:
                    existing_users.append(user_email)
                else:
                    # Any other Kratos failure is reported as failed rather
                    # than silently left out of the response.
                    creation_failed_users.append(user_email)
                current_app.logger.error(
                    f"Exception calling Kratos: {err} on creating user: {user_email} {status_code}")
            except Exception as error:
                current_app.logger.error(
                    f"Exception: {error} on creating user: {user_email}")
                creation_failed_users.append(user_email)

        success_response = {}
        existing_response = {}
        failed_response = {}
        if created_users:
            success_response = {
                "users": created_users,
                "message": f"{len(created_users)} users created",
            }
        if existing_users:
            existing_response = {
                "users": existing_users,
                "message": f"{len(existing_users)} users already exist: {', '.join(existing_users)}",
            }
        if creation_failed_users:
            failed_response = {
                "users": creation_failed_users,
                "message": f"{len(creation_failed_users)} users failed to create: {', '.join(creation_failed_users)}",
            }

        return {
            "success": success_response,
            "existing": existing_response,
            "failed": failed_response,
        }

    @staticmethod
    def __insertAppRoleToUser(userId, userRes):
        """
        Attach the user's role for every app to the identity dict.

        Writes a list of ``{"name": <app slug>, "role_id": <role id or None>}``
        entries, one per ``App`` row, to ``userRes["traits"]["app_roles"]``.

        :param userId: Kratos identity id used to look up ``AppRole`` rows
        :param userRes: Identity dict; modified in place
        :type userRes: dict
        :return: The same ``userRes`` dict
        :rtype: dict
        """
        # Load the user's roles with a single query instead of one per app.
        # ``setdefault`` keeps the first row per app, like ``first()`` did.
        role_id_by_app = {}
        for app_role in AppRole.query.filter_by(user_id=userId).all():
            role_id_by_app.setdefault(app_role.app_id, app_role.role_id)

        userRes["traits"]["app_roles"] = [
            {
                "name": app.slug,
                "role_id": role_id_by_app.get(app.id),
            }
            for app in App.query.all()
        ]
        return userRes