Pass actor parameter through cycle generation

Extract actor from changeset context in Member hooks and pass it
through all cycle generation functions to ensure proper authorization.
This commit is contained in:
Moritz 2026-01-09 05:26:01 +01:00
parent 01cc5aa3a1
commit dbd79075f5
Signed by: moritz
GPG key ID: 1020A035E5DD0824
2 changed files with 95 additions and 45 deletions

View file

@ -119,11 +119,12 @@ defmodule Mv.Membership.Member do
# Only runs if membership_fee_type_id is set # Only runs if membership_fee_type_id is set
# Note: Cycle generation runs asynchronously to not block the action, # Note: Cycle generation runs asynchronously to not block the action,
# but in test environment it runs synchronously for DB sandbox compatibility # but in test environment it runs synchronously for DB sandbox compatibility
change after_transaction(fn _changeset, result, _context -> change after_transaction(fn changeset, result, _context ->
case result do case result do
{:ok, member} -> {:ok, member} ->
if member.membership_fee_type_id && member.join_date do if member.membership_fee_type_id && member.join_date do
handle_cycle_generation(member) actor = Map.get(changeset.context, :actor)
handle_cycle_generation(member, actor: actor)
end end
{:error, _} -> {:error, _} ->
@ -194,7 +195,9 @@ defmodule Mv.Membership.Member do
Ash.Changeset.changing_attribute?(changeset, :membership_fee_type_id) Ash.Changeset.changing_attribute?(changeset, :membership_fee_type_id)
if fee_type_changed && member.membership_fee_type_id && member.join_date do if fee_type_changed && member.membership_fee_type_id && member.join_date do
case regenerate_cycles_on_type_change(member) do actor = Map.get(changeset.context, :actor)
case regenerate_cycles_on_type_change(member, actor: actor) do
{:ok, notifications} -> {:ok, notifications} ->
# Return notifications to Ash - they will be sent automatically after commit # Return notifications to Ash - they will be sent automatically after commit
{:ok, member, notifications} {:ok, member, notifications}
@ -226,7 +229,8 @@ defmodule Mv.Membership.Member do
exit_date_changed = Ash.Changeset.changing_attribute?(changeset, :exit_date) exit_date_changed = Ash.Changeset.changing_attribute?(changeset, :exit_date)
if (join_date_changed || exit_date_changed) && member.membership_fee_type_id do if (join_date_changed || exit_date_changed) && member.membership_fee_type_id do
handle_cycle_generation(member) actor = Map.get(changeset.context, :actor)
handle_cycle_generation(member, actor: actor)
end end
{:error, _} -> {:error, _} ->
@ -783,33 +787,37 @@ defmodule Mv.Membership.Member do
# Returns {:ok, notifications} or {:error, reason} where notifications are collected # Returns {:ok, notifications} or {:error, reason} where notifications are collected
# to be sent after transaction commits # to be sent after transaction commits
@doc false @doc false
def regenerate_cycles_on_type_change(member) do def regenerate_cycles_on_type_change(member, opts \\ []) do
today = Date.utc_today() today = Date.utc_today()
lock_key = :erlang.phash2(member.id) lock_key = :erlang.phash2(member.id)
actor = Keyword.get(opts, :actor)
# Use advisory lock to prevent concurrent deletion and regeneration # Use advisory lock to prevent concurrent deletion and regeneration
# This ensures atomicity when multiple updates happen simultaneously # This ensures atomicity when multiple updates happen simultaneously
if Mv.Repo.in_transaction?() do if Mv.Repo.in_transaction?() do
regenerate_cycles_in_transaction(member, today, lock_key) regenerate_cycles_in_transaction(member, today, lock_key, actor: actor)
else else
regenerate_cycles_new_transaction(member, today, lock_key) regenerate_cycles_new_transaction(member, today, lock_key, actor: actor)
end end
end end
# Already in transaction: use advisory lock directly # Already in transaction: use advisory lock directly
# Returns {:ok, notifications} - notifications should be returned to after_action hook # Returns {:ok, notifications} - notifications should be returned to after_action hook
defp regenerate_cycles_in_transaction(member, today, lock_key) do defp regenerate_cycles_in_transaction(member, today, lock_key, opts) do
actor = Keyword.get(opts, :actor)
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) do_regenerate_cycles_on_type_change(member, today, skip_lock?: true, actor: actor)
end end
# Not in transaction: start new transaction with advisory lock # Not in transaction: start new transaction with advisory lock
# Returns {:ok, notifications} - notifications should be sent by caller (e.g., via after_action) # Returns {:ok, notifications} - notifications should be sent by caller (e.g., via after_action)
defp regenerate_cycles_new_transaction(member, today, lock_key) do defp regenerate_cycles_new_transaction(member, today, lock_key, opts) do
actor = Keyword.get(opts, :actor)
Mv.Repo.transaction(fn -> Mv.Repo.transaction(fn ->
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) do case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true, actor: actor) do
{:ok, notifications} -> {:ok, notifications} ->
# Return notifications - they will be sent by the caller # Return notifications - they will be sent by the caller
notifications notifications
@ -831,6 +839,7 @@ defmodule Mv.Membership.Member do
require Ash.Query require Ash.Query
skip_lock? = Keyword.get(opts, :skip_lock?, false) skip_lock? = Keyword.get(opts, :skip_lock?, false)
actor = Keyword.get(opts, :actor)
# Find all unpaid cycles for this member # Find all unpaid cycles for this member
# We need to check cycle_end for each cycle using its own interval # We need to check cycle_end for each cycle using its own interval
@ -840,10 +849,21 @@ defmodule Mv.Membership.Member do
|> Ash.Query.filter(status == :unpaid) |> Ash.Query.filter(status == :unpaid)
|> Ash.Query.load([:membership_fee_type]) |> Ash.Query.load([:membership_fee_type])
case Ash.read(all_unpaid_cycles_query) do result =
if actor do
Ash.read(all_unpaid_cycles_query, actor: actor)
else
Ash.read(all_unpaid_cycles_query)
end
case result do
{:ok, all_unpaid_cycles} -> {:ok, all_unpaid_cycles} ->
cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today) cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today)
delete_and_regenerate_cycles(cycles_to_delete, member.id, today, skip_lock?: skip_lock?)
delete_and_regenerate_cycles(cycles_to_delete, member.id, today,
skip_lock?: skip_lock?,
actor: actor
)
{:error, reason} -> {:error, reason} ->
{:error, reason} {:error, reason}
@ -872,13 +892,14 @@ defmodule Mv.Membership.Member do
# Returns {:ok, notifications} or {:error, reason} # Returns {:ok, notifications} or {:error, reason}
defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today, opts) do defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today, opts) do
skip_lock? = Keyword.get(opts, :skip_lock?, false) skip_lock? = Keyword.get(opts, :skip_lock?, false)
actor = Keyword.get(opts, :actor)
if Enum.empty?(cycles_to_delete) do if Enum.empty?(cycles_to_delete) do
# No cycles to delete, just regenerate # No cycles to delete, just regenerate
regenerate_cycles(member_id, today, skip_lock?: skip_lock?) regenerate_cycles(member_id, today, skip_lock?: skip_lock?, actor: actor)
else else
case delete_cycles(cycles_to_delete) do case delete_cycles(cycles_to_delete) do
:ok -> regenerate_cycles(member_id, today, skip_lock?: skip_lock?) :ok -> regenerate_cycles(member_id, today, skip_lock?: skip_lock?, actor: actor)
{:error, reason} -> {:error, reason} {:error, reason} -> {:error, reason}
end end
end end
@ -904,11 +925,13 @@ defmodule Mv.Membership.Member do
# Returns {:ok, notifications} - notifications should be returned to after_action hook # Returns {:ok, notifications} - notifications should be returned to after_action hook
defp regenerate_cycles(member_id, today, opts) do defp regenerate_cycles(member_id, today, opts) do
skip_lock? = Keyword.get(opts, :skip_lock?, false) skip_lock? = Keyword.get(opts, :skip_lock?, false)
actor = Keyword.get(opts, :actor)
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member( case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(
member_id, member_id,
today: today, today: today,
skip_lock?: skip_lock? skip_lock?: skip_lock?,
actor: actor
) do ) do
{:ok, _cycles, notifications} when is_list(notifications) -> {:ok, _cycles, notifications} when is_list(notifications) ->
{:ok, notifications} {:ok, notifications}
@ -922,21 +945,25 @@ defmodule Mv.Membership.Member do
# based on environment (test vs production) # based on environment (test vs production)
# This function encapsulates the common logic for cycle generation # This function encapsulates the common logic for cycle generation
# to avoid code duplication across different hooks # to avoid code duplication across different hooks
defp handle_cycle_generation(member) do defp handle_cycle_generation(member, opts) do
actor = Keyword.get(opts, :actor)
if Mv.Config.sql_sandbox?() do if Mv.Config.sql_sandbox?() do
handle_cycle_generation_sync(member) handle_cycle_generation_sync(member, actor: actor)
else else
handle_cycle_generation_async(member) handle_cycle_generation_async(member, actor: actor)
end end
end end
# Runs cycle generation synchronously (for test environment) # Runs cycle generation synchronously (for test environment)
defp handle_cycle_generation_sync(member) do defp handle_cycle_generation_sync(member, opts) do
require Logger require Logger
actor = Keyword.get(opts, :actor)
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member( case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(
member.id, member.id,
today: Date.utc_today() today: Date.utc_today(),
actor: actor
) do ) do
{:ok, cycles, notifications} -> {:ok, cycles, notifications} ->
send_notifications_if_any(notifications) send_notifications_if_any(notifications)
@ -948,9 +975,11 @@ defmodule Mv.Membership.Member do
end end
# Runs cycle generation asynchronously (for production environment) # Runs cycle generation asynchronously (for production environment)
defp handle_cycle_generation_async(member) do defp handle_cycle_generation_async(member, opts) do
actor = Keyword.get(opts, :actor)
Task.Supervisor.async_nolink(Mv.TaskSupervisor, fn -> Task.Supervisor.async_nolink(Mv.TaskSupervisor, fn ->
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id) do case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id, actor: actor) do
{:ok, cycles, notifications} -> {:ok, cycles, notifications} ->
send_notifications_if_any(notifications) send_notifications_if_any(notifications)
log_cycle_generation_success(member, cycles, notifications, sync: false) log_cycle_generation_success(member, cycles, notifications, sync: false)
@ -1179,15 +1208,18 @@ defmodule Mv.Membership.Member do
custom_field_values_arg = Ash.Changeset.get_argument(changeset, :custom_field_values) custom_field_values_arg = Ash.Changeset.get_argument(changeset, :custom_field_values)
if is_nil(custom_field_values_arg) do if is_nil(custom_field_values_arg) do
extract_existing_values(changeset.data) extract_existing_values(changeset.data, changeset)
else else
extract_argument_values(custom_field_values_arg) extract_argument_values(custom_field_values_arg)
end end
end end
# Extracts custom field values from existing member data (update scenario) # Extracts custom field values from existing member data (update scenario)
defp extract_existing_values(member_data) do defp extract_existing_values(member_data, changeset) do
case Ash.load(member_data, :custom_field_values) do actor = Map.get(changeset.context, :actor)
opts = if actor, do: [actor: actor], else: []
case Ash.load(member_data, :custom_field_values, opts) do
{:ok, %{custom_field_values: existing_values}} -> {:ok, %{custom_field_values: existing_values}} ->
Enum.reduce(existing_values, %{}, &extract_value_from_cfv/2) Enum.reduce(existing_values, %{}, &extract_value_from_cfv/2)

View file

@ -77,7 +77,9 @@ defmodule Mv.MembershipFees.CycleGenerator do
def generate_cycles_for_member(member_or_id, opts \\ []) def generate_cycles_for_member(member_or_id, opts \\ [])
def generate_cycles_for_member(member_id, opts) when is_binary(member_id) do def generate_cycles_for_member(member_id, opts) when is_binary(member_id) do
case load_member(member_id) do actor = Keyword.get(opts, :actor)
case load_member(member_id, actor: actor) do
{:ok, member} -> generate_cycles_for_member(member, opts) {:ok, member} -> generate_cycles_for_member(member, opts)
{:error, reason} -> {:error, reason} {:error, reason} -> {:error, reason}
end end
@ -87,25 +89,27 @@ defmodule Mv.MembershipFees.CycleGenerator do
today = Keyword.get(opts, :today, Date.utc_today()) today = Keyword.get(opts, :today, Date.utc_today())
skip_lock? = Keyword.get(opts, :skip_lock?, false) skip_lock? = Keyword.get(opts, :skip_lock?, false)
do_generate_cycles_with_lock(member, today, skip_lock?) do_generate_cycles_with_lock(member, today, skip_lock?, opts)
end end
# Generate cycles with lock handling # Generate cycles with lock handling
# Returns {:ok, cycles, notifications} - notifications are never sent here, # Returns {:ok, cycles, notifications} - notifications are never sent here,
# they should be returned to the caller (e.g., via after_action hook) # they should be returned to the caller (e.g., via after_action hook)
defp do_generate_cycles_with_lock(member, today, true = _skip_lock?) do defp do_generate_cycles_with_lock(member, today, true = _skip_lock?, opts) do
# Lock already set by caller (e.g., regenerate_cycles_on_type_change) # Lock already set by caller (e.g., regenerate_cycles_on_type_change)
# Just generate cycles without additional locking # Just generate cycles without additional locking
do_generate_cycles(member, today) actor = Keyword.get(opts, :actor)
do_generate_cycles(member, today, actor: actor)
end end
defp do_generate_cycles_with_lock(member, today, false) do defp do_generate_cycles_with_lock(member, today, false, opts) do
lock_key = :erlang.phash2(member.id) lock_key = :erlang.phash2(member.id)
actor = Keyword.get(opts, :actor)
Repo.transaction(fn -> Repo.transaction(fn ->
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
case do_generate_cycles(member, today) do case do_generate_cycles(member, today, actor: actor) do
{:ok, cycles, notifications} -> {:ok, cycles, notifications} ->
# Return cycles and notifications - do NOT send notifications here # Return cycles and notifications - do NOT send notifications here
# They will be sent by the caller (e.g., via after_action hook) # They will be sent by the caller (e.g., via after_action hook)
@ -222,21 +226,33 @@ defmodule Mv.MembershipFees.CycleGenerator do
# Private functions # Private functions
defp load_member(member_id) do defp load_member(member_id, opts) do
Member actor = Keyword.get(opts, :actor)
|> Ash.Query.filter(id == ^member_id)
|> Ash.Query.load([:membership_fee_type, :membership_fee_cycles]) query =
|> Ash.read_one() Member
|> case do |> Ash.Query.filter(id == ^member_id)
|> Ash.Query.load([:membership_fee_type, :membership_fee_cycles])
result =
if actor do
Ash.read_one(query, actor: actor)
else
Ash.read_one(query)
end
case result do
{:ok, nil} -> {:error, :member_not_found} {:ok, nil} -> {:error, :member_not_found}
{:ok, member} -> {:ok, member} {:ok, member} -> {:ok, member}
{:error, reason} -> {:error, reason} {:error, reason} -> {:error, reason}
end end
end end
defp do_generate_cycles(member, today) do defp do_generate_cycles(member, today, opts) do
actor = Keyword.get(opts, :actor)
# Reload member with relationships to ensure fresh data # Reload member with relationships to ensure fresh data
case load_member(member.id) do case load_member(member.id, actor: actor) do
{:ok, member} -> {:ok, member} ->
cond do cond do
is_nil(member.membership_fee_type_id) -> is_nil(member.membership_fee_type_id) ->
@ -246,7 +262,7 @@ defmodule Mv.MembershipFees.CycleGenerator do
{:error, :no_join_date} {:error, :no_join_date}
true -> true ->
generate_missing_cycles(member, today) generate_missing_cycles(member, today, actor: actor)
end end
{:error, reason} -> {:error, reason} ->
@ -254,7 +270,8 @@ defmodule Mv.MembershipFees.CycleGenerator do
end end
end end
defp generate_missing_cycles(member, today) do defp generate_missing_cycles(member, today, opts) do
actor = Keyword.get(opts, :actor)
fee_type = member.membership_fee_type fee_type = member.membership_fee_type
interval = fee_type.interval interval = fee_type.interval
amount = fee_type.amount amount = fee_type.amount
@ -270,7 +287,7 @@ defmodule Mv.MembershipFees.CycleGenerator do
# Only generate if start_date <= end_date # Only generate if start_date <= end_date
if start_date && Date.compare(start_date, end_date) != :gt do if start_date && Date.compare(start_date, end_date) != :gt do
cycle_starts = generate_cycle_starts(start_date, end_date, interval) cycle_starts = generate_cycle_starts(start_date, end_date, interval)
create_cycles(cycle_starts, member.id, fee_type.id, amount) create_cycles(cycle_starts, member.id, fee_type.id, amount, actor: actor)
else else
{:ok, [], []} {:ok, [], []}
end end
@ -365,7 +382,8 @@ defmodule Mv.MembershipFees.CycleGenerator do
end end
end end
defp create_cycles(cycle_starts, member_id, fee_type_id, amount) do defp create_cycles(cycle_starts, member_id, fee_type_id, amount, opts) do
actor = Keyword.get(opts, :actor)
# Always use return_notifications?: true to collect notifications # Always use return_notifications?: true to collect notifications
# Notifications will be returned to the caller, who is responsible for # Notifications will be returned to the caller, who is responsible for
# sending them (e.g., via after_action hook returning {:ok, result, notifications}) # sending them (e.g., via after_action hook returning {:ok, result, notifications})
@ -380,7 +398,7 @@ defmodule Mv.MembershipFees.CycleGenerator do
} }
handle_cycle_creation_result( handle_cycle_creation_result(
Ash.create(MembershipFeeCycle, attrs, return_notifications?: true), Ash.create(MembershipFeeCycle, attrs, return_notifications?: true, actor: actor),
cycle_start cycle_start
) )
end) end)