Pass actor parameter through cycle generation

Extract actor from changeset context in Member hooks and pass it
through all cycle generation functions to ensure proper authorization.
This commit is contained in:
Moritz 2026-01-09 05:26:01 +01:00
parent 57bcef17ca
commit c65124c8f4
2 changed files with 95 additions and 45 deletions

View file

@ -117,11 +117,12 @@ defmodule Mv.Membership.Member do
# Only runs if membership_fee_type_id is set
# Note: Cycle generation runs asynchronously to not block the action,
# but in test environment it runs synchronously for DB sandbox compatibility
change after_transaction(fn _changeset, result, _context ->
change after_transaction(fn changeset, result, _context ->
case result do
{:ok, member} ->
if member.membership_fee_type_id && member.join_date do
handle_cycle_generation(member)
actor = Map.get(changeset.context, :actor)
handle_cycle_generation(member, actor: actor)
end
{:error, _} ->
@ -192,7 +193,9 @@ defmodule Mv.Membership.Member do
Ash.Changeset.changing_attribute?(changeset, :membership_fee_type_id)
if fee_type_changed && member.membership_fee_type_id && member.join_date do
case regenerate_cycles_on_type_change(member) do
actor = Map.get(changeset.context, :actor)
case regenerate_cycles_on_type_change(member, actor: actor) do
{:ok, notifications} ->
# Return notifications to Ash - they will be sent automatically after commit
{:ok, member, notifications}
@ -224,7 +227,8 @@ defmodule Mv.Membership.Member do
exit_date_changed = Ash.Changeset.changing_attribute?(changeset, :exit_date)
if (join_date_changed || exit_date_changed) && member.membership_fee_type_id do
handle_cycle_generation(member)
actor = Map.get(changeset.context, :actor)
handle_cycle_generation(member, actor: actor)
end
{:error, _} ->
@ -778,33 +782,37 @@ defmodule Mv.Membership.Member do
# Returns {:ok, notifications} or {:error, reason} where notifications are collected
# to be sent after transaction commits
@doc false
def regenerate_cycles_on_type_change(member) do
def regenerate_cycles_on_type_change(member, opts \\ []) do
today = Date.utc_today()
lock_key = :erlang.phash2(member.id)
actor = Keyword.get(opts, :actor)
# Use advisory lock to prevent concurrent deletion and regeneration
# This ensures atomicity when multiple updates happen simultaneously
if Mv.Repo.in_transaction?() do
regenerate_cycles_in_transaction(member, today, lock_key)
regenerate_cycles_in_transaction(member, today, lock_key, actor: actor)
else
regenerate_cycles_new_transaction(member, today, lock_key)
regenerate_cycles_new_transaction(member, today, lock_key, actor: actor)
end
end
# Already in transaction: use advisory lock directly
# Returns {:ok, notifications} - notifications should be returned to after_action hook
defp regenerate_cycles_in_transaction(member, today, lock_key) do
defp regenerate_cycles_in_transaction(member, today, lock_key, opts) do
actor = Keyword.get(opts, :actor)
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
do_regenerate_cycles_on_type_change(member, today, skip_lock?: true)
do_regenerate_cycles_on_type_change(member, today, skip_lock?: true, actor: actor)
end
# Not in transaction: start new transaction with advisory lock
# Returns {:ok, notifications} - notifications should be sent by caller (e.g., via after_action)
defp regenerate_cycles_new_transaction(member, today, lock_key) do
defp regenerate_cycles_new_transaction(member, today, lock_key, opts) do
actor = Keyword.get(opts, :actor)
Mv.Repo.transaction(fn ->
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) do
case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true, actor: actor) do
{:ok, notifications} ->
# Return notifications - they will be sent by the caller
notifications
@ -826,6 +834,7 @@ defmodule Mv.Membership.Member do
require Ash.Query
skip_lock? = Keyword.get(opts, :skip_lock?, false)
actor = Keyword.get(opts, :actor)
# Find all unpaid cycles for this member
# We need to check cycle_end for each cycle using its own interval
@ -835,10 +844,21 @@ defmodule Mv.Membership.Member do
|> Ash.Query.filter(status == :unpaid)
|> Ash.Query.load([:membership_fee_type])
case Ash.read(all_unpaid_cycles_query) do
result =
if actor do
Ash.read(all_unpaid_cycles_query, actor: actor)
else
Ash.read(all_unpaid_cycles_query)
end
case result do
{:ok, all_unpaid_cycles} ->
cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today)
delete_and_regenerate_cycles(cycles_to_delete, member.id, today, skip_lock?: skip_lock?)
delete_and_regenerate_cycles(cycles_to_delete, member.id, today,
skip_lock?: skip_lock?,
actor: actor
)
{:error, reason} ->
{:error, reason}
@ -867,13 +887,14 @@ defmodule Mv.Membership.Member do
# Returns {:ok, notifications} or {:error, reason}
defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today, opts) do
skip_lock? = Keyword.get(opts, :skip_lock?, false)
actor = Keyword.get(opts, :actor)
if Enum.empty?(cycles_to_delete) do
# No cycles to delete, just regenerate
regenerate_cycles(member_id, today, skip_lock?: skip_lock?)
regenerate_cycles(member_id, today, skip_lock?: skip_lock?, actor: actor)
else
case delete_cycles(cycles_to_delete) do
:ok -> regenerate_cycles(member_id, today, skip_lock?: skip_lock?)
:ok -> regenerate_cycles(member_id, today, skip_lock?: skip_lock?, actor: actor)
{:error, reason} -> {:error, reason}
end
end
@ -899,11 +920,13 @@ defmodule Mv.Membership.Member do
# Returns {:ok, notifications} - notifications should be returned to after_action hook
defp regenerate_cycles(member_id, today, opts) do
skip_lock? = Keyword.get(opts, :skip_lock?, false)
actor = Keyword.get(opts, :actor)
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(
member_id,
today: today,
skip_lock?: skip_lock?
skip_lock?: skip_lock?,
actor: actor
) do
{:ok, _cycles, notifications} when is_list(notifications) ->
{:ok, notifications}
@ -917,21 +940,25 @@ defmodule Mv.Membership.Member do
# based on environment (test vs production)
# This function encapsulates the common logic for cycle generation
# to avoid code duplication across different hooks
defp handle_cycle_generation(member) do
defp handle_cycle_generation(member, opts) do
actor = Keyword.get(opts, :actor)
if Mv.Config.sql_sandbox?() do
handle_cycle_generation_sync(member)
handle_cycle_generation_sync(member, actor: actor)
else
handle_cycle_generation_async(member)
handle_cycle_generation_async(member, actor: actor)
end
end
# Runs cycle generation synchronously (for test environment)
defp handle_cycle_generation_sync(member) do
defp handle_cycle_generation_sync(member, opts) do
require Logger
actor = Keyword.get(opts, :actor)
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(
member.id,
today: Date.utc_today()
today: Date.utc_today(),
actor: actor
) do
{:ok, cycles, notifications} ->
send_notifications_if_any(notifications)
@ -943,9 +970,11 @@ defmodule Mv.Membership.Member do
end
# Runs cycle generation asynchronously (for production environment)
defp handle_cycle_generation_async(member) do
defp handle_cycle_generation_async(member, opts) do
actor = Keyword.get(opts, :actor)
Task.Supervisor.async_nolink(Mv.TaskSupervisor, fn ->
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id) do
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id, actor: actor) do
{:ok, cycles, notifications} ->
send_notifications_if_any(notifications)
log_cycle_generation_success(member, cycles, notifications, sync: false)
@ -1197,15 +1226,18 @@ defmodule Mv.Membership.Member do
custom_field_values_arg = Ash.Changeset.get_argument(changeset, :custom_field_values)
if is_nil(custom_field_values_arg) do
extract_existing_values(changeset.data)
extract_existing_values(changeset.data, changeset)
else
extract_argument_values(custom_field_values_arg)
end
end
# Extracts custom field values from existing member data (update scenario)
defp extract_existing_values(member_data) do
case Ash.load(member_data, :custom_field_values) do
defp extract_existing_values(member_data, changeset) do
actor = Map.get(changeset.context, :actor)
opts = if actor, do: [actor: actor], else: []
case Ash.load(member_data, :custom_field_values, opts) do
{:ok, %{custom_field_values: existing_values}} ->
Enum.reduce(existing_values, %{}, &extract_value_from_cfv/2)