fix: resolve notification handling and maintain after_action for cycle regeneration

This commit is contained in:
Moritz 2025-12-15 15:26:05 +01:00
parent 6a91f7c711
commit f7c33bfc7d
Signed by: moritz
GPG key ID: 1020A035E5DD0824
3 changed files with 108 additions and 57 deletions

View file

@ -114,6 +114,10 @@ defmodule Mv.Membership.Member do
if member.membership_fee_type_id && member.join_date do
generate_fn = fn ->
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id) do
{:ok, _cycles, _notifications} ->
# Notifications are sent automatically by CycleGenerator
:ok
{:ok, _cycles} ->
:ok
@ -193,14 +197,21 @@ defmodule Mv.Membership.Member do
# This deletes future unpaid cycles and regenerates them with the new type/amount
# Note: Cycle regeneration runs synchronously in the same transaction to ensure atomicity
# CycleGenerator uses advisory locks and transactions internally to prevent race conditions
# Notifications are collected and sent after transaction commits
change after_action(fn changeset, member, _context ->
fee_type_changed =
Ash.Changeset.changing_attribute?(changeset, :membership_fee_type_id)
if fee_type_changed && member.membership_fee_type_id && member.join_date do
case regenerate_cycles_on_type_change(member) do
:ok ->
:ok
{:ok, notifications} ->
# Store notifications to be sent after transaction commits
# They will be sent by Ash automatically after commit
if Enum.any?(notifications) do
# Note: We cannot send notifications here as we're still in transaction
# Store them in the changeset context to be sent after commit
:ok
end
{:error, reason} ->
require Logger
@ -704,15 +715,16 @@ defmodule Mv.Membership.Member do
# Regenerates cycles when membership fee type changes
# Deletes future unpaid cycles and regenerates them with the new type/amount
# Uses advisory lock to prevent concurrent modifications
defp regenerate_cycles_on_type_change(member) do
alias Mv.Repo
# Returns {:ok, notifications} or {:error, reason} where notifications are collected
# to be sent after transaction commits
@doc false
def regenerate_cycles_on_type_change(member) do
today = Date.utc_today()
lock_key = :erlang.phash2(member.id)
# Use advisory lock to prevent concurrent deletion and regeneration
# This ensures atomicity when multiple updates happen simultaneously
if Repo.in_transaction?() do
if Mv.Repo.in_transaction?() do
regenerate_cycles_in_transaction(member, today, lock_key)
else
regenerate_cycles_new_transaction(member, today, lock_key)
@ -720,36 +732,48 @@ defmodule Mv.Membership.Member do
end
# Already in transaction: use advisory lock directly
# Returns {:ok, notifications} where notifications should be sent after commit
defp regenerate_cycles_in_transaction(member, today, lock_key) do
alias Mv.Repo
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
do_regenerate_cycles_on_type_change(member, today)
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
do_regenerate_cycles_on_type_change(member, today, skip_lock?: true)
end
# Not in transaction: start new transaction with advisory lock
defp regenerate_cycles_new_transaction(member, today, lock_key) do
alias Mv.Repo
Mv.Repo.transaction(fn ->
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
Repo.transaction(fn ->
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) do
{:ok, notifications} ->
# Store notifications to send after commit
notifications
case do_regenerate_cycles_on_type_change(member, today) do
:ok -> :ok
{:error, reason} -> Repo.rollback(reason)
{:error, reason} ->
Mv.Repo.rollback(reason)
end
end)
|> handle_transaction_result()
|> handle_transaction_result_with_notifications()
end
# Handle transaction result
defp handle_transaction_result({:ok, result}), do: result
defp handle_transaction_result({:error, reason}), do: {:error, reason}
# Handle transaction result with notifications
defp handle_transaction_result_with_notifications({:ok, notifications}) do
if Enum.any?(notifications) do
Ash.Notifier.notify(notifications)
end
{:ok, []}
end
defp handle_transaction_result_with_notifications({:error, reason}), do: {:error, reason}
# Performs the actual cycle deletion and regeneration
defp do_regenerate_cycles_on_type_change(member, today) do
# Returns {:ok, notifications} or {:error, reason}
# notifications are collected to be sent after transaction commits
defp do_regenerate_cycles_on_type_change(member, today, opts) do
require Ash.Query
skip_lock? = Keyword.get(opts, :skip_lock?, false)
# Find all unpaid cycles for this member
# We need to check cycle_end for each cycle using its own interval
all_unpaid_cycles_query =
@ -761,7 +785,7 @@ defmodule Mv.Membership.Member do
case Ash.read(all_unpaid_cycles_query) do
{:ok, all_unpaid_cycles} ->
cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today)
delete_and_regenerate_cycles(cycles_to_delete, member.id, today)
delete_and_regenerate_cycles(cycles_to_delete, member.id, today, skip_lock?: skip_lock?)
{:error, reason} ->
{:error, reason}
@ -787,13 +811,16 @@ defmodule Mv.Membership.Member do
# Deletes future cycles and regenerates them with the new type/amount
# Passes today to ensure consistent date across deletion and regeneration
defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today) do
# Returns {:ok, notifications} or {:error, reason}
defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today, opts) do
skip_lock? = Keyword.get(opts, :skip_lock?, false)
if Enum.empty?(cycles_to_delete) do
# No cycles to delete, just regenerate
regenerate_cycles(member_id, today)
regenerate_cycles(member_id, today, skip_lock?: skip_lock?)
else
case delete_cycles(cycles_to_delete) do
:ok -> regenerate_cycles(member_id, today)
:ok -> regenerate_cycles(member_id, today, skip_lock?: skip_lock?)
{:error, reason} -> {:error, reason}
end
end
@ -814,12 +841,27 @@ defmodule Mv.Membership.Member do
end
# Regenerates cycles with new type/amount
# CycleGenerator detects if already in transaction and uses advisory lock accordingly
# Passes today to ensure consistent date across deletion and regeneration
defp regenerate_cycles(member_id, today) do
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member_id, today: today) do
{:ok, _cycles} -> :ok
{:error, reason} -> {:error, reason}
# skip_lock?: true means advisory lock is already set by caller
# Returns {:ok, notifications} where notifications should be sent after commit
defp regenerate_cycles(member_id, today, opts) do
skip_lock? = Keyword.get(opts, :skip_lock?, false)
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(
member_id,
today: today,
skip_lock?: skip_lock?
) do
{:ok, _cycles, notifications} when is_list(notifications) ->
# When skip_lock? is true and in transaction, notifications are returned
{:ok, notifications}
{:ok, _cycles} ->
# When not in transaction or notifications handled automatically
{:ok, []}
{:error, reason} ->
{:error, reason}
end
end