feat: add CheckPagePermission plug for page-level authorization

- Plug checks PermissionSets page list; redirects unauthorized to profile or sign-in.
- Router: add plug to :browser pipeline; LiveHelpers: check_page_permission_on_params
  for client-side navigation (push_patch).
This commit is contained in:
Moritz 2026-01-29 23:55:58 +01:00
parent d7f6d1c03c
commit b10b9c893c
Signed by: moritz
GPG key ID: 1020A035E5DD0824
3 changed files with 355 additions and 1 deletions

View file

@ -5,15 +5,18 @@ defmodule MvWeb.LiveHelpers do
## on_mount Hooks
- `:default` - Sets the user's locale from session (defaults to "de")
- `:ensure_user_role_loaded` - Ensures current_user has role relationship loaded
- `:check_page_permission_on_params` - Attaches handle_params hook to enforce page permission on client-side navigation (push_patch)
## Usage
Add to LiveView modules via:
```elixir
on_mount {MvWeb.LiveHelpers, :default}
on_mount {MvWeb.LiveHelpers, :ensure_user_role_loaded}
on_mount {MvWeb.LiveHelpers, :check_page_permission_on_params}
```
"""
import Phoenix.Component
alias MvWeb.Plugs.CheckPagePermission
def on_mount(:default, _params, session, socket) do
locale = session["locale"] || "de"
@ -26,6 +29,40 @@ defmodule MvWeb.LiveHelpers do
{:cont, socket}
end
def on_mount(:check_page_permission_on_params, _params, _session, socket) do
{:cont,
Phoenix.LiveView.attach_hook(
socket,
:check_page_permission,
:handle_params,
&check_page_permission_handle_params/3
)}
end
defp check_page_permission_handle_params(_params, uri, socket) do
path = uri |> URI.parse() |> Map.get(:path, "/") || "/"
if CheckPagePermission.public_path?(path) do
{:cont, socket}
else
user = socket.assigns[:current_user]
host = uri |> URI.parse() |> Map.get(:host) || "localhost"
if CheckPagePermission.user_can_access_page?(user, path, router: MvWeb.Router, host: host) do
{:cont, socket}
else
redirect_to = CheckPagePermission.redirect_target_for_user(user)
socket =
socket
|> Phoenix.LiveView.put_flash(:error, "You don't have permission to access this page.")
|> Phoenix.LiveView.push_navigate(to: redirect_to)
{:halt, socket}
end
end
end
defp ensure_user_role_loaded(socket) do
user = socket.assigns[:current_user]

View file

@ -0,0 +1,315 @@
defmodule MvWeb.Plugs.CheckPagePermission do
@moduledoc """
Plug that checks if the current user has permission to access the requested page.
Runs in the router pipeline before LiveView mounts. Uses PermissionSets page list
and matches the current route template (or request path) against allowed patterns.
## How It Works
1. Public paths (e.g. /auth, /register) are exempt and pass through.
2. Extracts page path from conn via `Phoenix.Router.route_info/4` (route template
like "/members/:id") or falls back to `conn.request_path`.
3. Gets current user from `conn.assigns[:current_user]`.
4. Gets user's permission_set_name from role and calls `PermissionSets.get_permissions/1`.
5. Matches requested path against allowed patterns (exact, dynamic `:param`, wildcard "*").
6. If unauthorized: redirects to "/sign-in" (no user) or "/users/:id" (user profile) with flash error and halts.
## Pattern Matching
- Exact: "/members" == "/members"
- Dynamic: "/members/:id" matches "/members/123"
- Wildcard: "*" matches everything (admin)
- Reserved: the segment "new" is never matched by `:id` or `:slug` (e.g. `/members/new` and `/groups/new` require an explicit page permission).
"""
import Plug.Conn
import Phoenix.Controller
alias Mv.Authorization.PermissionSets
require Logger
def init(opts), do: opts
def call(conn, _opts) do
if public_path?(conn.request_path) do
conn
else
# Ensure role is loaded (load_from_session does not load it; required for permission check)
user =
conn.assigns[:current_user]
|> Mv.Authorization.Actor.ensure_loaded()
conn = Plug.Conn.assign(conn, :current_user, user)
page_path = get_page_path(conn)
request_path = conn.request_path
if has_page_permission?(user, page_path, request_path) do
conn
else
log_page_access_denied(user, page_path)
redirect_to = redirect_target(user)
conn
|> fetch_session()
|> fetch_flash()
|> put_flash(:error, "You don't have permission to access this page.")
|> redirect(to: redirect_to)
|> halt()
end
end
end
@doc """
Returns the redirect URL for an unauthorized user (for LiveView push_redirect).
"""
def redirect_target_for_user(nil), do: "/sign-in"
def redirect_target_for_user(user) when is_map(user) or is_struct(user) do
id = Map.get(user, :id) || Map.get(user, "id")
if id, do: "/users/#{to_string(id)}", else: "/sign-in"
end
def redirect_target_for_user(_), do: "/sign-in"
defp redirect_target(user), do: redirect_target_for_user(user)
@doc """
Returns true if the path is public (no auth/permission check).
Used by LiveView hook to skip redirect on sign-in etc.
"""
def public_path?(path) when is_binary(path) do
path in ["/register", "/reset", "/set_locale", "/sign-in", "/sign-out"] or
String.starts_with?(path, "/auth") or
String.starts_with?(path, "/confirm") or
String.starts_with?(path, "/password-reset")
end
defp get_page_path(conn) do
router = conn.private[:phoenix_router]
get_page_path_from_router(router, conn.method, conn.request_path, conn.host)
end
@doc """
Returns whether the user is allowed to access the given request path.
Used by the plug and by LiveView on_mount/handle_params for client-side navigation.
Options: `:router` (default MvWeb.Router), `:host` (default "localhost").
"""
def user_can_access_page?(user, request_path, opts \\ []) do
router = Keyword.get(opts, :router, MvWeb.Router)
host = Keyword.get(opts, :host, "localhost")
page_path = get_page_path_from_router(router, "GET", request_path, host)
has_page_permission?(user, page_path, request_path)
end
defp get_page_path_from_router(router, method, request_path, host) do
case Phoenix.Router.route_info(router, method, request_path, host) do
%{route: route} -> route
_ -> request_path
end
end
defp has_page_permission?(nil, _page_path, _request_path), do: false
defp has_page_permission?(user, page_path, request_path) do
with ps_name when is_binary(ps_name) <- permission_set_name_from_user(user),
{:ok, ps_atom} <- PermissionSets.permission_set_name_to_atom(ps_name),
permissions <- PermissionSets.get_permissions(ps_atom) do
page_matches?(permissions.pages, page_path, request_path, user)
else
_ -> false
end
end
defp permission_set_name_from_user(user) when is_map(user) or is_struct(user) do
get_in(user, [Access.key(:role), Access.key(:permission_set_name)]) ||
get_in(user, [Access.key("role"), Access.key("permission_set_name")])
end
defp permission_set_name_from_user(_), do: nil
defp user_id_from_user(user) when is_map(user) or is_struct(user) do
id = Map.get(user, :id) || Map.get(user, "id")
if id, do: to_string(id), else: nil
end
defp user_id_from_user(_), do: nil
# Reserved path segments that must not match a single :id param (e.g. /members/new, /users/new).
@reserved_id_segments ["new"]
# For "/users/:id" with own_data we only allow when the id in the path equals the current user's id.
# For "/members/:id" we reject when the segment is reserved (e.g. "new") so /members/new is not allowed.
defp page_matches?(allowed_pages, requested_path, request_path, user) do
Enum.any?(allowed_pages, fn pattern ->
pattern_match?(pattern, requested_path, request_path, user)
end)
end
defp pattern_match?("*", _requested_path, _request_path, _user), do: true
defp pattern_match?(pattern, _requested_path, request_path, user)
when pattern == "/users/:id" do
match_dynamic_route?(pattern, request_path) and
path_param_equals(pattern, request_path, "id", user_id_from_user(user))
end
defp pattern_match?(pattern, _requested_path, request_path, user)
when pattern in ["/users/:id/edit", "/users/:id/show/edit"] do
match_dynamic_route?(pattern, request_path) and
path_param_equals(pattern, request_path, "id", user_id_from_user(user))
end
defp pattern_match?(pattern, _requested_path, request_path, user)
when pattern == "/members/:id" do
match_dynamic_route?(pattern, request_path) and
path_param_not_reserved(pattern, request_path, "id", @reserved_id_segments) and
members_show_allowed?(pattern, request_path, user)
end
defp pattern_match?(pattern, _requested_path, request_path, user)
when pattern in ["/members/:id/edit", "/members/:id/show/edit"] do
match_dynamic_route?(pattern, request_path) and
members_edit_allowed?(pattern, request_path, user)
end
defp pattern_match?(pattern, _requested_path, request_path, _user)
when pattern == "/groups/:slug" do
match_dynamic_route?(pattern, request_path) and
path_param_not_reserved(pattern, request_path, "slug", @reserved_id_segments)
end
defp pattern_match?(pattern, requested_path, _request_path, _user)
when pattern == requested_path do
true
end
defp pattern_match?(pattern, _requested_path, request_path, _user) do
if String.contains?(pattern, ":") do
match_dynamic_route?(pattern, request_path)
else
false
end
end
defp path_param_not_reserved(pattern, request_path, param_name, reserved)
when is_list(reserved) do
segments = String.split(request_path, "/", trim: true)
idx = param_index(pattern, param_name)
if idx < 0 do
false
else
value = Enum.at(segments, idx)
value not in reserved
end
end
defp path_param_equals(pattern, request_path, param_name, expected_value)
when is_binary(expected_value) do
segments = String.split(request_path, "/", trim: true)
idx = param_index(pattern, param_name)
if idx < 0 do
false
else
value = Enum.at(segments, idx)
value == expected_value
end
end
defp path_param_equals(_, _, _, _), do: false
# For own_data: only allow show/edit when :id is the user's linked member. For other permission sets: allow when not reserved.
defp members_show_allowed?(pattern, request_path, user) do
if permission_set_name_from_user(user) == "own_data" do
path_param_equals(pattern, request_path, "id", user_member_id(user))
else
true
end
end
defp members_edit_allowed?(pattern, request_path, user) do
if permission_set_name_from_user(user) == "own_data" do
path_param_equals(pattern, request_path, "id", user_member_id(user))
else
path_param_not_reserved(pattern, request_path, "id", @reserved_id_segments)
end
end
defp user_member_id(user) when is_map(user) or is_struct(user) do
member_id = Map.get(user, :member_id) || Map.get(user, "member_id")
if is_nil(member_id) do
load_member_id_for_user(user)
else
to_string(member_id)
end
end
defp user_member_id(_), do: nil
defp load_member_id_for_user(user) do
id = user_id_from_user(user)
if id do
case Ash.get(Mv.Accounts.User, id, load: [:member], domain: Mv.Accounts, authorize?: false) do
{:ok, loaded} when not is_nil(loaded.member_id) -> to_string(loaded.member_id)
_ -> nil
end
else
nil
end
end
defp param_index(pattern, param_name) do
pattern
|> String.split("/", trim: true)
|> Enum.find_index(fn seg ->
String.starts_with?(seg, ":") and String.trim_leading(seg, ":") == param_name
end)
|> case do
nil -> -1
i -> i
end
end
defp match_dynamic_route?(pattern, path) do
pattern_segments = String.split(pattern, "/", trim: true)
path_segments = String.split(path, "/", trim: true)
if length(pattern_segments) == length(path_segments) do
Enum.zip(pattern_segments, path_segments)
|> Enum.all?(fn {pattern_seg, path_seg} ->
String.starts_with?(pattern_seg, ":") or pattern_seg == path_seg
end)
else
false
end
end
defp log_page_access_denied(user, page_path) do
user_id =
if user do
Map.get(user, :id) || Map.get(user, "id") || "nil"
else
"nil"
end
role_name =
if user do
get_in(user, [Access.key(:role), Access.key(:name)]) ||
get_in(user, [Access.key("role"), Access.key("name")]) || "nil"
else
"nil"
end
Logger.info("""
Page access denied:
User: #{user_id}
Role: #{role_name}
Page: #{page_path}
""")
end
end

View file

@ -14,6 +14,7 @@ defmodule MvWeb.Router do
plug :put_secure_browser_headers
plug :load_from_session
plug :set_locale
plug MvWeb.Plugs.CheckPagePermission
end
pipeline :api do
@ -48,7 +49,8 @@ defmodule MvWeb.Router do
ash_authentication_live_session :authentication_required,
on_mount: [
{MvWeb.LiveUserAuth, :live_user_required},
{MvWeb.LiveHelpers, :ensure_user_role_loaded}
{MvWeb.LiveHelpers, :ensure_user_role_loaded},
{MvWeb.LiveHelpers, :check_page_permission_on_params}
] do
live "/", MemberLive.Index, :index