mitgliederverwaltung/test/mv/helpers/system_actor_test.exs
Moritz e47547e065 Add Role resource policies (defense-in-depth)
- PermissionSets: Role read :all for own_data, read_only, normal_user; admin keeps full CRUD
- Role resource: authorizers and policies with HasPermission
- Tests: role_policies_test.exs (read all, create/update/destroy admin only)
- Fix existing tests to pass actor or authorize?: false for Role operations
2026-02-04 12:37:48 +01:00

429 lines
14 KiB
Elixir

defmodule Mv.Helpers.SystemActorTest do
@moduledoc """
Tests for the SystemActor helper module.
"""
use Mv.DataCase, async: false
alias Mv.Helpers.SystemActor
alias Mv.Authorization
alias Mv.Accounts
require Ash.Query
# Removes the given user's row with a raw SQL DELETE, skipping Ash entirely.
# Intended only for tests that need a "no system user" / "no users" scenario,
# because Ash.destroy! forbids deleting the system actor user.
defp delete_user_bypass_ash(user) do
  Ecto.Adapters.SQL.query!(
    Mv.Repo,
    "DELETE FROM users WHERE id = $1",
    # The id is dumped to its raw form before being bound as a parameter.
    [Ecto.UUID.dump!(user.id)]
  )
end
# Returns a role whose permission set is "admin", creating one when none is found.
#
# Bootstrap helper: no actor exists yet, so every call uses `authorize?: false`.
# The role is created both when the listing contains no admin role and when
# `list_roles` returns anything other than `{:ok, roles}`.
defp ensure_admin_role do
  with {:ok, roles} <- Authorization.list_roles(authorize?: false),
       role when not is_nil(role) <-
         Enum.find(roles, &(&1.permission_set_name == "admin")) do
    role
  else
    # Single fallback path for "listing failed" and "no admin role in the list".
    _ ->
      {:ok, role} =
        Authorization.create_role(
          %{
            name: "Admin",
            description: "Administrator with full access",
            permission_set_name: "admin"
          },
          authorize?: false
        )

      role
  end
end
# Ensures the system user ("system@mila.local") exists and is linked to `admin_role`.
#
# The user is looked up by email; when the lookup does not yield a record it is
# created through an upsert on the `:unique_email` identity. In both cases the
# `:role` relationship is then managed with `type: :append_and_remove` via the
# `:update_internal` action, and the user is returned with `:role` loaded.
# Bootstrap helper: every call uses `authorize?: false`.
defp ensure_system_user(admin_role) do
  system_email = "system@mila.local"

  lookup =
    Accounts.User
    |> Ash.Query.filter(email == ^system_email)
    |> Ash.read_one(domain: Mv.Accounts, authorize?: false)

  # Only the source of the user differs between the two cases; the role
  # assignment below is shared.
  user =
    case lookup do
      {:ok, existing} when not is_nil(existing) ->
        existing

      _ ->
        Accounts.create_user!(%{email: system_email},
          upsert?: true,
          upsert_identity: :unique_email,
          authorize?: false
        )
    end

  user
  |> Ash.Changeset.for_update(:update_internal, %{})
  |> Ash.Changeset.manage_relationship(:role, admin_role, type: :append_and_remove)
  |> Ash.update!(authorize?: false)
  |> Ash.load!(:role, domain: Mv.Accounts, authorize?: false)
end
# Ensures the admin user exists and is linked to `admin_role`.
#
# The email comes from the ADMIN_EMAIL environment variable and defaults to
# "admin@localhost". The user is looked up by email; when the lookup does not
# yield a record it is created through an upsert on the `:unique_email` identity.
# In both cases the `:role` relationship is then managed with
# `type: :append_and_remove` via the `:update` action, and the user is returned
# with `:role` loaded. Bootstrap helper: every call uses `authorize?: false`.
defp ensure_admin_user(admin_role) do
  admin_email = System.get_env("ADMIN_EMAIL") || "admin@localhost"

  lookup =
    Accounts.User
    |> Ash.Query.filter(email == ^admin_email)
    |> Ash.read_one(domain: Mv.Accounts, authorize?: false)

  # Only the source of the user differs between the two cases; the role
  # assignment below is shared.
  user =
    case lookup do
      {:ok, existing} when not is_nil(existing) ->
        existing

      _ ->
        Accounts.create_user!(%{email: admin_email},
          upsert?: true,
          upsert_identity: :unique_email,
          authorize?: false
        )
    end

  user
  |> Ash.Changeset.for_update(:update, %{})
  |> Ash.Changeset.manage_relationship(:role, admin_role, type: :append_and_remove)
  |> Ash.update!(authorize?: false)
  |> Ash.load!(:role, domain: Mv.Accounts, authorize?: false)
end
# Bootstraps the admin role plus the system and admin users before every test,
# then clears the SystemActor cache so each test starts from a fresh load.
setup do
  role = ensure_admin_role()
  system = ensure_system_user(role)
  admin = ensure_admin_user(role)

  SystemActor.invalidate_cache()

  {:ok, %{admin_role: role, system_user: system, admin_user: admin}}
end
describe "get_system_actor/0" do
test "returns system user with admin role", %{system_user: _system_user} do
  actor = SystemActor.get_system_actor()

  # The actor must be a User struct carrying the system email and an admin role.
  assert %Mv.Accounts.User{} = actor
  assert to_string(actor.email) == "system@mila.local"
  refute is_nil(actor.role)
  assert actor.role.permission_set_name == "admin"
end
test "falls back to admin user if system user doesn't exist", %{admin_user: _admin_user} do
  # Remove the system user row if one exists.
  actor = SystemActor.get_system_actor()

  lookup =
    Accounts.User
    |> Ash.Query.filter(email == ^"system@mila.local")
    |> Ash.read_one(domain: Mv.Accounts, actor: actor)

  with {:ok, user} when not is_nil(user) <- lookup do
    delete_user_bypass_ash(user)
  end

  # Drop the cached actor so the next call has to resolve it again.
  SystemActor.invalidate_cache()

  fallback_actor = SystemActor.get_system_actor()

  assert %Mv.Accounts.User{} = fallback_actor
  refute is_nil(fallback_actor.role)
  assert fallback_actor.role.permission_set_name == "admin"
  # The resolved actor must be someone other than the deleted system user.
  assert to_string(fallback_actor.email) != "system@mila.local"
end
test "caches system actor for performance" do
  first = SystemActor.get_system_actor()
  second = SystemActor.get_system_actor()

  # NOTE(review): only the ids are compared, so this shows both calls resolve to
  # the same user; it does not by itself prove the second call was a cache hit.
  assert first.id == second.id
end
test "creates system user in test environment if none exists", %{admin_role: _admin_role} do
  # Remove the system user and the configured admin user (only these two rows,
  # not every user) before asking for the system actor again.
  admin_email = System.get_env("ADMIN_EMAIL") || "admin@localhost"

  Enum.each(["system@mila.local", admin_email], fn target_email ->
    actor = SystemActor.get_system_actor()

    lookup =
      Accounts.User
      |> Ash.Query.filter(email == ^target_email)
      |> Ash.read_one(domain: Mv.Accounts, actor: actor)

    with {:ok, user} when not is_nil(user) <- lookup do
      delete_user_bypass_ash(user)
    end
  end)

  # Drop the cached actor so the next call has to resolve it again.
  SystemActor.invalidate_cache()

  # In the test environment the system user is expected to be auto-created.
  recreated = SystemActor.get_system_actor()

  assert %Mv.Accounts.User{} = recreated
  assert to_string(recreated.email) == "system@mila.local"
  refute is_nil(recreated.role)
  assert recreated.role.permission_set_name == "admin"
end
end
describe "invalidate_cache/0" do
  test "forces reload of system actor on next call" do
    before_invalidation = SystemActor.get_system_actor()

    # invalidate_cache/0 must return :ok; the match fails the test otherwise.
    :ok = SystemActor.invalidate_cache()

    after_invalidation = SystemActor.get_system_actor()

    # The call after invalidation must still resolve to the same user id.
    assert before_invalidation.id == after_invalidation.id
  end
end
describe "get_system_actor_result/0" do
test "returns ok tuple with system actor" do
  # Shape and struct type are checked in a single match.
  assert {:ok, %Mv.Accounts.User{} = actor} = SystemActor.get_system_actor_result()
  assert actor.role.permission_set_name == "admin"
end
test "returns error tuple when system actor cannot be loaded" do
  # Remove the system user and the configured admin user (only these two rows,
  # not every user) before asking for the system actor again.
  admin_email = System.get_env("ADMIN_EMAIL") || "admin@localhost"

  Enum.each(["system@mila.local", admin_email], fn target_email ->
    actor = SystemActor.get_system_actor()

    lookup =
      Accounts.User
      |> Ash.Query.filter(email == ^target_email)
      |> Ash.read_one(domain: Mv.Accounts, actor: actor)

    with {:ok, user} when not is_nil(user) <- lookup do
      delete_user_bypass_ash(user)
    end
  end)

  SystemActor.invalidate_cache()

  # In the test environment the actor is expected to be auto-created, so
  # {:ok, actor} is the likely outcome; {:error, reason} is accepted as well.
  # Each shape is handled explicitly so that an {:ok, _} payload is actually
  # validated and any other return shape fails with a readable message.
  case SystemActor.get_system_actor_result() do
    {:ok, actor} ->
      assert %Mv.Accounts.User{} = actor
      assert actor.role.permission_set_name == "admin"

    {:error, _reason} ->
      :ok

    other ->
      flunk("expected {:ok, actor} or {:error, reason}, got: #{inspect(other)}")
  end
end
end
describe "system_user_email/0" do
  test "returns the system user email address" do
    email = SystemActor.system_user_email()

    # Pins the exact address that the other tests in this module look up.
    assert email == "system@mila.local"
  end
end
describe "edge cases" do
test "raises error if admin user has invalid role (role loading fails)", %{
  admin_user: admin_user
} do
  # NOTE(review): despite its name, this test only asserts that the database
  # rejects a NULL role_id for the admin user; no role loading is exercised.

  # Remove the system user row if one exists.
  actor = SystemActor.get_system_actor()

  lookup =
    Accounts.User
    |> Ash.Query.filter(email == ^"system@mila.local")
    |> Ash.read_one(domain: Mv.Accounts, actor: actor)

  with {:ok, user} when not is_nil(user) <- lookup do
    delete_user_bypass_ash(user)
  end

  raw_admin_id = admin_user.id |> Ecto.UUID.cast!() |> Ecto.UUID.dump!()

  # Raw SQL is used so the NOT NULL constraint on role_id is hit directly.
  nullify_role = fn ->
    Ecto.Adapters.SQL.query!(
      Mv.Repo,
      """
      UPDATE users
      SET role_id = NULL
      WHERE id = $1::uuid
      """,
      [raw_admin_id]
    )
  end

  # The "no role" case cannot be set up because the database refuses the
  # update; that refusal is the behavior being verified.
  assert_raise Postgrex.Error,
               ~r/null value in column.*role_id.*violates not-null constraint/i,
               nullify_role
end
test "handles concurrent calls without race conditions" do
  # Remove the system user and the configured admin user so the concurrent
  # calls below cannot simply find an existing row for either email.
  admin_email = System.get_env("ADMIN_EMAIL") || "admin@localhost"

  Enum.each(["system@mila.local", admin_email], fn target_email ->
    actor = SystemActor.get_system_actor()

    lookup =
      Accounts.User
      |> Ash.Query.filter(email == ^target_email)
      |> Ash.read_one(domain: Mv.Accounts, actor: actor)

    with {:ok, user} when not is_nil(user) <- lookup do
      delete_user_bypass_ash(user)
    end
  end)

  SystemActor.invalidate_cache()

  # Ten separate processes request the system actor at the same time.
  results =
    1..10
    |> Enum.map(fn _ -> Task.async(&SystemActor.get_system_actor/0) end)
    |> Task.await_many(:infinity)

  assert length(results) == 10

  # Every process must have received an admin-role system user...
  for actor <- results do
    assert actor.role.permission_set_name == "admin"
  end

  for actor <- results do
    assert to_string(actor.email) == "system@mila.local"
  end

  # ...and all of them must be the very same user (exactly one distinct id).
  assert [_only_id] = results |> Enum.map(& &1.id) |> Enum.uniq()
end
test "raises error if system user has wrong role", %{system_user: system_user} do
  # read_only is used because it is a valid, non-admin permission set.
  creator = SystemActor.get_system_actor()

  role_attrs = %{
    name: "Read Only Role",
    description: "Read-only access",
    permission_set_name: "read_only"
  }

  {:ok, read_only_role} = Authorization.create_role(role_attrs, actor: creator)

  updater = SystemActor.get_system_actor()

  # :update_internal is used so this bootstrap-style role change is allowed.
  changeset =
    system_user
    |> Ash.Changeset.for_update(:update_internal, %{})
    |> Ash.Changeset.manage_relationship(:role, read_only_role, type: :append_and_remove)

  Ash.update!(changeset, actor: updater)

  SystemActor.invalidate_cache()

  # With a non-admin role on the system user, resolving the actor must raise.
  assert_raise RuntimeError, ~r/System actor must have admin role/, fn ->
    SystemActor.get_system_actor()
  end
end
test "raises error if system user has invalid role (role loading fails)", %{
  system_user: system_user
} do
  # NOTE(review): despite its name, this test only asserts that the database
  # rejects a NULL role_id for the system user; no role loading is exercised.
  raw_system_id = system_user.id |> Ecto.UUID.cast!() |> Ecto.UUID.dump!()

  # Raw SQL is used so the NOT NULL constraint on role_id is hit directly.
  nullify_role = fn ->
    Ecto.Adapters.SQL.query!(
      Mv.Repo,
      """
      UPDATE users
      SET role_id = NULL
      WHERE id = $1::uuid
      """,
      [raw_system_id]
    )
  end

  # The "no role" case cannot be set up because the database refuses the
  # update; that refusal is the behavior being verified.
  assert_raise Postgrex.Error,
               ~r/null value in column.*role_id.*violates not-null constraint/i,
               nullify_role
end
end
end