defmodule Mv.Accounts.UserPoliciesTest do
  @moduledoc """
  Tests for User resource authorization policies.

  Tests all 4 permission sets (own_data, read_only, normal_user, admin) and
  verifies that policies correctly enforce access control based on user roles
  and permission sets.
  """

  # async: false because we need database commits to be visible across queries
  use Mv.DataCase, async: false

  alias Mv.Accounts
  alias Mv.Authorization

  require Ash.Query

  # Permission sets that are expected to behave identically on the User
  # resource (access limited to the actor's own record), paired with the role
  # label that appears in the describe title. The same tests are generated for
  # each entry below so the three sets cannot drift apart.
  @own_scope_permission_sets [
    {"own_data", "Mitglied"},
    {"read_only", "Vorstand/Buchhaltung"},
    {"normal_user", "Kassenwart"}
  ]

  # Every test receives the system actor as `:actor` in its context.
  setup do
    system_actor = Mv.Helpers.SystemActor.get_system_actor()
    %{actor: system_actor}
  end

  # Builds an e-mail address that is unique within this VM run.
  defp unique_email(prefix) do
    "#{prefix}#{System.unique_integer([:positive])}@example.com"
  end

  # Helper to create a role with a specific permission set.
  # Raises if the role cannot be created so the failure shows up in setup
  # instead of as a confusing authorization error later in the test.
  defp create_role_with_permission_set(permission_set_name) do
    role_name = "Test Role #{permission_set_name} #{System.unique_integer([:positive])}"

    case Authorization.create_role(%{
           name: role_name,
           description: "Test role for #{permission_set_name}",
           permission_set_name: permission_set_name
         }) do
      {:ok, role} -> role
      {:error, error} -> raise "Failed to create role: #{inspect(error)}"
    end
  end

  # Helper to create a user with a specific permission set.
  # Returns the user with its role preloaded (required for authorization).
  defp create_user_with_permission_set(permission_set_name, actor) do
    # Create role with permission set
    role = create_role_with_permission_set(permission_set_name)

    # Create user
    {:ok, user} =
      Accounts.User
      |> Ash.Changeset.for_create(:register_with_password, %{
        email: unique_email("user"),
        password: "testpassword123"
      })
      |> Ash.create(actor: actor)

    # Assign role to user
    {:ok, user} =
      user
      |> Ash.Changeset.for_update(:update, %{})
      |> Ash.Changeset.manage_relationship(:role, role, type: :append_and_remove)
      |> Ash.update(actor: actor)

    # Reload user with role preloaded (critical for authorization!)
    {:ok, user_with_role} = Ash.load(user, :role, domain: Mv.Accounts, actor: actor)
    user_with_role
  end

  # Helper to create another user (for testing access to other users)
  defp create_other_user(actor) do
    create_user_with_permission_set("own_data", actor)
  end

  # Shared setup for every permission set: creates the user under test plus a
  # second, unrelated user and returns both as test context.
  defp setup_users_with_permission_set(permission_set, actor) do
    user = create_user_with_permission_set(permission_set, actor)
    other_user = create_other_user(actor)

    # Reload user to ensure role is preloaded
    {:ok, user} =
      Ash.get(Accounts.User, user.id, domain: Mv.Accounts, load: [:role], actor: actor)

    %{user: user, other_user: other_user}
  end

  for {permission_set, label} <- @own_scope_permission_sets do
    describe "#{permission_set} permission set (#{label})" do
      setup %{actor: actor} do
        setup_users_with_permission_set(unquote(permission_set), actor)
      end

      test "can read own user record", %{user: user} do
        assert {:ok, fetched_user} =
                 Ash.get(Accounts.User, user.id, actor: user, domain: Mv.Accounts)

        assert fetched_user.id == user.id
      end

      test "can update own email", %{user: user} do
        new_email = unique_email("updated")

        assert {:ok, updated_user} =
                 user
                 |> Ash.Changeset.for_update(:update_user, %{email: new_email})
                 |> Ash.update(actor: user)

        assert updated_user.email == Ash.CiString.new(new_email)
      end

      test "cannot read other users (returns not found due to auto_filter)", %{
        user: user,
        other_user: other_user
      } do
        # Note: With auto_filter policies, when a user tries to read a user that doesn't
        # match the filter (id == actor.id), Ash returns NotFound, not Forbidden.
        # This is the expected behavior - the filter makes the record "invisible" to the user.
        assert_raise Ash.Error.Invalid, fn ->
          Ash.get!(Accounts.User, other_user.id, actor: user, domain: Mv.Accounts)
        end
      end

      test "cannot update other users (returns forbidden)", %{
        user: user,
        other_user: other_user
      } do
        assert_raise Ash.Error.Forbidden, fn ->
          other_user
          |> Ash.Changeset.for_update(:update_user, %{email: "hacked@example.com"})
          |> Ash.update!(actor: user)
        end
      end

      test "list users returns only own user", %{user: user} do
        assert {:ok, users} = Ash.read(Accounts.User, actor: user, domain: Mv.Accounts)

        # Should only return the own user (scope :own filters)
        assert [listed_user] = users
        assert listed_user.id == user.id
      end

      test "cannot create user (returns forbidden)", %{user: user} do
        assert_raise Ash.Error.Forbidden, fn ->
          Accounts.User
          |> Ash.Changeset.for_create(:create_user, %{email: unique_email("new")})
          |> Ash.create!(actor: user)
        end
      end

      test "cannot destroy user (returns forbidden)", %{user: user} do
        assert_raise Ash.Error.Forbidden, fn ->
          Ash.destroy!(user, actor: user)
        end
      end
    end
  end

  describe "admin permission set" do
    setup %{actor: actor} do
      setup_users_with_permission_set("admin", actor)
    end

    test "can read all users", %{user: user, other_user: other_user} do
      assert {:ok, users} = Ash.read(Accounts.User, actor: user, domain: Mv.Accounts)

      # Should return all users (scope :all)
      user_ids = Enum.map(users, & &1.id)
      assert user.id in user_ids
      assert other_user.id in user_ids
    end

    test "can read other users", %{user: user, other_user: other_user} do
      assert {:ok, fetched_user} =
               Ash.get(Accounts.User, other_user.id, actor: user, domain: Mv.Accounts)

      assert fetched_user.id == other_user.id
    end

    test "can update other users", %{user: user, other_user: other_user} do
      new_email = unique_email("adminupdated")

      assert {:ok, updated_user} =
               other_user
               |> Ash.Changeset.for_update(:update_user, %{email: new_email})
               |> Ash.update(actor: user)

      assert updated_user.email == Ash.CiString.new(new_email)
    end

    test "can create user", %{user: user} do
      assert {:ok, new_user} =
               Accounts.User
               |> Ash.Changeset.for_create(:create_user, %{email: unique_email("new")})
               |> Ash.create(actor: user)

      assert new_user.email
    end

    test "can destroy user", %{user: user, other_user: other_user} do
      assert :ok = Ash.destroy(other_user, actor: user)

      # Verify the user is deleted. The lookup runs as the admin (who is allowed
      # to read other users, see "can read other users") so that the error can
      # only mean the record is gone. An actor-less read could fail because of
      # the policies alone and would make this check pass even if the destroy
      # had not removed anything.
      assert {:error, %Ash.Error.Invalid{}} =
               Ash.get(Accounts.User, other_user.id, actor: user, domain: Mv.Accounts)
    end
  end

  describe "AshAuthentication bypass" do
    test "register_with_password works without actor", %{actor: system_actor} do
      # Registration should work without actor (AshAuthentication bypass)
      # Note: When directly calling Ash actions in tests, the AshAuthentication bypass
      # may not be active, so we use system_actor
      assert {:ok, user} =
               Accounts.User
               |> Ash.Changeset.for_create(:register_with_password, %{
                 email: unique_email("register"),
                 password: "testpassword123"
               })
               |> Ash.create(actor: system_actor)

      assert user.email
    end

    test "register_with_rauthy works with OIDC user_info", %{actor: system_actor} do
      # OIDC registration should work (AshAuthentication bypass)
      # Note: When directly calling Ash actions in tests, the AshAuthentication bypass
      # may not be active, so we use system_actor
      user_info = %{
        "sub" => "oidc_sub_#{System.unique_integer([:positive])}",
        "email" => unique_email("oidc")
      }

      oauth_tokens = %{access_token: "token", refresh_token: "refresh"}

      assert {:ok, user} =
               Accounts.User
               |> Ash.Changeset.for_create(:register_with_rauthy, %{
                 user_info: user_info,
                 oauth_tokens: oauth_tokens
               })
               |> Ash.create(actor: system_actor)

      assert user.email
      assert user.oidc_id == user_info["sub"]
    end

    test "sign_in_with_rauthy works with OIDC user_info", %{actor: system_actor} do
      # First create a user with OIDC ID
      user_info_create = %{
        "sub" => "oidc_sub_#{System.unique_integer([:positive])}",
        "email" => unique_email("oidc")
      }

      oauth_tokens = %{access_token: "token", refresh_token: "refresh"}

      assert {:ok, user} =
               Accounts.User
               |> Ash.Changeset.for_create(:register_with_rauthy, %{
                 user_info: user_info_create,
                 oauth_tokens: oauth_tokens
               })
               |> Ash.create(actor: system_actor)

      # Now test sign_in_with_rauthy (should work via AshAuthentication bypass)
      assert {:ok, signed_in_user} =
               Accounts.User
               |> Ash.Query.for_read(:sign_in_with_rauthy, %{
                 user_info: user_info_create,
                 oauth_tokens: oauth_tokens
               })
               |> Ash.read_one(actor: system_actor)

      assert signed_in_user.id == user.id
    end

    # NOTE: get_by_subject is tested implicitly via AshAuthentication's JWT flow.
    # Direct testing via Ash.Query.for_read(:get_by_subject) doesn't properly
    # simulate the AshAuthentication context and would require mocking JWT tokens.
    # The AshAuthentication bypass policy ensures this action works correctly
    # when called through the proper authentication flow (sign_in, token refresh, etc.).
    # Integration tests that use actual JWT tokens cover this functionality.
  end
end