All checks were successful
continuous-integration/drone/push Build is passing
Handle {:ok, user}, {:ok, nil} in addition to {:ok, [user]}, {:ok, []}.
181 lines
6.5 KiB
Elixir
181 lines
6.5 KiB
Elixir
defmodule Mv.OidcRoleSyncTest do
  @moduledoc """
  Tests for OIDC group → Admin/Mitglied role sync.

  Exercises `Mv.OidcRoleSync.apply_admin_role_from_user_info/2` and the
  three-argument call that additionally passes the OAuth tokens.

  The module runs with `async: false` because each test rewrites the
  `:oidc_role_sync` entry of the `:mv` application environment, which is
  global to the VM.
  """
  use Mv.DataCase, async: false

  alias Mv.Accounts
  alias Mv.Accounts.User
  alias Mv.Authorization.Role
  alias Mv.OidcRoleSync

  require Ash.Query

  setup do
    ensure_roles_exist()

    # Default configuration for every test; individual tests may override it.
    restore_config = put_oidc_config(admin_group_name: "mila-admin", groups_claim: "groups")
    on_exit(restore_config)

    :ok
  end

  describe "apply_admin_role_from_user_info/2" do
    test "when OIDC_ADMIN_GROUP_NAME not configured: does not change user (Mitglied stays)" do
      restore = put_oidc_config(admin_group_name: nil, groups_claim: "groups")
      on_exit(restore)

      {:ok, user} = create_user_with_mitglied(unique_email("sync-no-config"))
      role_id_before = user.role_id
      user_info = %{"groups" => ["mila-admin"]}

      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == role_id_before
    end

    test "when user_info contains configured admin group: user gets Admin role" do
      {:ok, user} = create_user_with_mitglied(unique_email("sync-to-admin"))
      user_info = %{"groups" => ["mila-admin"]}

      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == admin_role_id()
    end

    test "when user_info does not contain admin group: user gets Mitglied role" do
      {:ok, user} = create_user_with_admin(unique_email("sync-to-mitglied"))

      # NOTE(review): a second admin is created, presumably so the user under
      # test is not the last remaining admin when downgraded — confirm.
      {:ok, _} = create_user_with_admin(unique_email("other-admin"))

      user_info = %{"groups" => ["other-group"]}

      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == mitglied_role_id()
    end

    test "when OIDC_GROUPS_CLAIM is different: reads groups from that claim" do
      restore = put_oidc_config(admin_group_name: "mila-admin", groups_claim: "ak_groups")
      on_exit(restore)

      {:ok, user} = create_user_with_mitglied(unique_email("sync-claim"))
      user_info = %{"ak_groups" => ["mila-admin"]}

      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == admin_role_id()
    end

    test "user already Admin and user_info without admin group: downgrade to Mitglied" do
      {:ok, user1} = create_user_with_admin(unique_email("sync-downgrade"))

      # NOTE(review): second admin presumably keeps the system from losing its
      # last admin during the downgrade — confirm.
      {:ok, _user2} = create_user_with_admin(unique_email("sync-other-admin"))

      user_info = %{"groups" => []}

      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user1, user_info)

      {:ok, after_user} = get_user(user1.id)
      assert after_user.role_id == mitglied_role_id()
    end

    test "when user_info has no groups, groups are read from access_token JWT (e.g. Rauthy)" do
      {:ok, user} = create_user_with_mitglied(unique_email("sync-from-token"))
      user_info = %{"sub" => "oidc-123"}

      # Minimal JWT: header.payload.signature with "groups" in payload
      # (Rauthy puts groups in access_token). The signature is a dummy value.
      payload = Jason.encode!(%{"groups" => ["mila-admin"], "sub" => "oidc-123"})
      payload_b64 = Base.url_encode64(payload, padding: false)
      header_b64 = Base.url_encode64("{\"alg\":\"HS256\",\"typ\":\"JWT\"}", padding: false)
      sig_b64 = Base.url_encode64("sig", padding: false)
      access_token = "#{header_b64}.#{payload_b64}.#{sig_b64}"
      oauth_tokens = %{"access_token" => access_token}

      assert :ok = OidcRoleSync.apply_admin_role_from_user_info(user, user_info, oauth_tokens)

      {:ok, after_user} = get_user(user.id)
      assert after_user.role_id == admin_role_id()
    end
  end

  # B3: Role sync after registration is implemented via after_action in register_with_rauthy.
  # Full integration tests (create_register_with_rauthy + assert role) are skipped: when the
  # nested Ash.update! runs inside the create's after_action, authorization may evaluate in
  # the create context so set_role_from_oidc_sync bypass does not apply. Sync logic is covered
  # by the apply_admin_role_from_user_info tests above. B4 sign-in sync will also use that.

  # Creates the "Admin" and "Mitglied" roles when they are missing.
  #
  # A failed lookup is deliberately not matched: it raises a CaseClauseError
  # here instead of being swallowed and surfacing later in an unrelated test.
  defp ensure_roles_exist do
    for {name, perm} <- [{"Admin", "admin"}, {"Mitglied", "own_data"}] do
      lookup =
        Role
        |> Ash.Query.filter(name == ^name)
        |> Ash.read_one(authorize?: false, domain: Mv.Authorization)

      case lookup do
        {:ok, nil} ->
          Role
          |> Ash.Changeset.for_create(:create_role_with_system_flag, %{
            name: name,
            description: name,
            permission_set_name: perm,
            is_system_role: name == "Mitglied"
          })
          |> Ash.create!(authorize?: false, domain: Mv.Authorization)

        {:ok, _existing_role} ->
          :ok
      end
    end
  end

  # Merges `opts` into the `:oidc_role_sync` config of `:mv` and returns a
  # zero-arity function that restores the previous state.
  #
  # When the key was not set before, the restore function deletes it again
  # rather than leaving an empty list behind, so no state leaks into other
  # test modules.
  defp put_oidc_config(opts) do
    previous = Application.fetch_env(:mv, :oidc_role_sync)

    current =
      case previous do
        {:ok, value} -> value
        :error -> []
      end

    Application.put_env(:mv, :oidc_role_sync, Keyword.merge(current, opts))

    fn ->
      case previous do
        {:ok, value} -> Application.put_env(:mv, :oidc_role_sync, value)
        :error -> Application.delete_env(:mv, :oidc_role_sync)
      end
    end
  end

  # Builds a unique e-mail address of the form "<prefix>-<n>@test.example.com".
  defp unique_email(prefix) do
    "#{prefix}-#{System.unique_integer([:positive])}@test.example.com"
  end

  # Id of the role returned by Role.get_admin_role/0.
  defp admin_role_id do
    {:ok, role} = Role.get_admin_role()
    role.id
  end

  # Id of the role returned by Role.get_mitglied_role/0.
  defp mitglied_role_id do
    {:ok, role} = Role.get_mitglied_role()
    role.id
  end

  # Reads a user by id without authorization; returns the Ash.read_one result.
  defp get_user(id) do
    User
    |> Ash.Query.filter(id == ^id)
    |> Ash.read_one(authorize?: false, domain: Mv.Accounts)
  end

  # Creates a user without an explicit role and returns the freshly read record.
  # NOTE(review): assumes new users default to the Mitglied role — confirm.
  defp create_user_with_mitglied(email) do
    {:ok, _} = Accounts.create_user(%{email: email}, authorize?: false)
    get_user_by_email(email)
  end

  # Creates a user, assigns the admin role and returns the re-read record.
  defp create_user_with_admin(email) do
    {:ok, _} = Accounts.create_user(%{email: email}, authorize?: false)
    {:ok, u} = get_user_by_email(email)

    u
    |> Ash.Changeset.for_update(:update_user, %{role_id: admin_role_id()})
    |> Ash.update!(authorize?: false)

    # Re-read so the caller sees the persisted role.
    get_user(u.id)
  end

  # Reads a user by e-mail without authorization; returns the Ash.read_one result.
  defp get_user_by_email(email) do
    User
    |> Ash.Query.filter(email == ^email)
    |> Ash.read_one(authorize?: false, domain: Mv.Accounts)
  end
end
|