dashboard/areas/users/user_service.py
2022-07-21 10:51:14 +02:00

132 lines
4.4 KiB
Python

import ory_kratos_client
from ory_kratos_client.model.submit_self_service_recovery_flow_body \
import SubmitSelfServiceRecoveryFlowBody
from ory_kratos_client.api import v0alpha2_api as kratos_api
from config import KRATOS_ADMIN_URL
from database import db
from areas.apps.models import App, AppRole
from areas.roles.role_service import RoleService
from helpers import KratosApi
tmp = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL, discard_unknown_keys=True)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
class UserService:
@staticmethod
def get_users():
res = KratosApi.get("/admin/identities").json()
userList = []
for r in res:
userList.append(UserService.__insertAppRoleToUser(r["id"], r))
return userList
@staticmethod
def get_user(id):
res = KratosApi.get("/admin/identities/{}".format(id)).json()
return UserService.__insertAppRoleToUser(id, res)
@staticmethod
def post_user(data):
kratos_data = {
"schema_id": "default",
"traits": {
"name": data["name"],
"email": data["email"],
},
}
res = KratosApi.post("/admin/identities", kratos_data).json()
if data["app_roles"]:
app_roles = data["app_roles"]
for ar in app_roles:
app = App.query.filter_by(slug=ar["name"]).first()
app_role = AppRole(
user_id=res["id"],
role_id=ar["role_id"] if "role_id" in ar else None,
app_id=app.id,
)
db.session.add(app_role)
db.session.commit()
UserService.__start_user_recovery_flow(data["email"])
return UserService.get_user(res["id"])
@staticmethod
def __start_user_recovery_flow(email):
"""
Start a Kratos recovery flow for the user's email address.
This sends out an email to the user that explains to them how they can
set their password.
:param email: Email to send recovery link to
:type email: str
"""
api_response = KRATOS_ADMIN.initialize_self_service_recovery_flow_without_browser()
flow = api_response['id']
# Submit the recovery flow to send an email to the new user.
submit_self_service_recovery_flow_body = \
SubmitSelfServiceRecoveryFlowBody(method="link", email=email)
api_response = KRATOS_ADMIN.submit_self_service_recovery_flow(flow,
submit_self_service_recovery_flow_body=
submit_self_service_recovery_flow_body)
@staticmethod
def put_user(id, user_editing_id, data):
kratos_data = {
"schema_id": "default",
"traits": {"email": data["email"], "name": data["name"]},
}
KratosApi.put("/admin/identities/{}".format(id), kratos_data)
is_admin = RoleService.is_user_admin(user_editing_id)
if is_admin and data["app_roles"]:
app_roles = data["app_roles"]
for ar in app_roles:
app = App.query.filter_by(slug=ar["name"]).first()
app_role = AppRole.query.filter_by(user_id=id, app_id=app.id).first()
if app_role:
app_role.role_id = ar["role_id"] if "role_id" in ar else None
db.session.commit()
else:
appRole = AppRole(
user_id=id,
role_id=ar["role_id"] if "role_id" in ar else None,
app_id=app.id,
)
db.session.add(appRole)
db.session.commit()
return UserService.get_user(id)
@staticmethod
def delete_user(id):
app_role = AppRole.query.filter_by(user_id=id).all()
for ar in app_role:
db.session.delete(ar)
db.session.commit()
@staticmethod
def __insertAppRoleToUser(userId, userRes):
apps = App.query.all()
app_roles = []
for app in apps:
tmp_app_role = AppRole.query.filter_by(
user_id=userId, app_id=app.id
).first()
app_roles.append(
{
"name": app.slug,
"role_id": tmp_app_role.role_id if tmp_app_role else None,
}
)
userRes["traits"]["app_roles"] = app_roles
return userRes