mitgliederverwaltung/lib/mv/authorization/checks/has_permission.ex
Moritz 40e75f4066
Some checks reported errors
continuous-integration/drone/push Build was killed
continuous-integration/drone/promote/production Build is passing
refactor: reduce nesting in HasPermission.strict_check_with_permissions
Extract strict_check_filter_scope/4 to satisfy Credo max depth 2.
2026-02-04 13:29:41 +01:00

402 lines
13 KiB
Elixir

defmodule Mv.Authorization.Checks.HasPermission do
@moduledoc """
Custom Ash Policy Check that evaluates permissions from the PermissionSets module.
This check:
1. Reads the actor's role and permission_set_name
2. Looks up permissions from PermissionSets.get_permissions/1
3. Finds matching permission for current resource + action
4. Applies scope filter (:own, :linked, :all)
## Important: strict_check Behavior
For filter-based scopes (`:own`, `:linked`):
- **WITH record**: Evaluates filter against record (returns `true`/`false`)
- **WITHOUT record** (queries/lists): Returns `false`
**Why `false` instead of `:unknown`?**
Ash's policy evaluation doesn't reliably call `auto_filter` when `strict_check`
returns `:unknown`. To ensure list queries work correctly, resources **MUST** use
bypass policies with `expr()` for READ operations (see `docs/policy-bypass-vs-haspermission.md`).
This means `HasPermission` is **NOT** generically reusable for query authorization
with filter scopes - it requires companion bypass policies.
## Usage Pattern
See `docs/policy-bypass-vs-haspermission.md` for the two-tier pattern:
- **READ**: `bypass` with `expr()` (handles auto_filter)
- **UPDATE/CREATE/DESTROY**: `HasPermission` (handles scope evaluation)
## Usage in Ash Resource
policies do
# READ: Bypass for list queries
bypass action_type(:read) do
authorize_if expr(id == ^actor(:id))
end
# UPDATE: HasPermission for scope evaluation
policy action_type([:update, :create, :destroy]) do
authorize_if Mv.Authorization.Checks.HasPermission
end
end
## Scope Behavior
- **:all** - Authorizes without filtering (returns all records)
- **:own** - Filters to records where record.id == actor.id
- **:linked** - Filters based on resource type:
- Member: `id == actor.member_id` (User.member_id → Member.id, inverse relationship)
- CustomFieldValue: `member_id == actor.member_id` (CustomFieldValue.member_id → Member.id → User.member_id)
- MemberGroup: `member_id == actor.member_id` (MemberGroup.member_id → Member.id → User.member_id)
## Error Handling
Returns `false` for:
- Missing actor
- Actor without role
- Invalid permission_set_name
- No matching permission found
All errors result in Forbidden (policy fails).
## Role Loading Fallback
If the actor's `:role` relationship is `%Ash.NotLoaded{}`, this check will
attempt to load it automatically. This provides a fallback if `on_mount` hooks
didn't run (e.g., in non-LiveView contexts).
## Examples
# In a resource policy
policies do
policy action_type([:read, :create, :update, :destroy]) do
authorize_if Mv.Authorization.Checks.HasPermission
end
end
"""
use Ash.Policy.Check
require Ash.Query
import Ash.Expr
alias Mv.Authorization.PermissionSets
require Logger
@impl true
def describe(_opts) do
"checks if actor has permission via their role's permission set"
end
@impl true
def strict_check(actor, authorizer, _opts) do
resource = authorizer.resource
action = get_action_from_authorizer(authorizer)
record = get_record_from_authorizer(authorizer)
cond do
is_nil(actor) ->
log_auth_failure(actor, resource, action, "no actor")
{:ok, false}
is_nil(action) ->
log_auth_failure(
actor,
resource,
action,
"authorizer subject shape unsupported (no action)"
)
{:ok, false}
true ->
strict_check_with_permissions(actor, resource, action, record)
end
end
# Helper function to reduce nesting depth
defp strict_check_with_permissions(actor, resource, action, record) do
# Ensure role is loaded (fallback if on_mount didn't run)
actor = ensure_role_loaded(actor)
with %{role: %{permission_set_name: ps_name}} when not is_nil(ps_name) <- actor,
{:ok, ps_atom} <- PermissionSets.permission_set_name_to_atom(ps_name),
permissions <- PermissionSets.get_permissions(ps_atom),
resource_name <- get_resource_name(resource) do
case check_permission(
permissions.resources,
resource_name,
action,
actor,
resource_name
) do
:authorized ->
{:ok, true}
{:filter, filter_expr} ->
strict_check_filter_scope(record, filter_expr, actor, resource_name)
false ->
{:ok, false}
end
else
%{role: nil} ->
log_auth_failure(actor, resource, action, "no role assigned")
{:ok, false}
%{role: %{permission_set_name: nil}} ->
log_auth_failure(actor, resource, action, "role has no permission_set_name")
{:ok, false}
{:error, :invalid_permission_set} ->
log_auth_failure(actor, resource, action, "invalid permission_set_name")
{:ok, false}
_ ->
log_auth_failure(actor, resource, action, "missing data")
{:ok, false}
end
end
# For :own/:linked scope: with record evaluate filter; without record deny (resources use bypass + expr).
defp strict_check_filter_scope(record, filter_expr, actor, resource_name) do
if record do
evaluate_filter_for_strict_check(filter_expr, actor, record, resource_name)
else
{:ok, false}
end
end
@impl true
def auto_filter(actor, authorizer, _opts) do
resource = authorizer.resource
action = get_action_from_authorizer(authorizer)
cond do
is_nil(actor) ->
# No actor - deny access (fail-closed)
# Return filter that never matches (expr(false) = match none)
deny_filter()
is_nil(action) ->
# Cannot determine action - deny access (fail-closed)
deny_filter()
true ->
auto_filter_with_permissions(actor, resource, action)
end
end
# Helper function to reduce nesting depth
defp auto_filter_with_permissions(actor, resource, action) do
with %{role: %{permission_set_name: ps_name}} when not is_nil(ps_name) <- actor,
{:ok, ps_atom} <- PermissionSets.permission_set_name_to_atom(ps_name),
permissions <- PermissionSets.get_permissions(ps_atom),
resource_name <- get_resource_name(resource) do
case check_permission(
permissions.resources,
resource_name,
action,
actor,
resource_name
) do
:authorized ->
# :all scope - allow all records (no filter)
# Return empty keyword list (no filtering)
[]
{:filter, filter_expr} ->
# :linked or :own scope - apply filter
# Create actions must not use HasPermission (use a dedicated check, e.g. CustomFieldValueCreateScope)
filter_expr
false ->
# No permission - deny access (fail-closed)
deny_filter()
end
else
_ ->
# Error case (no role, invalid permission set, etc.) - deny access (fail-closed)
deny_filter()
end
end
# Helper function to return a filter that never matches (deny all records)
# Used when authorization should be denied (fail-closed)
#
# Using `expr(false)` avoids depending on the primary key being named `:id`.
# This is more robust than [id: {:in, []}] which assumes the primary key is `:id`.
defp deny_filter do
expr(false)
end
# Helper to extract action type from authorizer
# CRITICAL: Must use action_type, not action.name!
# Action types: :create, :read, :update, :destroy
# Action names: :create_member, :update_member, etc.
# PermissionSets uses action types, not action names
#
# Prefer authorizer.action.type (stable API) over authorizer.subject (varies by context)
defp get_action_from_authorizer(authorizer) do
# Primary: Use authorizer.action.type (stable API)
case Map.get(authorizer, :action) do
%{type: action_type} when action_type in [:create, :read, :update, :destroy] ->
action_type
_ ->
# Fallback: Try authorizer.subject (for compatibility with different Ash versions/contexts)
case Map.get(authorizer, :subject) do
%{action_type: action_type} when action_type in [:create, :read, :update, :destroy] ->
action_type
%{action: %{type: action_type}}
when action_type in [:create, :read, :update, :destroy] ->
action_type
_ ->
nil
end
end
end
# Helper to extract record from authorizer for strict_check
defp get_record_from_authorizer(authorizer) do
case authorizer.subject do
%{data: data} when not is_nil(data) -> data
_ -> nil
end
end
# Evaluate filter expression for strict_check on single records
# For :own scope with User resource: id == actor.id
# For :linked scope with Member resource: id == actor.member_id
defp evaluate_filter_for_strict_check(_filter_expr, actor, record, resource_name) do
result =
case {resource_name, record} do
# Scope :own
{"User", %{id: user_id}} when not is_nil(user_id) ->
user_id == actor.id
# Scope :linked
{"Member", %{id: member_id}} when not is_nil(member_id) ->
member_id == actor.member_id
{"CustomFieldValue", %{member_id: cfv_member_id}} when not is_nil(cfv_member_id) ->
cfv_member_id == actor.member_id
{"MemberGroup", %{member_id: mg_member_id}} when not is_nil(mg_member_id) ->
mg_member_id == actor.member_id
_ ->
:unknown
end
out = if result == :unknown, do: {:ok, :unknown}, else: {:ok, result}
out
end
# Extract resource name from module (e.g., Mv.Membership.Member -> "Member")
defp get_resource_name(resource) when is_atom(resource) do
resource |> Module.split() |> List.last()
end
# Find matching permission and apply scope
defp check_permission(resource_perms, resource_name, action, actor, resource_name_for_logging) do
case Enum.find(resource_perms, fn perm ->
perm.resource == resource_name and perm.action == action and perm.granted
end) do
nil ->
log_auth_failure(actor, resource_name_for_logging, action, "no matching permission found")
false
perm ->
apply_scope(perm.scope, actor, resource_name)
end
end
# Scope: all - No filtering, access to all records
defp apply_scope(:all, _actor, _resource) do
:authorized
end
# Scope: own - Filter to records where record.id == actor.id
# Used for User resource (users can access their own user record)
defp apply_scope(:own, actor, _resource) do
{:filter, expr(id == ^actor.id)}
end
# Scope: linked - Filter based on user relationship (resource-specific!)
# IMPORTANT: Understand the relationship direction!
# - User belongs_to :member (User.member_id → Member.id)
# - Member has_one :user (inverse, no FK on Member)
defp apply_scope(:linked, actor, resource_name) do
case resource_name do
"Member" ->
# User.member_id → Member.id (inverse relationship). Filter: member.id == actor.member_id
linked_filter_by_member_id(actor, :id)
"CustomFieldValue" ->
# CustomFieldValue.member_id → Member.id → User.member_id
linked_filter_by_member_id(actor, :member_id)
"MemberGroup" ->
# MemberGroup.member_id → Member.id → User.member_id (own linked member's group associations)
linked_filter_by_member_id(actor, :member_id)
"MembershipFeeCycle" ->
# MembershipFeeCycle.member_id → Member.id → User.member_id (own linked member's cycles)
linked_filter_by_member_id(actor, :member_id)
_ ->
# Fallback for other resources
{:filter, expr(user_id == ^actor.id)}
end
end
# Returns {:filter, expr(false)} if actor has no member_id; otherwise {:filter, expr(field == ^actor.member_id)}.
# Used for :linked scope on Member (field :id), CustomFieldValue and MemberGroup (field :member_id).
defp linked_filter_by_member_id(actor, _field) when is_nil(actor.member_id) do
{:filter, expr(false)}
end
defp linked_filter_by_member_id(actor, :id), do: {:filter, expr(id == ^actor.member_id)}
defp linked_filter_by_member_id(actor, :member_id),
do: {:filter, expr(member_id == ^actor.member_id)}
# Log authorization failures for debugging (lazy evaluation)
defp log_auth_failure(actor, resource, action, reason) do
Logger.debug(fn ->
actor_id = if is_map(actor), do: Map.get(actor, :id), else: "nil"
resource_name = get_resource_name_for_logging(resource)
"""
Authorization failed:
Actor: #{actor_id}
Resource: #{resource_name}
Action: #{inspect(action)}
Reason: #{reason}
"""
end)
end
# Helper to extract resource name for logging (handles both atoms and strings)
defp get_resource_name_for_logging(resource) when is_atom(resource) do
resource |> Module.split() |> List.last()
end
defp get_resource_name_for_logging(resource) when is_binary(resource) do
resource
end
defp get_resource_name_for_logging(_resource) do
"unknown"
end
# Fallback: Load role if not loaded (in case on_mount didn't run)
# Delegates to centralized Actor helper
defp ensure_role_loaded(actor) do
Mv.Authorization.Actor.ensure_loaded(actor)
end
end