""" Implement the Kratos model to interact with kratos users """ import json import re import urllib.parse import urllib.request from typing import Dict from urllib.request import Request # Some imports commented out to satisfy pylint. They will be used once more # functions are migrated to this model from ory_kratos_client.model.admin_create_identity_body import AdminCreateIdentityBody from ory_kratos_client.model.admin_create_self_service_recovery_link_body \ import AdminCreateSelfServiceRecoveryLinkBody from ory_kratos_client.model.admin_update_identity_body import AdminUpdateIdentityBody from ory_kratos_client.rest import ApiException as KratosApiException from .classes import RedirectFilter from .exceptions import BackendError # pylint: disable=too-many-instance-attributes class KratosUser(): """ The User object, interact with the User. It both calls to Kratos as to the database for storing and retrieving data. """ api = None __uuid = None email = None name = None username = None state = None created_at = None updated_at = None def __init__(self, api, uuid = None): self.api = api self.state = 'active' if uuid: try: obj = api.admin_get_identity(uuid) if obj: self.__uuid = uuid try: self.name = obj.traits['name'] except KeyError: self.name = "" try: self.username = obj.traits['username'] except KeyError: self.username = "" self.email = obj.traits['email'] self.state = obj.state self.created_at = obj.created_at self.updated_at = obj.updated_at except KratosApiException as error: raise BackendError(f"Unable to get entry, kratos replied with: {error}") from error def __repr__(self): return f"\"{self.name}\" <{self.email}>" @property def uuid(self): """Gets the protected UUID propery""" return self.__uuid def save(self): """Saves this object into the kratos backend database. If the object is new, it will create, otherwise update an entry. :raise: BackendError is an error with Kratos happened. """ # Traits are the "profile" values we will set, kratos will complain on # empty values, so we check if "name" is set and only add it if so. traits = {'email':self.email} if self.name: traits['name'] = self.name # If we have a UUID, we are updating if self.__uuid: body = AdminUpdateIdentityBody( schema_id="default", state=self.state, traits=traits, ) try: api_response = self.api.admin_update_identity(self.__uuid, admin_update_identity_body=body) except KratosApiException as error: raise BackendError(f"Unable to save entry, kratos replied with:{error}") from error else: body = AdminCreateIdentityBody( schema_id="default", traits=traits, ) try: # Create an Identity api_response = self.api.admin_create_identity( admin_create_identity_body=body) if api_response.id: self.__uuid = api_response.id except KratosApiException as error: raise BackendError(f"Unable to save entry, kratos replied with:{error}") from error def delete(self): """Deletes the object from kratos :raise: BackendError if Krator API call fails """ if self.__uuid: try: self.api.admin_delete_identity(self.__uuid) return True except KratosApiException as error: raise BackendError( f"Unable to delete entry, kratos replied with: {error}" ) from error return False @staticmethod def find_by_email(api, email): """Queries Kratos to find kratos ID for this given identifier :param api: Kratos ADMIN API Object :param email: Identifier to look for :return: Return none or string with ID """ kratos_id = None # Get out user ID by iterating over all available IDs data = api.admin_list_identities() for kratos_obj in data.value: # Unique identifier we use if kratos_obj.traits['email'] == email: kratos_id = str(kratos_obj.id) return KratosUser(api, kratos_id) return None @staticmethod def find_all(api): """Queries Kratos to find all kratos users and return them as a list of KratosUser objects :return: Return list """ kratos_id = None return_list = [] # Get out user ID by iterating over all available IDs data = api.admin_list_identities() for kratos_obj in data.value: kratos_id = str(kratos_obj.id) return_list.append(KratosUser(api, kratos_id)) return return_list @staticmethod def extract_cookies(cookies): """Extract session and CSRF cookie from a list of cookies. Iterate over a list of cookies and extract the session cookies required for Kratos User Panel UI :param cookies: str[], list of cookies :return: Cookies concatenated as string :rtype: str """ # Find kratos session cookie & csrf cookie_csrf = None cookie_session = None for cookie in cookies: search = re.match(r'ory_kratos_session=([^;]*);.*$', cookie) if search: cookie_session = "ory_kratos_session=" + search.group(1) search = re.match(r'(csrf_token[^;]*);.*$', cookie) if search: cookie_csrf = search.group(1) if not cookie_csrf or not cookie_session: raise BackendError("Flow started, but expected cookies not found") # Combined the relevant cookies cookie = cookie_csrf + "; " + cookie_session return cookie def get_recovery_link(self): """Call the kratos API to create a recovery URL for a kratos ID :param: api Kratos ADMIN API Object :param: kratos_id UUID of kratos object :return: Return none or string with recovery URL """ try: # Create body request to get recovery link with admin API body = AdminCreateSelfServiceRecoveryLinkBody( expires_in="15m", identity_id=self.__uuid ) # Get recovery link from admin API call = self.api.admin_create_self_service_recovery_link( admin_create_self_service_recovery_link_body=body) url = call.recovery_link except KratosApiException: return None return url def ui_set_password(self, api_url, recovery_url, password): """Follow a Kratos UI sequence to set password Kratos does not provide an interface to set a password directly. However we still can set a password by following the UI sequence. To so so we to follow the steps which are normally done in a browser once someone clicks the recovery link. :param: api_url URL to public endpoint of API :param: recovery_url Recovery URL as generated by Kratos :param: password Password :raise: Exception with error message as first argument :return: boolean True on success, False on failure (usualy password to simple) """ # Step 1: Open the recovery link and extract the cookies, as we need them # for the next steps try: # We override the default Redirect handler with our custom handler to # be able to catch the cookies. opener = urllib.request.build_opener(RedirectFilter) # We rewrite the URL we got. It can be we run this from an enviroment # with different KRATUS_PUBLIC_URL API endpoint then kratos provide # itself. For example in the case running as a job to create an admin # account before TLS is setup/working search = re.match(r'.*(self-service.recovery.flow.*)$', recovery_url) if search: recovery_url = api_url + search.group(1) else: raise BackendError('Did not find recovery flow') opener.open(recovery_url) # If we do not have a 2xx status, urllib throws an error, as we "stopped" # at our redirect, we expect a 3xx status except urllib.error.HTTPError as http_error: # Kratos pre-0.8 returned 302, kratos 0.8 returns 303 if http_error.status in (302, 303): # Get the cookie and redirect location from the response # headers cookies = http_error.headers.get_all('Set-Cookie') url = http_error.headers.get('Location') else: raise BackendError('Unable to fetch recovery link') from http_error else: raise BackendError('Recovery link returned unexpected data') # Step 2: Extract cookies and data for next step. We expect to have an # authorized session now. We need the cookies for followup calls # to make changes to the account (set password) # Get flow id search = re.match(r'.*\?flow=(.*)', url) if search: flow = search.group(1) else: raise BackendError('No Flow ID found for recovery sequence') # Extract cookies with helper function cookie = self.extract_cookies(cookies) # Step 3: Get the "UI", kratos expect us to call the API to get the UI # elements which contains the CSRF token, which is needed when # posting the password data try: url = api_url + "/self-service/settings/flows?id=" + flow req = Request(url, headers={'Cookie':cookie}) opener = urllib.request.build_opener() # Execute the request, read the data, decode the JSON, get the # right CSRF token out of the decoded JSON obj = json.loads(opener.open(req).read()) csrf_token = obj['ui']['nodes'][0]['attributes']['value'] except Exception as error: raise BackendError("Unable to get password reset UI") from error # Step 4: Post out password url = api_url + "self-service/settings?flow=" + flow # Create POST data as form data data = { 'method': 'password', 'password': password, 'csrf_token': csrf_token } data = urllib.parse.urlencode(data) data = data.encode('ascii') # POST the new password try: req = Request(url, data = data, headers={'Cookie':cookie}, method="POST") opener = urllib.request.build_opener(RedirectFilter) opener.open(req) # If we do not have a 2xx status, urllib throws an error, as we "stopped" # at our redirect, we expect a 3xx status except urllib.error.HTTPError as http_error: # Kratos pre-0.8 returned 302, kratos 0.8 returns 303 if http_error.status in (302, 303): # Kratos only sends HTTP codes after our submission. We should # now call the `settings` endpoint to see if our call # succeeded, or else, if there are any messages about why it # failed try: url = api_url + "/self-service/settings/flows?id=" + flow req = Request(url, headers={'Cookie':cookie, "Accept": "application/json"}) opener = urllib.request.build_opener() # Execute the request, read the data, decode the JSON obj = json.loads(opener.open(req).read()) # If the 'state' has changed to 'success', the password was # set successfully if obj['state'] == 'success': return True # Failure: we check if there are error messages for node in obj['ui']['nodes']: if node['messages']: print(f"Problems with field '{node['meta']['label']['text']}':") for message in node['messages']: print(message['text']) raise BackendError("Password not set") from http_error except Exception as error: raise BackendError("Unable to get password reset UI") from error return False raise BackendError("Unable to set password by submitting form") # Pylint complains about app not used. That is correct, but we will use that # in the future. Ignore this error # pylint: disable=unused-argument def get_claims(self, app, roles, mapping=None) -> Dict[str, Dict[str, str]]: """Create openID Connect token Use the userdata stored in the user object to create an OpenID Connect token. The token returned by this function can be passed to Hydra, which will store it and serve it to OpenID Connect Clients to retrieve user information. If you need to relabel a field pass an array of tuples to mapping. Example: getClaims('nextcloud', mapping=[("name", "username"),("roles", "groups")]) Attributes: appname - Name or ID of app to connect to roles - List of roles to add to the `stackspin_roles` claim mapping - Mapping of the fields Returns: OpenID Connect token of type dict """ # Name should be set, however, we do not enforce this yet. # if somebody does not set it's name, we use the email address # as name if self.name: name = self.name else: name = self.email if self.username: username = self.username else: username = self.email token = { "name": name, "preferred_username": username, "email": self.email, "stackspin_roles": roles, } # Relabel field names if mapping: for old_field_name, new_field_name in mapping: token[new_field_name] = token[old_field_name] del token[old_field_name] return dict(id_token=token)