diff --git a/areas/apps/apps_service.py b/areas/apps/apps_service.py new file mode 100644 index 0000000..e48d588 --- /dev/null +++ b/areas/apps/apps_service.py @@ -0,0 +1,12 @@ +from .models import App, AppRole + +class AppsService: + @staticmethod + def get_apps(): + apps = App.query.all() + return [{"id": app.id, "name": app.name, "slug": app.slug} for app in apps] + + @staticmethod + def get_app_roles(): + app_roles = AppRole.query.all() + return [{"user_id": app_role.user_id, "app_id": app_role.app_id, "role_id": app_role.role_id} for app_role in app_roles] \ No newline at end of file diff --git a/migrations/versions/b514cca2d47b_add_user_role.py b/migrations/versions/b514cca2d47b_add_user_role.py new file mode 100644 index 0000000..0586942 --- /dev/null +++ b/migrations/versions/b514cca2d47b_add_user_role.py @@ -0,0 +1,76 @@ +"""update apps and add 'user' and 'no access' role + +Revision ID: b514cca2d47b +Revises: 5f462d2d9d25 +Create Date: 2022-06-08 17:24:51.305129 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'b514cca2d47b' +down_revision = '5f462d2d9d25' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### end Alembic commands ### + + # Check and update app table in DB + apps = { + "dashboard": "Dashboard", + "wekan": "Wekan", + "wordpress": "WordPress", + "nextcloud": "Nextcloud", + "zulip": "Zulip" + } + # app table + app_table = sa.table('app', sa.column('id', sa.Integer), sa.column( + 'name', sa.String), sa.column('slug', sa.String)) + + existing_apps = op.get_bind().execute(app_table.select()).fetchall() + existing_app_slugs = [app['slug'] for app in existing_apps] + for app_slug in apps.keys(): + if app_slug in existing_app_slugs: + op.execute(f'UPDATE app SET `name` = "{apps.get(app_slug)}" WHERE slug = "{app_slug}"') + else: + op.execute(f'INSERT INTO app (`name`, slug) VALUES ("{apps.get(app_slug)}","{app_slug}")') + + # Fetch all apps including newly created + existing_apps = op.get_bind().execute(app_table.select()).fetchall() + # Insert role "user" as ID 2 + op.execute("INSERT INTO `role` (id, `name`) VALUES (2, 'user')") + # Insert role "no access" as ID 3 + op.execute("INSERT INTO `role` (id, `name`) VALUES (3, 'no access')") + # Set role_id 2 to all current "user" users which by have NULL role ID + op.execute("UPDATE app_role SET role_id = 2 WHERE role_id IS NULL") + + # Add 'no access' role for all users that don't have any roles for specific apps + app_roles_table = sa.table('app_role', sa.column('user_id', sa.String), sa.column( + 'app_id', sa.Integer), sa.column('role_id', sa.Integer)) + + app_ids = [app['id'] for app in existing_apps] + app_roles = op.get_bind().execute(app_roles_table.select()).fetchall() + user_ids = set([app_role['user_id'] for app_role in app_roles]) + + for user_id in user_ids: + existing_user_app_ids = [x['app_id'] for x in list(filter(lambda role: role['user_id'] == user_id, app_roles))] + missing_user_app_ids = [x for x in app_ids if x not in existing_user_app_ids] + + if len(missing_user_app_ids) > 0: + values = [{'user_id': user_id, 'app_id': app_id, 'role_id': 3} for app_id in missing_user_app_ids] + op.bulk_insert(app_roles_table, values) + + +def downgrade(): + # Revert all users role_id to NULL where role is 'user' + op.execute("UPDATE app_role SET role_id = NULL WHERE role_id = 2") + # Delete role 'user' from roles + op.execute("DELETE FROM `role` WHERE id = 2") + + # Delete all user app roles where role is 'no access' with role_id 3 + op.execute("DELETE FROM app_role WHERE role_id = 3") + # Delete role 'no access' from roles + op.execute("DELETE FROM `role` WHERE id = 3") diff --git a/web/login/login.py b/web/login/login.py index e33b974..ef54a18 100644 --- a/web/login/login.py +++ b/web/login/login.py @@ -256,17 +256,26 @@ def consent(): # Default access level roles = [] if app_obj: - role_objects = ( + role_object = ( db.session.query(AppRole) .filter(AppRole.app_id == app_obj.id) .filter(AppRole.user_id == user.uuid) + .first() ) - for role_obj in role_objects: - app_role = RoleService.get_role_by_id(role_obj.role_id) - if (app_role is None): - roles.append('user') - continue - roles.append(app_role.name) + # Role ID 3 is always "No access" due to migration b514cca2d47b + if role_object is None or role_object.role_id is None or role_object.role_id == 3: + # If there is no role in app_roles or the role_id for an app is null user has no permissions + current_app.logger.error(f"User has no access for: {app_obj.name}") + return redirect( + consent_request.reject( + error="No access", + error_description="The user has no access for app", + error_hint="Contact your administrator", + status_code=401, + ) + ) + else: + roles.append(role_object.role.name) current_app.logger.info(f"Using '{roles}' when applying consent for {kratos_id}")