# Single describe with @tag permission_set and for-loop; one setup per permission set.
defmodule Mv.Accounts.UserPoliciesTest do
  @moduledoc """
  Tests for User resource authorization policies.

  Tests all 4 permission sets (own_data, read_only, normal_user, admin)
  and verifies that policies correctly enforce access control based on
  user roles and permission sets.
  """
  # async: false because we need database commits to be visible across queries
  use Mv.DataCase, async: false

  alias Mv.Accounts

  require Ash.Query

  # Every test receives the system actor as `:actor`. It is only used for
  # test preparation (reloading fixtures, seeding records), never as the
  # actor whose permissions are under test.
  setup do
    %{actor: Mv.Helpers.SystemActor.get_system_actor()}
  end

  # Shared setup for all permission sets.
  #
  # Creates the user under test with the given permission set plus a second
  # user with the "own_data" permission set, and returns both as context.
  # The user under test is reloaded (as `actor`) with `:role` loaded so it
  # can be passed as the actor in the tests.
  defp setup_users_for(permission_set, actor) do
    user = Mv.Fixtures.user_with_role_fixture(permission_set)
    other_user = Mv.Fixtures.user_with_role_fixture("own_data")

    # Reload user to ensure role is preloaded
    {:ok, user} =
      Ash.get(Accounts.User, user.id, domain: Mv.Accounts, load: [:role], actor: actor)

    %{user: user, other_user: other_user}
  end

  # Data-driven: same behaviour for own_data, read_only, normal_user (scope :own for User)
  describe "non-admin permission sets (own_data, read_only, normal_user)" do
    setup %{actor: actor} = context do
      # Each generated test is tagged with the permission set it exercises;
      # untagged tests fall back to "own_data".
      permission_set = context[:permission_set] || "own_data"
      setup_users_for(permission_set, actor)
    end

    for permission_set <- ["own_data", "read_only", "normal_user"] do
      @tag permission_set: permission_set
      test "can read own user record (#{permission_set})", %{user: user} do
        assert {:ok, fetched_user} =
                 Ash.get(Accounts.User, user.id, actor: user, domain: Mv.Accounts)

        assert fetched_user.id == user.id
      end

      @tag permission_set: permission_set
      test "can update own email (#{permission_set})", %{user: user} do
        new_email = "updated#{System.unique_integer([:positive])}@example.com"

        assert {:ok, updated_user} =
                 user
                 |> Ash.Changeset.for_update(:update, %{email: new_email})
                 |> Ash.update(actor: user)

        assert updated_user.email == Ash.CiString.new(new_email)
      end

      @tag permission_set: permission_set
      test "cannot read other users - not found due to auto_filter (#{permission_set})", %{
        user: user,
        other_user: other_user
      } do
        assert_raise Ash.Error.Invalid, fn ->
          Ash.get!(Accounts.User, other_user.id, actor: user, domain: Mv.Accounts)
        end
      end

      @tag permission_set: permission_set
      test "cannot update other users - forbidden (#{permission_set})", %{
        user: user,
        other_user: other_user
      } do
        assert_raise Ash.Error.Forbidden, fn ->
          other_user
          |> Ash.Changeset.for_update(:update, %{email: "hacked@example.com"})
          |> Ash.update!(actor: user)
        end
      end

      @tag permission_set: permission_set
      test "list users returns only own user (#{permission_set})", %{user: user} do
        assert {:ok, users} = Ash.read(Accounts.User, actor: user, domain: Mv.Accounts)

        # Exactly one record must be visible, and it must be the actor itself.
        assert [only_user] = users
        assert only_user.id == user.id
      end

      @tag permission_set: permission_set
      test "cannot create user - forbidden (#{permission_set})", %{user: user} do
        assert_raise Ash.Error.Forbidden, fn ->
          Accounts.User
          |> Ash.Changeset.for_create(:create_user, %{
            email: "new#{System.unique_integer([:positive])}@example.com"
          })
          |> Ash.create!(actor: user)
        end
      end

      @tag permission_set: permission_set
      test "cannot destroy user - forbidden (#{permission_set})", %{user: user} do
        assert_raise Ash.Error.Forbidden, fn ->
          Ash.destroy!(user, actor: user)
        end
      end
    end
  end

  describe "admin permission set" do
    setup %{actor: actor} do
      setup_users_for("admin", actor)
    end

    test "can read all users", %{user: user, other_user: other_user} do
      assert {:ok, users} = Ash.read(Accounts.User, actor: user, domain: Mv.Accounts)

      # Should return all users (scope :all)
      user_ids = Enum.map(users, & &1.id)
      assert user.id in user_ids
      assert other_user.id in user_ids
    end

    test "can read other users", %{user: user, other_user: other_user} do
      assert {:ok, fetched_user} =
               Ash.get(Accounts.User, other_user.id, actor: user, domain: Mv.Accounts)

      assert fetched_user.id == other_user.id
    end

    test "can update other users", %{user: user, other_user: other_user} do
      new_email = "adminupdated#{System.unique_integer([:positive])}@example.com"

      assert {:ok, updated_user} =
               other_user
               |> Ash.Changeset.for_update(:update_user, %{email: new_email})
               |> Ash.update(actor: user)

      assert updated_user.email == Ash.CiString.new(new_email)
    end

    test "can create user", %{user: user} do
      assert {:ok, new_user} =
               Accounts.User
               |> Ash.Changeset.for_create(:create_user, %{
                 email: "new#{System.unique_integer([:positive])}@example.com"
               })
               |> Ash.create(actor: user)

      assert new_user.email
    end

    test "can destroy user", %{user: user, other_user: other_user} do
      assert :ok = Ash.destroy(other_user, actor: user)

      # Verify user is deleted. The lookup is done as the admin, who is allowed
      # to read other users (see "can read other users"): a read without an
      # authorized actor comes back as "not found" even for existing records
      # (see the non-admin tests), so it would not prove the record is gone.
      assert {:error, %Ash.Error.Invalid{}} =
               Ash.get(Accounts.User, other_user.id, actor: user, domain: Mv.Accounts)
    end

    test "admin can assign role to another user via update_user", %{
      other_user: other_user
    } do
      admin = Mv.Fixtures.user_with_role_fixture("admin")
      normal_user_role = Mv.Fixtures.role_fixture("normal_user")

      assert {:ok, updated} =
               other_user
               |> Ash.Changeset.for_update(:update_user, %{role_id: normal_user_role.id})
               |> Ash.update(actor: admin)

      assert updated.role_id == normal_user_role.id
    end
  end

  describe "admin role assignment and last-admin validation" do
    test "two admins: one can change own role to normal_user (other remains admin)" do
      _admin_role = Mv.Fixtures.role_fixture("admin")
      normal_user_role = Mv.Fixtures.role_fixture("normal_user")

      admin_a = Mv.Fixtures.user_with_role_fixture("admin")
      _admin_b = Mv.Fixtures.user_with_role_fixture("admin")

      assert {:ok, updated} =
               admin_a
               |> Ash.Changeset.for_update(:update_user, %{role_id: normal_user_role.id})
               |> Ash.update(actor: admin_a)

      assert updated.role_id == normal_user_role.id
    end

    test "single admin: changing own role to normal_user returns validation error" do
      normal_user_role = Mv.Fixtures.role_fixture("normal_user")
      single_admin = Mv.Fixtures.user_with_role_fixture("admin")

      assert {:error, %Ash.Error.Invalid{errors: errors}} =
               single_admin
               |> Ash.Changeset.for_update(:update_user, %{role_id: normal_user_role.id})
               |> Ash.update(actor: single_admin)

      # Collect every textual message, whatever the concrete error struct is;
      # errors without a binary `:message` are skipped.
      error_messages = for %{message: msg} when is_binary(msg) <- errors, do: msg

      assert Enum.any?(error_messages, fn msg ->
               msg =~ "least one user must keep the Admin role" or msg =~ "Admin role"
             end),
             "Expected last-admin validation message, got: #{inspect(error_messages)}"
    end
  end

  describe "AshAuthentication bypass" do
    test "register_with_password works without actor via AshAuthentication bypass" do
      # Test that AshAuthentication bypass allows registration without actor
      # This tests the actual bypass mechanism, not admin permissions
      changeset =
        Accounts.User
        |> Ash.Changeset.for_create(:register_with_password, %{
          email: "register#{System.unique_integer([:positive])}@example.com",
          password: "testpassword123"
        })
        |> Ash.Changeset.set_context(%{private: %{ash_authentication?: true}})

      assert {:ok, user} = Ash.create(changeset, domain: Mv.Accounts)

      assert user.email

      # Verify that default "Mitglied" role was assigned
      assert {:ok, user_with_role} =
               Ash.load(user, :role, domain: Mv.Accounts, authorize?: false)

      assert user_with_role.role != nil
      assert user_with_role.role.name == "Mitglied"
    end

    test "register_with_rauthy works without actor via AshAuthentication bypass" do
      # Test that AshAuthentication bypass allows OIDC registration without actor
      user_info = %{
        "sub" => "oidc_sub_#{System.unique_integer([:positive])}",
        "email" => "oidc#{System.unique_integer([:positive])}@example.com"
      }

      oauth_tokens = %{access_token: "token", refresh_token: "refresh"}

      changeset =
        Accounts.User
        |> Ash.Changeset.for_create(:register_with_rauthy, %{
          user_info: user_info,
          oauth_tokens: oauth_tokens
        })
        |> Ash.Changeset.set_context(%{private: %{ash_authentication?: true}})

      assert {:ok, user} = Ash.create(changeset)

      assert user.email
      assert user.oidc_id == user_info["sub"]
    end

    test "sign_in_with_rauthy works without actor via AshAuthentication bypass", %{
      actor: system_actor
    } do
      # First create a user with OIDC ID (using system_actor for setup)
      user_info_create = %{
        "sub" => "oidc_sub_#{System.unique_integer([:positive])}",
        "email" => "oidc#{System.unique_integer([:positive])}@example.com"
      }

      oauth_tokens = %{access_token: "token", refresh_token: "refresh"}

      assert {:ok, user} =
               Accounts.User
               |> Ash.Changeset.for_create(:register_with_rauthy, %{
                 user_info: user_info_create,
                 oauth_tokens: oauth_tokens
               })
               |> Ash.create(actor: system_actor)

      # Now test sign_in_with_rauthy without actor (should work via AshAuthentication bypass)
      query =
        Accounts.User
        |> Ash.Query.for_read(:sign_in_with_rauthy, %{
          user_info: user_info_create,
          oauth_tokens: oauth_tokens
        })
        |> Ash.Query.set_context(%{private: %{ash_authentication?: true}})

      assert {:ok, signed_in_user} = Ash.read_one(query)

      assert signed_in_user.id == user.id
    end

    # NOTE: get_by_subject is tested implicitly via AshAuthentication's JWT flow.
    # Direct testing via Ash.Query.for_read(:get_by_subject) doesn't properly
    # simulate the AshAuthentication context and would require mocking JWT tokens.
    # The AshAuthentication bypass policy ensures this action works correctly
    # when called through the proper authentication flow (sign_in, token refresh, etc.).
    # Integration tests that use actual JWT tokens cover this functionality.
  end
end