Merge branch 'main' into feat/batch-create-users

This commit is contained in:
Maarten de Waard 2022-07-21 17:08:41 +02:00
commit 2baae3e61e
No known key found for this signature in database
GPG key ID: 1D3E893A657CC8DA
4 changed files with 54 additions and 16 deletions

View file

@ -1,3 +1,9 @@
import ory_kratos_client
from ory_kratos_client.model.submit_self_service_recovery_flow_body \
import SubmitSelfServiceRecoveryFlowBody
from ory_kratos_client.api import v0alpha2_api as kratos_api
from config import KRATOS_ADMIN_URL
from database import db
from areas.apps import App, AppRole, AppsService
from areas.roles import Role, RoleService
@ -7,6 +13,10 @@ from flask import current_app
from helpers.error_handler import KratosError
kratos_admin_api_configuration = \
ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL, discard_unknown_keys=True)
KRATOS_ADMIN = \
kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(kratos_admin_api_configuration))
class UserService:
@staticmethod
@ -27,7 +37,10 @@ class UserService:
def post_user(data):
kratos_data = {
"schema_id": "default",
"traits": {"email": data["email"], "name": data["name"]},
"traits": {
"name": data["name"],
"email": data["email"],
},
}
res = KratosApi.post("/admin/identities", kratos_data).json()
@ -55,8 +68,32 @@ class UserService:
db.session.add(app_role)
db.session.commit()
UserService.__start_recovery_flow(data["email"])
return UserService.get_user(res["id"])
@staticmethod
def __start_recovery_flow(email):
"""
Start a Kratos recovery flow for the user's email address.
This sends out an email to the user that explains to them how they can
set their password. Make sure the user exists inside Kratos before you
use this function.
:param email: Email to send recovery link to
:type email: str
"""
api_response = KRATOS_ADMIN.initialize_self_service_recovery_flow_without_browser()
flow = api_response['id']
# Submit the recovery flow to send an email to the new user.
submit_self_service_recovery_flow_body = \
SubmitSelfServiceRecoveryFlowBody(method="link", email=email)
api_response = KRATOS_ADMIN.submit_self_service_recovery_flow(flow,
submit_self_service_recovery_flow_body=
submit_self_service_recovery_flow_body)
@staticmethod
def put_user(id, user_editing_id, data):
kratos_data = {