mitgliederverwaltung/lib/mv/oidc_role_sync.ex
Moritz cd6db4ae28 Refactor: remove debug instrumentation from OidcRoleSync
Drop temporary logging used to diagnose OIDC groups sync in dev.
2026-02-04 18:03:02 +01:00

132 lines
4 KiB
Elixir

defmodule Mv.OidcRoleSync do
@moduledoc """
Syncs user role from OIDC user_info (e.g. groups claim → Admin role).
Used after OIDC registration (register_with_rauthy) and on sign-in so that
users in the configured admin group get the Admin role; others get Mitglied.
Configure via OIDC_ADMIN_GROUP_NAME and OIDC_GROUPS_CLAIM (see OidcRoleSyncConfig).
Groups are read from user_info (ID token claims) first; if missing or empty,
the payload of the access_token from oauth_tokens is decoded as a JWT (without
verifying its signature) and the groups claim is read from there (e.g. Rauthy
puts groups in the access token when scope includes "groups").
"""
alias Mv.Accounts.User
alias Mv.Authorization.Role
alias Mv.OidcRoleSyncConfig
@doc """
Synchronises the user's role with the groups found in the OIDC data.

  * If no admin group is configured (`OidcRoleSyncConfig.oidc_admin_group_name/0`
    returns `nil` or `""`), nothing happens and `:ok` is returned.
  * If the groups contain the configured admin group, the Admin role is requested.
  * In every other case the Mitglied role is requested (which downgrades a user
    that was Admin before).

`user_info` must be a map (e.g. the ID token claims). `oauth_tokens` is optional;
when `user_info` yields no groups, the groups claim is looked up in the JWT stored
under its `"access_token"` key instead.
"""
@spec apply_admin_role_from_user_info(User.t(), map(), map() | nil) :: :ok
def apply_admin_role_from_user_info(user, user_info, oauth_tokens \\ nil)
    when is_map(user_info) do
  case OidcRoleSyncConfig.oidc_admin_group_name() do
    unset when unset in [nil, ""] ->
      # Role sync is switched off: leave the user untouched.
      :ok

    admin_group ->
      claim_name = OidcRoleSyncConfig.oidc_groups_claim()

      # Prefer the ID token claims; only consult the access token when they give nothing.
      member_groups =
        case groups_from_user_info(user_info, claim_name) do
          [] -> groups_from_access_token(oauth_tokens, claim_name)
          found -> found
        end

      role_key = if admin_group in member_groups, do: :admin, else: :mitglied
      set_user_role(user, role_key)
  end
end
# Reads the groups claim from the `user_info` map (e.g. ID token claims).
#
# The claim is looked up under its string key first and, as a fallback, under the
# matching atom key. The atom lookup goes through `safe_get_atom/2`, so no atom is
# ever created from external input and only the atom conversion itself is guarded:
# an error raised while normalising the value is no longer swallowed by a
# function-wide `rescue`.
#
# Returns the list produced by `normalize_groups/1` (empty when the claim is absent).
defp groups_from_user_info(user_info, claim) do
  raw_groups = user_info[claim] || safe_get_atom(user_info, claim)
  normalize_groups(raw_groups)
end
# Reads the groups claim from the JWT access token contained in `oauth_tokens`.
#
# The token is taken from the "access_token" key (string key first, atom key as a
# fallback). The claim itself is likewise looked up by string key first and then by
# an already-existing atom key. Returns [] when `oauth_tokens` is not a map, when
# there is no binary access token, or when the token's claims cannot be read.
defp groups_from_access_token(oauth_tokens, claim) when is_map(oauth_tokens) do
  with jwt when is_binary(jwt) <- oauth_tokens["access_token"] || oauth_tokens[:access_token],
       {:ok, jwt_claims} <- peek_jwt_claims(jwt) do
    raw_groups = jwt_claims[claim] || safe_get_atom(jwt_claims, claim)
    normalize_groups(raw_groups)
  else
    _no_usable_token -> []
  end
end

# Covers `nil` and every other non-map value.
defp groups_from_access_token(_oauth_tokens, _claim), do: []
# Fetches the value stored in `map` under the atom spelled like the string `key`.
#
# Uses `String.to_existing_atom/1`, so no new atom is created; when the atom does
# not exist yet the resulting ArgumentError is turned into `nil`.
defp safe_get_atom(map, key) when is_binary(key) do
  Map.get(map, String.to_existing_atom(key))
rescue
  ArgumentError -> nil
end

# Non-binary keys are never converted; there is nothing to look up.
defp safe_get_atom(_map, _key), do: nil
# Decodes the payload (claims) segment of a compact JWT WITHOUT verifying its signature.
#
# Returns `{:ok, claims}` only when the token consists of exactly three dot-separated
# segments, the middle segment is valid URL-safe Base64 (padding optional) and it
# decodes to a JSON object. Every other outcome yields `:error` — including a payload
# that is valid JSON but not an object (array, string, number, null), which would
# otherwise make the caller's map access on the claims raise.
#
# NOTE(review): since the signature is not checked, the claims are only as trustworthy
# as the channel the token arrived on — confirm that tokens passed here come straight
# from the identity provider.
defp peek_jwt_claims(token) do
  with [_header, payload_b64, _signature] <- String.split(token, "."),
       {:ok, payload_json} <- Base.url_decode64(payload_b64, padding: false),
       {:ok, claims} when is_map(claims) <- Jason.decode(payload_json) do
    {:ok, claims}
  else
    _malformed -> :error
  end
end
# Turns a raw groups claim value into a flat list of strings.
#
#   * `nil`            -> []
#   * a list           -> its scalar entries as strings: binaries are kept as they are,
#                         atoms and numbers are converted with `to_string/1`. Entries of
#                         any other shape (maps, tuples, nested lists, ...) are dropped,
#                         because `to_string/1` would raise on them or merge a nested
#                         list into a single string.
#   * a single binary  -> a one-element list
#   * anything else    -> []
defp normalize_groups(nil), do: []

defp normalize_groups(groups) when is_list(groups) do
  Enum.flat_map(groups, fn
    group when is_binary(group) -> [group]
    group when is_atom(group) or is_number(group) -> [to_string(group)]
    _unsupported -> []
  end)
end

defp normalize_groups(single) when is_binary(single), do: [single]
defp normalize_groups(_other), do: []
# Assigns the role identified by `role_key` (`:admin` or `:mitglied`) to `user`.
#
# The role record is fetched through `Role.get_admin_role/0` or
# `Role.get_mitglied_role/0`. When that lookup returns anything other than
# `{:ok, %Role{}}`, the user is left unchanged and `:ok` is returned (best effort).
defp set_user_role(user, role_key) when role_key in [:admin, :mitglied] do
  lookup =
    case role_key do
      :admin -> Role.get_admin_role()
      :mitglied -> Role.get_mitglied_role()
    end

  case lookup do
    {:ok, %Role{} = role} -> do_set_role(user, role)
    _role_unavailable -> :ok
  end
end
# Writes `role.id` to the user through the `:set_role_from_oidc_sync` update action
# and returns `:ok`. Uses the bang variant `Ash.update!/2`, so the result is not
# pattern-matched here.
#
# The same private context flag is put on the changeset and passed to the update call.
# NOTE(review): presumably this flag is what authorises the action for the sync —
# confirm against the policies of Mv.Accounts.User.
defp do_set_role(user, role) do
  sync_context = %{private: %{oidc_role_sync: true}}

  changeset =
    user
    |> Ash.Changeset.for_update(:set_role_from_oidc_sync, %{role_id: role.id})
    |> Ash.Changeset.set_context(sync_context)

  _updated_user = Ash.update!(changeset, domain: Mv.Accounts, context: sync_context)
  :ok
end
end