mitgliederverwaltung/test/mv/oidc_role_sync_test.exs
Moritz d573a22769
All checks were successful
continuous-integration/drone/push Build is passing
Tests: accept single user or list from read_sign_in_with_rauthy (get? true)
Handle {:ok, user}, {:ok, nil} in addition to {:ok, [user]}, {:ok, []}.
2026-02-04 18:13:30 +01:00

181 lines
6.5 KiB
Elixir

defmodule Mv.OidcRoleSyncTest do
  @moduledoc """
  Tests for OIDC group → Admin/Mitglied role sync
  (`apply_admin_role_from_user_info/2` and its 3-arity variant taking OAuth tokens).
  """

  # async: false because these tests mutate the global `:mv, :oidc_role_sync`
  # application environment (see put_oidc_config/1).
  use Mv.DataCase, async: false

  alias Mv.Accounts
  alias Mv.Accounts.User
  alias Mv.Authorization.Role
  alias Mv.OidcRoleSync

  require Ash.Query

  setup do
    ensure_roles_exist()

    # Default configuration for every test; individual tests may override it.
    restore_config = put_oidc_config(admin_group_name: "mila-admin", groups_claim: "groups")
    on_exit(restore_config)

    :ok
  end

  describe "apply_admin_role_from_user_info/2" do
    test "when OIDC_ADMIN_GROUP_NAME not configured: does not change user (Mitglied stays)" do
      restore = put_oidc_config(admin_group_name: nil, groups_claim: "groups")
      on_exit(restore)

      email = "sync-no-config-#{System.unique_integer([:positive])}@test.example.com"
      {:ok, user} = create_user_with_mitglied(email)
      role_id_before = user.role_id

      # The admin group is present in the claim, but no admin group name is configured.
      user_info = %{"groups" => ["mila-admin"]}
      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == role_id_before
    end

    test "when user_info contains configured admin group: user gets Admin role" do
      email = "sync-to-admin-#{System.unique_integer([:positive])}@test.example.com"
      {:ok, user} = create_user_with_mitglied(email)

      user_info = %{"groups" => ["mila-admin"]}
      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == admin_role_id()
    end

    test "when user_info does not contain admin group: user gets Mitglied role" do
      email1 = "sync-to-mitglied-#{System.unique_integer([:positive])}@test.example.com"
      email2 = "other-admin-#{System.unique_integer([:positive])}@test.example.com"
      {:ok, user} = create_user_with_admin(email1)

      # NOTE(review): a second admin is created, presumably so that the user under
      # test is not the only admin when being downgraded — confirm against the
      # resource's validations.
      {:ok, _} = create_user_with_admin(email2)

      user_info = %{"groups" => ["other-group"]}
      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == mitglied_role_id()
    end

    test "when OIDC_GROUPS_CLAIM is different: reads groups from that claim" do
      restore = put_oidc_config(admin_group_name: "mila-admin", groups_claim: "ak_groups")
      on_exit(restore)

      email = "sync-claim-#{System.unique_integer([:positive])}@test.example.com"
      {:ok, user} = create_user_with_mitglied(email)

      # Groups are delivered under the custom claim name configured above.
      user_info = %{"ak_groups" => ["mila-admin"]}
      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == admin_role_id()
    end

    test "user already Admin and user_info without admin group: downgrade to Mitglied" do
      email1 = "sync-downgrade-#{System.unique_integer([:positive])}@test.example.com"
      email2 = "sync-other-admin-#{System.unique_integer([:positive])}@test.example.com"
      {:ok, user1} = create_user_with_admin(email1)

      # NOTE(review): second admin presumably keeps user1 from being the only
      # admin — confirm against the resource's validations.
      {:ok, _user2} = create_user_with_admin(email2)

      user_info = %{"groups" => []}
      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user1, user_info)

      {:ok, after_user} = get_user(user1.id)
      assert after_user.role_id == mitglied_role_id()
    end

    test "when user_info has no groups, groups are read from access_token JWT (e.g. Rauthy)" do
      email = "sync-from-token-#{System.unique_integer([:positive])}@test.example.com"
      {:ok, user} = create_user_with_mitglied(email)
      user_info = %{"sub" => "oidc-123"}

      # Minimal JWT: header.payload.signature with "groups" in the payload
      # (Rauthy puts groups in the access_token). The signature is a dummy value.
      payload = Jason.encode!(%{"groups" => ["mila-admin"], "sub" => "oidc-123"})
      payload_b64 = Base.url_encode64(payload, padding: false)
      header_b64 = Base.url_encode64("{\"alg\":\"HS256\",\"typ\":\"JWT\"}", padding: false)
      sig_b64 = Base.url_encode64("sig", padding: false)
      access_token = "#{header_b64}.#{payload_b64}.#{sig_b64}"
      oauth_tokens = %{"access_token" => access_token}

      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info, oauth_tokens)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == admin_role_id()
    end
  end

  # B3: Role sync after registration is implemented via after_action in register_with_rauthy.
  # Full integration tests (create_register_with_rauthy + assert role) are skipped: when the
  # nested Ash.update! runs inside the create's after_action, authorization may be evaluated in
  # the create context, so the set_role_from_oidc_sync bypass does not apply. The sync logic is
  # covered by the apply_admin_role_from_user_info tests above. B4 sign-in sync will also use it.

  # Creates the "Admin" and "Mitglied" roles unless a role with that name already exists.
  # Raises if the lookup itself fails, so a broken lookup is reported here instead of
  # surfacing later as an unrelated match error in a test.
  defp ensure_roles_exist do
    for {name, perm} <- [{"Admin", "admin"}, {"Mitglied", "own_data"}] do
      lookup =
        Role
        |> Ash.Query.filter(name == ^name)
        |> Ash.read_one(authorize?: false, domain: Mv.Authorization)

      case lookup do
        {:ok, nil} ->
          Role
          |> Ash.Changeset.for_create(:create_role_with_system_flag, %{
            name: name,
            description: name,
            permission_set_name: perm,
            is_system_role: name == "Mitglied"
          })
          |> Ash.create!(authorize?: false, domain: Mv.Authorization)

        {:ok, _existing_role} ->
          :ok

        {:error, error} ->
          raise "could not look up role #{inspect(name)}: #{inspect(error)}"
      end
    end
  end

  # Merges `opts` into the `:mv, :oidc_role_sync` application env and returns a
  # zero-arity function that restores the previous state. If the key was not set
  # before, the restore function deletes it again rather than leaving `[]` behind.
  defp put_oidc_config(opts) do
    previous = Application.fetch_env(:mv, :oidc_role_sync)

    current =
      case previous do
        {:ok, value} -> value
        :error -> []
      end

    Application.put_env(:mv, :oidc_role_sync, Keyword.merge(current, opts))

    fn ->
      case previous do
        {:ok, value} -> Application.put_env(:mv, :oidc_role_sync, value)
        :error -> Application.delete_env(:mv, :oidc_role_sync)
      end
    end
  end

  # Id of the role returned by Role.get_admin_role/0.
  defp admin_role_id do
    {:ok, role} = Role.get_admin_role()
    role.id
  end

  # Id of the role returned by Role.get_mitglied_role/0.
  defp mitglied_role_id do
    {:ok, role} = Role.get_mitglied_role()
    role.id
  end

  # Reads a user by id without authorization; returns the Ash.read_one/2 result.
  defp get_user(id) do
    User
    |> Ash.Query.filter(id == ^id)
    |> Ash.read_one(authorize?: false, domain: Mv.Accounts)
  end

  # Creates a user and re-reads it by email, leaving whatever role creation assigned.
  # NOTE(review): the tests rely on that role being "Mitglied" — confirm in Accounts.create_user.
  defp create_user_with_mitglied(email) do
    {:ok, _} = Accounts.create_user(%{email: email}, authorize?: false)
    get_user_by_email(email)
  end

  # Creates a user, assigns the Admin role, and returns the freshly re-read user.
  defp create_user_with_admin(email) do
    {:ok, _} = Accounts.create_user(%{email: email}, authorize?: false)
    {:ok, u} = get_user_by_email(email)

    u
    |> Ash.Changeset.for_update(:update_user, %{role_id: admin_role_id()})
    |> Ash.update!(authorize?: false)

    # Re-read so the returned struct reflects the persisted role.
    get_user(u.id)
  end

  # Reads a user by email without authorization; returns the Ash.read_one/2 result.
  defp get_user_by_email(email) do
    User
    |> Ash.Query.filter(email == ^email)
    |> Ash.read_one(authorize?: false, domain: Mv.Accounts)
  end
end