defmodule Mv.Accounts.UserPoliciesTest do
  @moduledoc """
  Tests for User resource authorization policies.

  Tests all 4 permission sets (own_data, read_only, normal_user, admin)
  and verifies that policies correctly enforce access control based on
  user roles and permission sets.
  """

  # async: false because we need database commits to be visible across queries
  use Mv.DataCase, async: false

  alias Mv.Accounts

  require Ash.Query

  # Every test receives the system actor as `actor`; it is used for fixture
  # setup that must not be subject to the policies under test.
  setup do
    system_actor = Mv.Helpers.SystemActor.get_system_actor()
    %{actor: system_actor}
  end

  # Shared test setup: creates the acting `user` with the given permission set
  # and an `other_user` (always "own_data") to act upon.
  defp setup_users_for(permission_set, actor) do
    user = Mv.Fixtures.user_with_role_fixture(permission_set)
    other_user = Mv.Fixtures.user_with_role_fixture("own_data")

    # Reload user to ensure role is preloaded
    {:ok, user} =
      Ash.get(Accounts.User, user.id, domain: Mv.Accounts, load: [:role], actor: actor)

    %{user: user, other_user: other_user}
  end

  # Data-driven: same behaviour for own_data, read_only, normal_user (scope :own for User)
  describe "non-admin permission sets (own_data, read_only, normal_user)" do
    setup %{actor: actor} = context do
      # The permission set is injected per test via `@tag permission_set: ...`.
      permission_set = context[:permission_set] || "own_data"
      setup_users_for(permission_set, actor)
    end

    for permission_set <- ["own_data", "read_only", "normal_user"] do
      @tag permission_set: permission_set
      test "can read own user record (#{permission_set})", %{user: user} do
        {:ok, fetched_user} =
          Ash.get(Accounts.User, user.id, actor: user, domain: Mv.Accounts)

        assert fetched_user.id == user.id
      end

      @tag permission_set: permission_set
      test "can update own email (#{permission_set})", %{user: user} do
        new_email = "updated#{System.unique_integer([:positive])}@example.com"

        {:ok, updated_user} =
          user
          |> Ash.Changeset.for_update(:update, %{email: new_email})
          |> Ash.update(actor: user)

        assert updated_user.email == Ash.CiString.new(new_email)
      end

      @tag permission_set: permission_set
      test "cannot read other users - not found due to auto_filter (#{permission_set})", %{
        user: user,
        other_user: other_user
      } do
        assert_raise Ash.Error.Invalid, fn ->
          Ash.get!(Accounts.User, other_user.id, actor: user, domain: Mv.Accounts)
        end
      end

      @tag permission_set: permission_set
      test "cannot update other users - forbidden (#{permission_set})", %{
        user: user,
        other_user: other_user
      } do
        assert_raise Ash.Error.Forbidden, fn ->
          other_user
          |> Ash.Changeset.for_update(:update, %{email: "hacked@example.com"})
          |> Ash.update!(actor: user)
        end
      end

      @tag permission_set: permission_set
      test "list users returns only own user (#{permission_set})", %{user: user} do
        {:ok, users} = Ash.read(Accounts.User, actor: user, domain: Mv.Accounts)

        # Exactly one record must be visible, and it must be the actor itself.
        assert [only_user] = users
        assert only_user.id == user.id
      end

      @tag permission_set: permission_set
      test "cannot create user - forbidden (#{permission_set})", %{user: user} do
        assert_raise Ash.Error.Forbidden, fn ->
          Accounts.User
          |> Ash.Changeset.for_create(:create_user, %{
            email: "new#{System.unique_integer([:positive])}@example.com"
          })
          |> Ash.create!(actor: user)
        end
      end

      @tag permission_set: permission_set
      test "cannot destroy user - forbidden (#{permission_set})", %{user: user} do
        assert_raise Ash.Error.Forbidden, fn ->
          Ash.destroy!(user, actor: user)
        end
      end

      @tag permission_set: permission_set
      test "cannot change role via update_user - forbidden (#{permission_set})", %{
        user: user,
        other_user: other_user
      } do
        other_role = Mv.Fixtures.role_fixture("read_only")

        assert {:error, %Ash.Error.Forbidden{}} =
                 other_user
                 |> Ash.Changeset.for_update(:update_user, %{role_id: other_role.id})
                 |> Ash.update(actor: user, domain: Mv.Accounts)
      end
    end
  end

  describe "admin permission set" do
    setup %{actor: actor} do
      setup_users_for("admin", actor)
    end

    test "can read all users", %{user: user, other_user: other_user} do
      {:ok, users} = Ash.read(Accounts.User, actor: user, domain: Mv.Accounts)

      # Should return all users (scope :all)
      user_ids = Enum.map(users, & &1.id)
      assert user.id in user_ids
      assert other_user.id in user_ids
    end

    test "can read other users", %{user: user, other_user: other_user} do
      {:ok, fetched_user} =
        Ash.get(Accounts.User, other_user.id, actor: user, domain: Mv.Accounts)

      assert fetched_user.id == other_user.id
    end

    test "can update other users", %{user: user, other_user: other_user} do
      new_email = "adminupdated#{System.unique_integer([:positive])}@example.com"

      {:ok, updated_user} =
        other_user
        |> Ash.Changeset.for_update(:update_user, %{email: new_email})
        |> Ash.update(actor: user)

      assert updated_user.email == Ash.CiString.new(new_email)
    end

    test "can create user", %{user: user} do
      {:ok, new_user} =
        Accounts.User
        |> Ash.Changeset.for_create(:create_user, %{
          email: "new#{System.unique_integer([:positive])}@example.com"
        })
        |> Ash.create(actor: user)

      assert new_user.email
    end

    test "can destroy user", %{user: user, other_user: other_user} do
      :ok = Ash.destroy(other_user, actor: user)

      # Verify user is deleted. The lookup is done as the admin actor (who can
      # read all users); an actor-less read would presumably be rejected or
      # filtered by the policies, so it would error whether or not the record
      # still exists.
      assert {:error, %Ash.Error.Invalid{}} =
               Ash.get(Accounts.User, other_user.id, actor: user, domain: Mv.Accounts)
    end

    test "admin can assign role to another user via update_user", %{
      other_user: other_user
    } do
      admin = Mv.Fixtures.user_with_role_fixture("admin")
      normal_user_role = Mv.Fixtures.role_fixture("normal_user")

      {:ok, updated} =
        other_user
        |> Ash.Changeset.for_update(:update_user, %{role_id: normal_user_role.id})
        |> Ash.update(actor: admin)

      assert updated.role_id == normal_user_role.id
    end
  end

  describe "admin role assignment and last-admin validation" do
    test "two admins: one can change own role to normal_user (other remains admin)" do
      _admin_role = Mv.Fixtures.role_fixture("admin")
      normal_user_role = Mv.Fixtures.role_fixture("normal_user")
      admin_a = Mv.Fixtures.user_with_role_fixture("admin")
      _admin_b = Mv.Fixtures.user_with_role_fixture("admin")

      {:ok, updated} =
        admin_a
        |> Ash.Changeset.for_update(:update_user, %{role_id: normal_user_role.id})
        |> Ash.update(actor: admin_a)

      assert updated.role_id == normal_user_role.id
    end

    test "single admin: changing own role to normal_user returns validation error" do
      normal_user_role = Mv.Fixtures.role_fixture("normal_user")
      single_admin = Mv.Fixtures.user_with_role_fixture("admin")

      assert {:error, %Ash.Error.Invalid{errors: errors}} =
               single_admin
               |> Ash.Changeset.for_update(:update_user, %{role_id: normal_user_role.id})
               |> Ash.update(actor: single_admin)

      # Collect every binary `message` from the nested errors; entries without
      # a binary message are skipped.
      error_messages = for %{message: msg} <- errors, is_binary(msg), do: msg

      # The last-admin validation message is expected to mention the Admin role
      # (e.g. "... least one user must keep the Admin role").
      assert Enum.any?(error_messages, &(&1 =~ "Admin role")),
             "Expected last-admin validation message, got: #{inspect(error_messages)}"
    end

    test "admin can switch to another admin role (two roles with permission_set_name admin)" do
      # Two distinct roles both with permission_set_name "admin" (e.g. "Admin" and "Superadmin")
      admin_role_a = Mv.Fixtures.role_fixture("admin")
      admin_role_b = Mv.Fixtures.role_fixture("admin")
      admin_user = Mv.Fixtures.user_with_role_fixture("admin")

      # Ensure user has role_a so we can switch to role_b
      {:ok, admin_user} =
        admin_user
        |> Ash.Changeset.for_update(:update_user, %{role_id: admin_role_a.id})
        |> Ash.update(actor: admin_user)

      assert admin_user.role_id == admin_role_a.id

      # Switching to another admin role must be allowed (no last-admin error)
      {:ok, updated} =
        admin_user
        |> Ash.Changeset.for_update(:update_user, %{role_id: admin_role_b.id})
        |> Ash.update(actor: admin_user)

      assert updated.role_id == admin_role_b.id
    end
  end

  describe "AshAuthentication bypass" do
    test "register_with_password works without actor via AshAuthentication bypass" do
      # Test that AshAuthentication bypass allows registration without actor
      # This tests the actual bypass mechanism, not admin permissions
      changeset =
        Accounts.User
        |> Ash.Changeset.for_create(:register_with_password, %{
          email: "register#{System.unique_integer([:positive])}@example.com",
          password: "testpassword123"
        })
        |> Ash.Changeset.set_context(%{private: %{ash_authentication?: true}})

      {:ok, user} = Ash.create(changeset, domain: Mv.Accounts)

      assert user.email

      # Verify that default "Mitglied" role was assigned
      {:ok, user_with_role} = Ash.load(user, :role, domain: Mv.Accounts, authorize?: false)
      assert user_with_role.role != nil
      assert user_with_role.role.name == "Mitglied"
    end

    test "register_with_rauthy works without actor via AshAuthentication bypass" do
      # Test that AshAuthentication bypass allows OIDC registration without actor
      user_info = %{
        "sub" => "oidc_sub_#{System.unique_integer([:positive])}",
        "email" => "oidc#{System.unique_integer([:positive])}@example.com"
      }

      oauth_tokens = %{access_token: "token", refresh_token: "refresh"}

      changeset =
        Accounts.User
        |> Ash.Changeset.for_create(:register_with_rauthy, %{
          user_info: user_info,
          oauth_tokens: oauth_tokens
        })
        |> Ash.Changeset.set_context(%{private: %{ash_authentication?: true}})

      {:ok, user} = Ash.create(changeset)

      assert user.email
      assert user.oidc_id == user_info["sub"]
    end

    test "sign_in_with_rauthy works without actor via AshAuthentication bypass", %{
      actor: system_actor
    } do
      # First create a user with OIDC ID (using system_actor for setup)
      user_info_create = %{
        "sub" => "oidc_sub_#{System.unique_integer([:positive])}",
        "email" => "oidc#{System.unique_integer([:positive])}@example.com"
      }

      oauth_tokens = %{access_token: "token", refresh_token: "refresh"}

      {:ok, user} =
        Accounts.User
        |> Ash.Changeset.for_create(:register_with_rauthy, %{
          user_info: user_info_create,
          oauth_tokens: oauth_tokens
        })
        |> Ash.create(actor: system_actor)

      # Now test sign_in_with_rauthy without actor (should work via AshAuthentication bypass)
      query =
        Accounts.User
        |> Ash.Query.for_read(:sign_in_with_rauthy, %{
          user_info: user_info_create,
          oauth_tokens: oauth_tokens
        })
        |> Ash.Query.set_context(%{private: %{ash_authentication?: true}})

      {:ok, signed_in_user} = Ash.read_one(query)

      assert signed_in_user.id == user.id
    end

    # NOTE: get_by_subject is tested implicitly via AshAuthentication's JWT flow.
    # Direct testing via Ash.Query.for_read(:get_by_subject) doesn't properly
    # simulate the AshAuthentication context and would require mocking JWT tokens.
    # The AshAuthentication bypass policy ensures this action works correctly
    # when called through the proper authentication flow (sign_in, token refresh, etc.).
    # Integration tests that use actual JWT tokens cover this functionality.
  end
end