Use system actor for cycle generation

Update cycle generator, member hooks, and job to use system actor.
Remove actor parameters as cycle generation is a mandatory side effect.
This commit is contained in:
Moritz 2026-01-20 22:09:20 +01:00
parent f0169c95b7
commit c64b74588f
3 changed files with 65 additions and 87 deletions

View file

@ -125,7 +125,7 @@ defmodule Mv.Membership.Member do
{:ok, member} ->
if member.membership_fee_type_id && member.join_date do
actor = Map.get(changeset.context, :actor)
handle_cycle_generation(member, actor: actor)
handle_cycle_generation(member, [])
end
{:error, _} ->
@ -196,9 +196,7 @@ defmodule Mv.Membership.Member do
Ash.Changeset.changing_attribute?(changeset, :membership_fee_type_id)
if fee_type_changed && member.membership_fee_type_id && member.join_date do
actor = Map.get(changeset.context, :actor)
case regenerate_cycles_on_type_change(member, actor: actor) do
case regenerate_cycles_on_type_change(member) do
{:ok, notifications} ->
# Return notifications to Ash - they will be sent automatically after commit
{:ok, member, notifications}
@ -231,7 +229,7 @@ defmodule Mv.Membership.Member do
if (join_date_changed || exit_date_changed) && member.membership_fee_type_id do
actor = Map.get(changeset.context, :actor)
handle_cycle_generation(member, actor: actor)
handle_cycle_generation(member, [])
end
{:error, _} ->
@ -790,37 +788,37 @@ defmodule Mv.Membership.Member do
# Returns {:ok, notifications} or {:error, reason} where notifications are collected
# to be sent after transaction commits
@doc false
def regenerate_cycles_on_type_change(member, opts \\ []) do
# Uses system actor for cycle regeneration (mandatory side effect)
def regenerate_cycles_on_type_change(member, _opts \\ []) do
alias Mv.Helpers
alias Mv.Helpers.SystemActor
today = Date.utc_today()
lock_key = :erlang.phash2(member.id)
actor = Keyword.get(opts, :actor)
# Use advisory lock to prevent concurrent deletion and regeneration
# This ensures atomicity when multiple updates happen simultaneously
if Mv.Repo.in_transaction?() do
regenerate_cycles_in_transaction(member, today, lock_key, actor: actor)
regenerate_cycles_in_transaction(member, today, lock_key)
else
regenerate_cycles_new_transaction(member, today, lock_key, actor: actor)
regenerate_cycles_new_transaction(member, today, lock_key)
end
end
# Already in transaction: use advisory lock directly
# Returns {:ok, notifications} - notifications should be returned to after_action hook
defp regenerate_cycles_in_transaction(member, today, lock_key, opts) do
actor = Keyword.get(opts, :actor)
defp regenerate_cycles_in_transaction(member, today, lock_key) do
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
do_regenerate_cycles_on_type_change(member, today, skip_lock?: true, actor: actor)
do_regenerate_cycles_on_type_change(member, today, skip_lock?: true)
end
# Not in transaction: start new transaction with advisory lock
# Returns {:ok, notifications} - notifications should be sent by caller (e.g., via after_action)
defp regenerate_cycles_new_transaction(member, today, lock_key, opts) do
actor = Keyword.get(opts, :actor)
defp regenerate_cycles_new_transaction(member, today, lock_key) do
Mv.Repo.transaction(fn ->
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true, actor: actor) do
case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) do
{:ok, notifications} ->
# Return notifications - they will be sent by the caller
notifications
@ -838,11 +836,16 @@ defmodule Mv.Membership.Member do
# Performs the actual cycle deletion and regeneration
# Returns {:ok, notifications} or {:error, reason}
# notifications are collected to be sent after transaction commits
# Uses system actor for all operations
defp do_regenerate_cycles_on_type_change(member, today, opts) do
alias Mv.Helpers
alias Mv.Helpers.SystemActor
require Ash.Query
skip_lock? = Keyword.get(opts, :skip_lock?, false)
actor = Keyword.get(opts, :actor)
system_actor = SystemActor.get_system_actor()
actor_opts = Helpers.ash_actor_opts(system_actor)
# Find all unpaid cycles for this member
# We need to check cycle_end for each cycle using its own interval
@ -852,21 +855,11 @@ defmodule Mv.Membership.Member do
|> Ash.Query.filter(status == :unpaid)
|> Ash.Query.load([:membership_fee_type])
result =
if actor do
Ash.read(all_unpaid_cycles_query, actor: actor)
else
Ash.read(all_unpaid_cycles_query)
end
case result do
case Ash.read(all_unpaid_cycles_query, actor_opts) do
{:ok, all_unpaid_cycles} ->
cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today)
delete_and_regenerate_cycles(cycles_to_delete, member.id, today,
skip_lock?: skip_lock?,
actor: actor
)
delete_and_regenerate_cycles(cycles_to_delete, member.id, today, skip_lock?: skip_lock?)
{:error, reason} ->
{:error, reason}
@ -893,16 +886,16 @@ defmodule Mv.Membership.Member do
# Deletes future cycles and regenerates them with the new type/amount
# Passes today to ensure consistent date across deletion and regeneration
# Returns {:ok, notifications} or {:error, reason}
# Uses system actor for cycle generation
defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today, opts) do
skip_lock? = Keyword.get(opts, :skip_lock?, false)
actor = Keyword.get(opts, :actor)
if Enum.empty?(cycles_to_delete) do
# No cycles to delete, just regenerate
regenerate_cycles(member_id, today, skip_lock?: skip_lock?, actor: actor)
regenerate_cycles(member_id, today, skip_lock?: skip_lock?)
else
case delete_cycles(cycles_to_delete) do
:ok -> regenerate_cycles(member_id, today, skip_lock?: skip_lock?, actor: actor)
:ok -> regenerate_cycles(member_id, today, skip_lock?: skip_lock?)
{:error, reason} -> {:error, reason}
end
end
@ -928,13 +921,11 @@ defmodule Mv.Membership.Member do
# Returns {:ok, notifications} - notifications should be returned to after_action hook
defp regenerate_cycles(member_id, today, opts) do
skip_lock? = Keyword.get(opts, :skip_lock?, false)
actor = Keyword.get(opts, :actor)
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(
member_id,
today: today,
skip_lock?: skip_lock?,
actor: actor
skip_lock?: skip_lock?
) do
{:ok, _cycles, notifications} when is_list(notifications) ->
{:ok, notifications}
@ -948,25 +939,22 @@ defmodule Mv.Membership.Member do
# based on environment (test vs production)
# This function encapsulates the common logic for cycle generation
# to avoid code duplication across different hooks
defp handle_cycle_generation(member, opts) do
actor = Keyword.get(opts, :actor)
# Uses system actor for cycle generation (mandatory side effect)
defp handle_cycle_generation(member, _opts) do
if Mv.Config.sql_sandbox?() do
handle_cycle_generation_sync(member, actor: actor)
handle_cycle_generation_sync(member)
else
handle_cycle_generation_async(member, actor: actor)
handle_cycle_generation_async(member)
end
end
# Runs cycle generation synchronously (for test environment)
defp handle_cycle_generation_sync(member, opts) do
defp handle_cycle_generation_sync(member) do
require Logger
actor = Keyword.get(opts, :actor)
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(
member.id,
today: Date.utc_today(),
actor: actor
today: Date.utc_today()
) do
{:ok, cycles, notifications} ->
send_notifications_if_any(notifications)
@ -978,11 +966,9 @@ defmodule Mv.Membership.Member do
end
# Runs cycle generation asynchronously (for production environment)
defp handle_cycle_generation_async(member, opts) do
actor = Keyword.get(opts, :actor)
defp handle_cycle_generation_async(member) do
Task.Supervisor.async_nolink(Mv.TaskSupervisor, fn ->
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id, actor: actor) do
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id) do
{:ok, cycles, notifications} ->
send_notifications_if_any(notifications)
log_cycle_generation_success(member, cycles, notifications, sync: false)