Merge branch 'main' into feat/api-role-permission-layer

This commit is contained in:
Maarten de Waard 2022-06-15 14:19:44 +02:00
commit 3eea6ab2bf
No known key found for this signature in database
GPG key ID: 1D3E893A657CC8DA
3 changed files with 104 additions and 7 deletions

View file

@ -0,0 +1,12 @@
from .models import App, AppRole
class AppsService:
@staticmethod
def get_apps():
apps = App.query.all()
return [{"id": app.id, "name": app.name, "slug": app.slug} for app in apps]
@staticmethod
def get_app_roles():
app_roles = AppRole.query.all()
return [{"user_id": app_role.user_id, "app_id": app_role.app_id, "role_id": app_role.role_id} for app_role in app_roles]

View file

@ -0,0 +1,76 @@
"""update apps and add 'user' and 'no access' role
Revision ID: b514cca2d47b
Revises: 5f462d2d9d25
Create Date: 2022-06-08 17:24:51.305129
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'b514cca2d47b'
down_revision = '5f462d2d9d25'
branch_labels = None
depends_on = None
def upgrade():
# ### end Alembic commands ###
# Check and update app table in DB
apps = {
"dashboard": "Dashboard",
"wekan": "Wekan",
"wordpress": "WordPress",
"nextcloud": "Nextcloud",
"zulip": "Zulip"
}
# app table
app_table = sa.table('app', sa.column('id', sa.Integer), sa.column(
'name', sa.String), sa.column('slug', sa.String))
existing_apps = op.get_bind().execute(app_table.select()).fetchall()
existing_app_slugs = [app['slug'] for app in existing_apps]
for app_slug in apps.keys():
if app_slug in existing_app_slugs:
op.execute(f'UPDATE app SET `name` = "{apps.get(app_slug)}" WHERE slug = "{app_slug}"')
else:
op.execute(f'INSERT INTO app (`name`, slug) VALUES ("{apps.get(app_slug)}","{app_slug}")')
# Fetch all apps including newly created
existing_apps = op.get_bind().execute(app_table.select()).fetchall()
# Insert role "user" as ID 2
op.execute("INSERT INTO `role` (id, `name`) VALUES (2, 'user')")
# Insert role "no access" as ID 3
op.execute("INSERT INTO `role` (id, `name`) VALUES (3, 'no access')")
# Set role_id 2 to all current "user" users which by have NULL role ID
op.execute("UPDATE app_role SET role_id = 2 WHERE role_id IS NULL")
# Add 'no access' role for all users that don't have any roles for specific apps
app_roles_table = sa.table('app_role', sa.column('user_id', sa.String), sa.column(
'app_id', sa.Integer), sa.column('role_id', sa.Integer))
app_ids = [app['id'] for app in existing_apps]
app_roles = op.get_bind().execute(app_roles_table.select()).fetchall()
user_ids = set([app_role['user_id'] for app_role in app_roles])
for user_id in user_ids:
existing_user_app_ids = [x['app_id'] for x in list(filter(lambda role: role['user_id'] == user_id, app_roles))]
missing_user_app_ids = [x for x in app_ids if x not in existing_user_app_ids]
if len(missing_user_app_ids) > 0:
values = [{'user_id': user_id, 'app_id': app_id, 'role_id': 3} for app_id in missing_user_app_ids]
op.bulk_insert(app_roles_table, values)
def downgrade():
# Revert all users role_id to NULL where role is 'user'
op.execute("UPDATE app_role SET role_id = NULL WHERE role_id = 2")
# Delete role 'user' from roles
op.execute("DELETE FROM `role` WHERE id = 2")
# Delete all user app roles where role is 'no access' with role_id 3
op.execute("DELETE FROM app_role WHERE role_id = 3")
# Delete role 'no access' from roles
op.execute("DELETE FROM `role` WHERE id = 3")

View file

@ -256,17 +256,26 @@ def consent():
# Default access level
roles = []
if app_obj:
role_objects = (
role_object = (
db.session.query(AppRole)
.filter(AppRole.app_id == app_obj.id)
.filter(AppRole.user_id == user.uuid)
.first()
)
for role_obj in role_objects:
app_role = RoleService.get_role_by_id(role_obj.role_id)
if (app_role is None):
roles.append('user')
continue
roles.append(app_role.name)
# Role ID 3 is always "No access" due to migration b514cca2d47b
if role_object is None or role_object.role_id is None or role_object.role_id == 3:
# If there is no role in app_roles or the role_id for an app is null user has no permissions
current_app.logger.error(f"User has no access for: {app_obj.name}")
return redirect(
consent_request.reject(
error="No access",
error_description="The user has no access for app",
error_hint="Contact your administrator",
status_code=401,
)
)
else:
roles.append(role_object.role.name)
current_app.logger.info(f"Using '{roles}' when applying consent for {kratos_id}")