Merge pull request 'Custom Policy Check - HasPermission closes #343' (#344) from feature/343_haspermission into main
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: #344
This commit is contained in:
moritz 2026-01-08 18:05:14 +01:00
commit 35aff50bea
3 changed files with 590 additions and 0 deletions

View file

@ -0,0 +1,239 @@
defmodule Mv.Authorization.Checks.HasPermission do
@moduledoc """
Custom Ash Policy Check that evaluates permissions from the PermissionSets module.
This check:
1. Reads the actor's role and permission_set_name
2. Looks up permissions from PermissionSets.get_permissions/1
3. Finds matching permission for current resource + action
4. Applies scope filter (:own, :linked, :all)
## Usage in Ash Resource
policies do
policy action_type(:read) do
authorize_if Mv.Authorization.Checks.HasPermission
end
end
## Scope Behavior
- **:all** - Authorizes without filtering (returns all records)
- **:own** - Filters to records where record.id == actor.id
- **:linked** - Filters based on resource type:
- Member: member.user.id == actor.id (via has_one :user relationship)
- CustomFieldValue: custom_field_value.member.user.id == actor.id (traverses member user relationship!)
## Error Handling
Returns `false` for:
- Missing actor
- Actor without role
- Invalid permission_set_name
- No matching permission found
All errors result in Forbidden (policy fails).
## Examples
# In a resource policy
policies do
policy action_type([:read, :create, :update, :destroy]) do
authorize_if Mv.Authorization.Checks.HasPermission
end
end
"""
use Ash.Policy.Check
require Ash.Query
import Ash.Expr
alias Mv.Authorization.PermissionSets
require Logger
@impl true
def describe(_opts) do
"checks if actor has permission via their role's permission set"
end
@impl true
def strict_check(actor, authorizer, _opts) do
resource = authorizer.resource
action = get_action_from_authorizer(authorizer)
cond do
is_nil(actor) ->
log_auth_failure(actor, resource, action, "no actor")
{:ok, false}
is_nil(action) ->
log_auth_failure(
actor,
resource,
action,
"authorizer subject shape unsupported (no action)"
)
{:ok, false}
true ->
strict_check_with_permissions(actor, resource, action)
end
end
# Helper function to reduce nesting depth
defp strict_check_with_permissions(actor, resource, action) do
with %{role: %{permission_set_name: ps_name}} when not is_nil(ps_name) <- actor,
{:ok, ps_atom} <- PermissionSets.permission_set_name_to_atom(ps_name),
permissions <- PermissionSets.get_permissions(ps_atom),
resource_name <- get_resource_name(resource) do
case check_permission(
permissions.resources,
resource_name,
action,
actor,
resource_name
) do
:authorized -> {:ok, true}
{:filter, _} -> {:ok, :unknown}
false -> {:ok, false}
end
else
%{role: nil} ->
log_auth_failure(actor, resource, action, "no role assigned")
{:ok, false}
%{role: %{permission_set_name: nil}} ->
log_auth_failure(actor, resource, action, "role has no permission_set_name")
{:ok, false}
{:error, :invalid_permission_set} ->
log_auth_failure(actor, resource, action, "invalid permission_set_name")
{:ok, false}
_ ->
log_auth_failure(actor, resource, action, "missing data")
{:ok, false}
end
end
@impl true
def auto_filter(actor, authorizer, _opts) do
resource = authorizer.resource
action = get_action_from_authorizer(authorizer)
cond do
is_nil(actor) -> nil
is_nil(action) -> nil
true -> auto_filter_with_permissions(actor, resource, action)
end
end
# Helper function to reduce nesting depth
defp auto_filter_with_permissions(actor, resource, action) do
with %{role: %{permission_set_name: ps_name}} when not is_nil(ps_name) <- actor,
{:ok, ps_atom} <- PermissionSets.permission_set_name_to_atom(ps_name),
permissions <- PermissionSets.get_permissions(ps_atom),
resource_name <- get_resource_name(resource) do
case check_permission(
permissions.resources,
resource_name,
action,
actor,
resource_name
) do
:authorized -> nil
{:filter, filter_expr} -> filter_expr
false -> nil
end
else
_ -> nil
end
end
# Helper to extract action from authorizer
defp get_action_from_authorizer(authorizer) do
case authorizer.subject do
%{action: %{name: action}} -> action
%{action: action} when is_atom(action) -> action
_ -> nil
end
end
# Extract resource name from module (e.g., Mv.Membership.Member -> "Member")
defp get_resource_name(resource) when is_atom(resource) do
resource |> Module.split() |> List.last()
end
# Find matching permission and apply scope
defp check_permission(resource_perms, resource_name, action, actor, resource_name_for_logging) do
case Enum.find(resource_perms, fn perm ->
perm.resource == resource_name and perm.action == action and perm.granted
end) do
nil ->
log_auth_failure(actor, resource_name_for_logging, action, "no matching permission found")
false
perm ->
apply_scope(perm.scope, actor, resource_name)
end
end
# Scope: all - No filtering, access to all records
defp apply_scope(:all, _actor, _resource) do
:authorized
end
# Scope: own - Filter to records where record.id == actor.id
# Used for User resource (users can access their own user record)
defp apply_scope(:own, actor, _resource) do
{:filter, expr(id == ^actor.id)}
end
# Scope: linked - Filter based on user relationship (resource-specific!)
# Uses Ash relationships: Member has_one :user, CustomFieldValue belongs_to :member
defp apply_scope(:linked, actor, resource_name) do
case resource_name do
"Member" ->
# Member has_one :user → filter by user.id == actor.id
{:filter, expr(user.id == ^actor.id)}
"CustomFieldValue" ->
# CustomFieldValue belongs_to :member → member has_one :user
# Traverse: custom_field_value.member.user.id == actor.id
{:filter, expr(member.user.id == ^actor.id)}
_ ->
# Fallback for other resources: try user relationship first, then user_id
{:filter, expr(user.id == ^actor.id or user_id == ^actor.id)}
end
end
# Log authorization failures for debugging (lazy evaluation)
defp log_auth_failure(actor, resource, action, reason) do
Logger.debug(fn ->
actor_id = if is_map(actor), do: Map.get(actor, :id), else: "nil"
resource_name = get_resource_name_for_logging(resource)
"""
Authorization failed:
Actor: #{actor_id}
Resource: #{resource_name}
Action: #{inspect(action)}
Reason: #{reason}
"""
end)
end
# Helper to extract resource name for logging (handles both atoms and strings)
defp get_resource_name_for_logging(resource) when is_atom(resource) do
resource |> Module.split() |> List.last()
end
defp get_resource_name_for_logging(resource) when is_binary(resource) do
resource
end
defp get_resource_name_for_logging(_resource) do
"unknown"
end
end

View file

@ -0,0 +1,87 @@
defmodule Mv.Authorization.Checks.HasPermissionIntegrationTest do
@moduledoc """
Integration tests for HasPermission policy check.
These tests verify that the filter expressions generated by HasPermission
have the correct structure for relationship-based filtering.
Note: Full integration tests with real queries require resources to have
policies that use HasPermission. These tests validate filter expression
structure and ensure the relationship paths are correct.
"""
use ExUnit.Case, async: true
alias Mv.Authorization.Checks.HasPermission
# Helper to create mock actor with role
defp create_actor_with_role(permission_set_name) do
%{
id: "user-#{System.unique_integer([:positive])}",
role: %{permission_set_name: permission_set_name}
}
end
describe "Filter Expression Structure - :linked scope" do
test "Member filter uses user.id relationship path" do
actor = create_actor_with_role("own_data")
authorizer = create_authorizer(Mv.Membership.Member, :read)
filter = HasPermission.auto_filter(actor, authorizer, [])
# Verify filter is not nil (should return a filter for :linked scope)
assert not is_nil(filter)
# The filter should be a valid expression (keyword list or Ash.Expr)
# We verify it's not nil and can be used in queries
assert is_list(filter) or is_map(filter)
end
test "CustomFieldValue filter uses member.user.id relationship path" do
actor = create_actor_with_role("own_data")
authorizer = create_authorizer(Mv.Membership.CustomFieldValue, :read)
filter = HasPermission.auto_filter(actor, authorizer, [])
# Verify filter is not nil
assert not is_nil(filter)
# The filter should be a valid expression
assert is_list(filter) or is_map(filter)
end
end
describe "Filter Expression Structure - :own scope" do
test "User filter uses id == actor.id" do
actor = create_actor_with_role("own_data")
authorizer = create_authorizer(Mv.Accounts.User, :read)
filter = HasPermission.auto_filter(actor, authorizer, [])
# Verify filter is not nil (should return a filter for :own scope)
assert not is_nil(filter)
# The filter should be a valid expression
assert is_list(filter) or is_map(filter)
end
end
describe "Filter Expression Structure - :all scope" do
test "Admin can read all members without filter" do
actor = create_actor_with_role("admin")
authorizer = create_authorizer(Mv.Membership.Member, :read)
filter = HasPermission.auto_filter(actor, authorizer, [])
# :all scope should return nil (no filter needed)
assert is_nil(filter)
end
end
# Helper to create a mock authorizer
defp create_authorizer(resource, action) do
%Ash.Policy.Authorizer{
resource: resource,
subject: %{action: %{name: action}}
}
end
end

View file

@ -0,0 +1,264 @@
defmodule Mv.Authorization.Checks.HasPermissionTest do
@moduledoc """
Tests for the HasPermission Ash Policy Check.
This check evaluates permissions from the PermissionSets module and applies
scope filters to Ash queries.
"""
use ExUnit.Case, async: true
alias Mv.Authorization.Checks.HasPermission
# Helper to create a mock authorizer for strict_check/3
defp create_authorizer(resource, action) do
%Ash.Policy.Authorizer{
resource: resource,
subject: %{action: %{name: action}}
}
end
# Helper to create actor with role
defp create_actor(id, permission_set_name) do
%{
id: id,
role: %{permission_set_name: permission_set_name}
}
end
describe "describe/1" do
test "returns human-readable description" do
description = HasPermission.describe([])
assert is_binary(description)
assert description =~ "permission"
end
end
describe "strict_check/3 - Permission Lookup" do
test "admin has permission for all resources/actions" do
admin = create_actor("admin-123", "admin")
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(admin, authorizer, [])
assert result == true or result == :unknown
end
test "read_only has read permission for Member" do
read_only_user = create_actor("read-only-123", "read_only")
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(read_only_user, authorizer, [])
assert result == true or result == :unknown
end
test "read_only does NOT have create permission for Member" do
read_only_user = create_actor("read-only-123", "read_only")
authorizer = create_authorizer(Mv.Membership.Member, :create)
{:ok, result} = HasPermission.strict_check(read_only_user, authorizer, [])
assert result == false
end
test "own_data has update permission for User with scope :own" do
own_data_user = create_actor("user-123", "own_data")
authorizer = create_authorizer(Mv.Accounts.User, :update)
{:ok, result} = HasPermission.strict_check(own_data_user, authorizer, [])
# Should return :unknown for :own scope (needs filter)
assert result == :unknown
end
end
describe "strict_check/3 - Scope :all" do
test "actor with scope :all can access any record" do
admin = create_actor("admin-123", "admin")
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(admin, authorizer, [])
# :all scope should return true (no filter needed)
assert result == true
end
test "admin can read all members without filter" do
admin = create_actor("admin-123", "admin")
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(admin, authorizer, [])
# Should return true for :all scope
assert result == true
end
end
describe "strict_check/3 - Scope :own" do
test "actor with scope :own returns :unknown (needs filter)" do
user = create_actor("user-123", "own_data")
authorizer = create_authorizer(Mv.Accounts.User, :read)
{:ok, result} = HasPermission.strict_check(user, authorizer, [])
# Should return :unknown for :own scope (needs filter via auto_filter)
assert result == :unknown
end
end
describe "auto_filter/3 - Scope :own" do
test "scope :own returns filter expression" do
user = create_actor("user-123", "own_data")
authorizer = create_authorizer(Mv.Accounts.User, :update)
filter = HasPermission.auto_filter(user, authorizer, [])
# Should return a filter expression
assert not is_nil(filter)
end
end
describe "auto_filter/3 - Scope :linked" do
test "scope :linked for Member returns user_id filter" do
user = create_actor("user-123", "own_data")
authorizer = create_authorizer(Mv.Membership.Member, :read)
filter = HasPermission.auto_filter(user, authorizer, [])
# Should return a filter expression
assert not is_nil(filter)
end
test "scope :linked for CustomFieldValue returns member.user_id filter" do
user = create_actor("user-123", "own_data")
authorizer = create_authorizer(Mv.Membership.CustomFieldValue, :update)
filter = HasPermission.auto_filter(user, authorizer, [])
# Should return a filter expression that traverses member relationship
assert not is_nil(filter)
end
end
describe "strict_check/3 - Error Handling" do
test "returns {:ok, false} for nil actor" do
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(nil, authorizer, [])
assert result == false
end
test "returns {:ok, false} for actor missing role" do
actor_without_role = %{id: "user-123"}
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(actor_without_role, authorizer, [])
assert result == false
end
test "returns {:ok, false} for actor with nil role" do
actor_with_nil_role = %{id: "user-123", role: nil}
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(actor_with_nil_role, authorizer, [])
assert result == false
end
test "returns {:ok, false} for invalid permission_set_name" do
actor_with_invalid_permission = %{
id: "user-123",
role: %{permission_set_name: "invalid_set"}
}
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(actor_with_invalid_permission, authorizer, [])
assert result == false
end
test "returns {:ok, false} for no matching permission" do
read_only_user = create_actor("read-only-123", "read_only")
authorizer = create_authorizer(Mv.Authorization.Role, :create)
{:ok, result} = HasPermission.strict_check(read_only_user, authorizer, [])
assert result == false
end
test "handles role with nil permission_set_name gracefully" do
actor_with_nil_permission_set = %{
id: "user-123",
role: %{permission_set_name: nil}
}
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(actor_with_nil_permission_set, authorizer, [])
assert result == false
end
end
describe "strict_check/3 - Logging" do
import ExUnit.CaptureLog
test "logs authorization failure for nil actor" do
authorizer = create_authorizer(Mv.Membership.Member, :read)
log =
capture_log(fn ->
HasPermission.strict_check(nil, authorizer, [])
end)
assert log =~ "Authorization failed" or log == ""
end
test "logs authorization failure for missing role" do
actor_without_role = %{id: "user-123"}
authorizer = create_authorizer(Mv.Membership.Member, :read)
log =
capture_log(fn ->
HasPermission.strict_check(actor_without_role, authorizer, [])
end)
assert log =~ "Authorization failed" or log == ""
end
end
describe "strict_check/3 - Resource Name Extraction" do
test "correctly extracts resource name from nested module" do
admin = create_actor("admin-123", "admin")
authorizer = create_authorizer(Mv.Membership.Member, :read)
{:ok, result} = HasPermission.strict_check(admin, authorizer, [])
# Should work correctly (not crash)
assert result == true or result == :unknown or result == false
end
test "works with different resource modules" do
admin = create_actor("admin-123", "admin")
resources = [
Mv.Accounts.User,
Mv.Membership.Member,
Mv.Membership.CustomFieldValue,
Mv.Membership.CustomField,
Mv.Authorization.Role
]
for resource <- resources do
authorizer = create_authorizer(resource, :read)
{:ok, result} = HasPermission.strict_check(admin, authorizer, [])
# Should not crash and should return valid result
assert result == true or result == :unknown or result == false
end
end
end
end