defmodule Mv.Authorization.Checks.HasPermission do
  @moduledoc """
  Custom Ash policy check that evaluates permissions from the `PermissionSets` module.

  This check:

  1. Reads the actor's role and its `permission_set_name`
  2. Looks up permissions via `PermissionSets.get_permissions/1`
  3. Finds the granted permission matching the current resource + action
  4. Applies the permission's scope (`:own`, `:linked`, `:all`)

  ## Usage in an Ash resource

      policies do
        policy action_type(:read) do
          authorize_if Mv.Authorization.Checks.HasPermission
        end
      end

  ## Scope Behavior

  - **:all** - Authorizes without filtering (returns all records)
  - **:own** - Filters to records where `record.id == actor.id`
  - **:linked** - Filters based on resource type:
    - Member: `member.user.id == actor.id` (via the `has_one :user` relationship)
    - CustomFieldValue: `custom_field_value.member.user.id == actor.id`
      (traverses the member → user relationship)

  ## Error Handling

  `strict_check/3` returns `{:ok, false}` (and `auto_filter/3` returns `nil`) for:

  - Missing actor
  - Actor without role
  - Role without `permission_set_name`
  - Invalid `permission_set_name`
  - No matching permission found
  - A permission whose scope is not one of `:all`, `:own`, `:linked`

  All errors result in Forbidden (policy fails).

  ## Examples

      # In a resource policy
      policies do
        policy action_type([:read, :create, :update, :destroy]) do
          authorize_if Mv.Authorization.Checks.HasPermission
        end
      end
  """

  use Ash.Policy.Check
  require Ash.Query
  import Ash.Expr
  alias Mv.Authorization.PermissionSets
  require Logger

  @doc """
  Returns the human-readable description of this check.
  """
  @impl true
  def describe(_opts) do
    "checks if actor has permission via their role's permission set"
  end

  @doc """
  Decides the check without loading data.

  Returns `{:ok, true}` when the matching permission has scope `:all`,
  `{:ok, :unknown}` when the permission requires a record filter (scopes
  `:own` / `:linked`), and `{:ok, false}` in every failure case. Failures are
  logged at debug level.
  """
  @impl true
  def strict_check(actor, authorizer, _opts) do
    resource = authorizer.resource
    action = get_action_from_authorizer(authorizer)

    cond do
      is_nil(actor) ->
        deny(actor, resource, action, "no actor")

      is_nil(action) ->
        deny(actor, resource, action, "authorizer subject shape unsupported (no action)")

      true ->
        strict_check_with_permissions(actor, resource, action)
    end
  end

  @doc """
  Returns the filter expression for scoped permissions.

  Yields the scope's filter expression when the matching permission has scope
  `:own` or `:linked`, and `nil` in every other case (no actor, no action,
  failed lookup, no matching permission, or scope `:all`).
  """
  # NOTE(review): `nil` is returned both for "authorized without filter" and for
  # denial. Presumably the denial cases are already settled by strict_check/3
  # returning {:ok, false} before this callback is consulted — confirm against
  # the Ash authorizer that a nil filter can never act as "allow all" here.
  @impl true
  def auto_filter(actor, authorizer, _opts) do
    resource = authorizer.resource
    action = get_action_from_authorizer(authorizer)

    if is_nil(actor) or is_nil(action) do
      nil
    else
      auto_filter_with_permissions(actor, resource, action)
    end
  end

  # Maps the shared permission evaluation onto strict_check/3 results.
  defp strict_check_with_permissions(actor, resource, action) do
    case evaluate_permission(actor, resource, action) do
      {:ok, :authorized} -> {:ok, true}
      {:ok, {:filter, _filter_expr}} -> {:ok, :unknown}
      # check_permission/4 (or apply_scope/3) has already logged the reason.
      {:ok, false} -> {:ok, false}
      {:error, reason} -> deny(actor, resource, action, describe_failure(reason))
    end
  end

  # Maps the shared permission evaluation onto auto_filter/3 results:
  # only a scoped permission produces a filter, everything else is nil.
  defp auto_filter_with_permissions(actor, resource, action) do
    case evaluate_permission(actor, resource, action) do
      {:ok, {:filter, filter_expr}} -> filter_expr
      _other -> nil
    end
  end

  # Resolves the actor's permission set and evaluates the permission for
  # `resource` + `action`.
  #
  # Returns `{:ok, :authorized | {:filter, expr} | false}` when the permission
  # set could be resolved, or `{:error, reason}` (see describe_failure/1) when
  # the actor, role or permission set name is unusable.
  defp evaluate_permission(actor, resource, action) do
    with {:ok, ps_name} <- fetch_permission_set_name(actor),
         {:ok, ps_atom} <- PermissionSets.permission_set_name_to_atom(ps_name) do
      permissions = PermissionSets.get_permissions(ps_atom)
      resource_name = get_resource_name(resource)

      {:ok, check_permission(permissions.resources, resource_name, action, actor)}
    else
      {:error, reason} -> {:error, reason}
      # Any other unexpected return is reported as "missing data".
      _unexpected -> {:error, :missing_data}
    end
  end

  # Extracts the permission set name from the actor's role, tagging each
  # failure so callers do not have to re-inspect the actor's shape.
  defp fetch_permission_set_name(%{role: %{permission_set_name: ps_name}})
       when not is_nil(ps_name),
       do: {:ok, ps_name}

  defp fetch_permission_set_name(%{role: nil}), do: {:error, :no_role}

  defp fetch_permission_set_name(%{role: %{permission_set_name: nil}}),
    do: {:error, :no_permission_set_name}

  defp fetch_permission_set_name(_actor), do: {:error, :missing_data}

  # Translates a failure tag into the message written to the debug log.
  defp describe_failure(:no_role), do: "no role assigned"
  defp describe_failure(:no_permission_set_name), do: "role has no permission_set_name"
  defp describe_failure(:invalid_permission_set), do: "invalid permission_set_name"
  defp describe_failure(_reason), do: "missing data"

  # Logs the failure and returns the strict_check/3 denial result.
  defp deny(actor, resource, action, reason) do
    log_auth_failure(actor, resource, action, reason)
    {:ok, false}
  end

  # Helper to extract the action name from the authorizer's subject.
  # Returns nil when the subject carries no recognizable action.
  defp get_action_from_authorizer(authorizer) do
    case authorizer.subject do
      %{action: %{name: action}} -> action
      %{action: action} when is_atom(action) -> action
      _ -> nil
    end
  end

  # Extract resource name from module (e.g., Mv.Membership.Member -> "Member")
  defp get_resource_name(resource) when is_atom(resource) do
    resource |> Module.split() |> List.last()
  end

  # Finds the granted permission matching resource + action and applies its
  # scope. Returns `false` (after logging) when no permission matches.
  defp check_permission(resource_perms, resource_name, action, actor) do
    matching =
      Enum.find(resource_perms, fn perm ->
        perm.resource == resource_name and perm.action == action and perm.granted
      end)

    case matching do
      nil ->
        log_auth_failure(actor, resource_name, action, "no matching permission found")
        false

      perm ->
        apply_scope(perm.scope, actor, resource_name)
    end
  end

  # Scope: all - No filtering, access to all records
  defp apply_scope(:all, _actor, _resource) do
    :authorized
  end

  # Scope: own - Filter to records where record.id == actor.id
  # Used for User resource (users can access their own user record)
  defp apply_scope(:own, actor, _resource) do
    {:filter, expr(id == ^actor.id)}
  end

  # Scope: linked - Filter based on user relationship (resource-specific!)
  # Uses Ash relationships: Member has_one :user, CustomFieldValue belongs_to :member
  defp apply_scope(:linked, actor, resource_name) do
    case resource_name do
      "Member" ->
        # Member has_one :user → filter by user.id == actor.id
        {:filter, expr(user.id == ^actor.id)}

      "CustomFieldValue" ->
        # CustomFieldValue belongs_to :member → member has_one :user
        # Traverse: custom_field_value.member.user.id == actor.id
        {:filter, expr(member.user.id == ^actor.id)}

      _ ->
        # Fallback for other resources: try user relationship first, then user_id
        # NOTE(review): assumes such resources expose both a `user` relationship
        # and a `user_id` attribute — confirm before granting :linked elsewhere.
        {:filter, expr(user.id == ^actor.id or user_id == ^actor.id)}
    end
  end

  # Unknown scope: fail closed instead of raising FunctionClauseError, so a
  # misconfigured permission set results in Forbidden like every other error.
  defp apply_scope(scope, _actor, resource_name) do
    Logger.warning(
      "HasPermission: unsupported scope #{inspect(scope)} for resource " <>
        "#{inspect(resource_name)}; denying access"
    )

    false
  end

  # Log authorization failures for debugging (lazy evaluation: the message is
  # only built when the debug level is enabled)
  defp log_auth_failure(actor, resource, action, reason) do
    Logger.debug(fn ->
      actor_id = if is_map(actor), do: Map.get(actor, :id), else: "nil"
      resource_name = get_resource_name_for_logging(resource)

      """
      Authorization failed:
      Actor: #{actor_id}
      Resource: #{resource_name}
      Action: #{inspect(action)}
      Reason: #{reason}
      """
    end)
  end

  # Helper to extract resource name for logging. Accepts resource names that are
  # already strings as well as modules, and never raises: logging must not turn
  # a denial into a crash.
  defp get_resource_name_for_logging(resource) when is_binary(resource), do: resource

  defp get_resource_name_for_logging(resource) when is_atom(resource) and not is_nil(resource) do
    case Atom.to_string(resource) do
      # Only module aliases can be passed to Module.split/1.
      "Elixir." <> _rest -> get_resource_name(resource)
      plain_atom -> plain_atom
    end
  end

  defp get_resource_name_for_logging(_resource), do: "unknown"
end