All checks were successful
continuous-integration/drone/push Build is passing
Implement custom Ash Policy Check that reads permissions from PermissionSets module and applies scope filters to Ash queries.
203 lines
6.2 KiB
Elixir
203 lines
6.2 KiB
Elixir
defmodule Mv.Authorization.Checks.HasPermission do
|
|
@moduledoc """
|
|
Custom Ash Policy Check that evaluates permissions from the PermissionSets module.
|
|
|
|
This check:
|
|
1. Reads the actor's role and permission_set_name
|
|
2. Looks up permissions from PermissionSets.get_permissions/1
|
|
3. Finds matching permission for current resource + action
|
|
4. Applies scope filter (:own, :linked, :all)
|
|
|
|
## Usage in Ash Resource
|
|
|
|
policies do
|
|
policy action_type(:read) do
|
|
authorize_if Mv.Authorization.Checks.HasPermission
|
|
end
|
|
end
|
|
|
|
## Scope Behavior
|
|
|
|
- **:all** - Authorizes without filtering (returns all records)
|
|
- **:own** - Filters to records where record.id == actor.id
|
|
- **:linked** - Filters based on resource type:
|
|
- Member: member.user_id == actor.id
|
|
- CustomFieldValue: custom_field_value.member.user_id == actor.id (traverses relationship!)
|
|
|
|
## Error Handling
|
|
|
|
Returns `false` for:
|
|
- Missing actor
|
|
- Actor without role
|
|
- Invalid permission_set_name
|
|
- No matching permission found
|
|
|
|
All errors result in Forbidden (policy fails).
|
|
|
|
## Examples
|
|
|
|
# In a resource policy
|
|
policies do
|
|
policy action_type([:read, :create, :update, :destroy]) do
|
|
authorize_if Mv.Authorization.Checks.HasPermission
|
|
end
|
|
end
|
|
"""
|
|
|
|
use Ash.Policy.Check
|
|
require Ash.Query
|
|
import Ash.Expr
|
|
alias Mv.Authorization.PermissionSets
|
|
require Logger
|
|
|
|
@impl true
|
|
def describe(_opts) do
|
|
"checks if actor has permission via their role's permission set"
|
|
end
|
|
|
|
@impl true
|
|
def strict_check(actor, authorizer, _opts) do
|
|
resource = authorizer.resource
|
|
action = get_action_from_authorizer(authorizer)
|
|
|
|
# Explicit nil check first (fail fast, clear error message)
|
|
if is_nil(actor) do
|
|
log_auth_failure(actor, resource, action, "no actor")
|
|
{:ok, false}
|
|
else
|
|
with %{role: %{permission_set_name: ps_name}} when not is_nil(ps_name) <- actor,
|
|
{:ok, ps_atom} <- PermissionSets.permission_set_name_to_atom(ps_name),
|
|
permissions <- PermissionSets.get_permissions(ps_atom),
|
|
resource_name <- get_resource_name(resource) do
|
|
case check_permission(permissions.resources, resource_name, action, actor, resource_name) do
|
|
:authorized -> {:ok, true}
|
|
{:filter, _} -> {:ok, :unknown}
|
|
false -> {:ok, false}
|
|
end
|
|
else
|
|
%{role: nil} ->
|
|
log_auth_failure(actor, resource, action, "no role assigned")
|
|
{:ok, false}
|
|
|
|
%{role: %{permission_set_name: nil}} ->
|
|
log_auth_failure(actor, resource, action, "role has no permission_set_name")
|
|
{:ok, false}
|
|
|
|
{:error, :invalid_permission_set} ->
|
|
log_auth_failure(actor, resource, action, "invalid permission_set_name")
|
|
{:ok, false}
|
|
|
|
_ ->
|
|
log_auth_failure(actor, resource, action, "missing data")
|
|
{:ok, false}
|
|
end
|
|
end
|
|
end
|
|
|
|
@impl true
|
|
def auto_filter(actor, authorizer, _opts) do
|
|
resource = authorizer.resource
|
|
action = get_action_from_authorizer(authorizer)
|
|
|
|
# Explicit nil check first
|
|
if is_nil(actor) do
|
|
nil
|
|
else
|
|
with %{role: %{permission_set_name: ps_name}} when not is_nil(ps_name) <- actor,
|
|
{:ok, ps_atom} <- PermissionSets.permission_set_name_to_atom(ps_name),
|
|
permissions <- PermissionSets.get_permissions(ps_atom),
|
|
resource_name <- get_resource_name(resource) do
|
|
case check_permission(permissions.resources, resource_name, action, actor, resource_name) do
|
|
:authorized -> nil
|
|
{:filter, filter_expr} -> filter_expr
|
|
false -> nil
|
|
end
|
|
else
|
|
_ -> nil
|
|
end
|
|
end
|
|
end
|
|
|
|
# Helper to extract action from authorizer
|
|
defp get_action_from_authorizer(authorizer) do
|
|
case authorizer.subject do
|
|
%{action: %{name: action}} -> action
|
|
%{action: action} when is_atom(action) -> action
|
|
_ -> nil
|
|
end
|
|
end
|
|
|
|
# Extract resource name from module (e.g., Mv.Membership.Member -> "Member")
|
|
defp get_resource_name(resource) when is_atom(resource) do
|
|
resource |> Module.split() |> List.last()
|
|
end
|
|
|
|
# Find matching permission and apply scope
|
|
defp check_permission(resource_perms, resource_name, action, actor, resource_module_name) do
|
|
case Enum.find(resource_perms, fn perm ->
|
|
perm.resource == resource_name and perm.action == action and perm.granted
|
|
end) do
|
|
nil ->
|
|
log_auth_failure(actor, resource_module_name, action, "no matching permission found")
|
|
false
|
|
|
|
perm ->
|
|
apply_scope(perm.scope, actor, resource_name)
|
|
end
|
|
end
|
|
|
|
# Scope: all - No filtering, access to all records
|
|
defp apply_scope(:all, _actor, _resource) do
|
|
:authorized
|
|
end
|
|
|
|
# Scope: own - Filter to records where record.id == actor.id
|
|
# Used for User resource (users can access their own user record)
|
|
defp apply_scope(:own, actor, _resource) do
|
|
{:filter, expr(id == ^actor.id)}
|
|
end
|
|
|
|
# Scope: linked - Filter based on user_id relationship (resource-specific!)
|
|
defp apply_scope(:linked, actor, resource_name) do
|
|
case resource_name do
|
|
"Member" ->
|
|
# Member.user_id == actor.id (direct relationship)
|
|
{:filter, expr(user_id == ^actor.id)}
|
|
|
|
"CustomFieldValue" ->
|
|
# CustomFieldValue.member.user_id == actor.id (traverse through member!)
|
|
{:filter, expr(member.user_id == ^actor.id)}
|
|
|
|
_ ->
|
|
# Fallback for other resources: try direct user_id
|
|
{:filter, expr(user_id == ^actor.id)}
|
|
end
|
|
end
|
|
|
|
# Log authorization failures for debugging
|
|
defp log_auth_failure(actor, resource, action, reason) do
|
|
actor_id = if is_map(actor), do: Map.get(actor, :id), else: "nil"
|
|
resource_name = get_resource_name_for_logging(resource)
|
|
|
|
Logger.debug("""
|
|
Authorization failed:
|
|
Actor: #{actor_id}
|
|
Resource: #{resource_name}
|
|
Action: #{action}
|
|
Reason: #{reason}
|
|
""")
|
|
end
|
|
|
|
# Helper to extract resource name for logging (handles both atoms and strings)
|
|
defp get_resource_name_for_logging(resource) when is_atom(resource) do
|
|
resource |> Module.split() |> List.last()
|
|
end
|
|
|
|
defp get_resource_name_for_logging(resource) when is_binary(resource) do
|
|
resource
|
|
end
|
|
|
|
defp get_resource_name_for_logging(_resource) do
|
|
"unknown"
|
|
end
|
|
end
|