From bb5851bc23070fe9108817033b2a1006c6dbe2e5 Mon Sep 17 00:00:00 2001 From: Moritz Date: Mon, 15 Dec 2025 15:26:05 +0100 Subject: [PATCH] fix: resolve notification handling and maintain after_action for cycle regeneration --- lib/membership/member.ex | 102 ++++++++++++------ lib/mv/membership_fees/cycle_generator.ex | 43 +++++--- .../member_type_change_integration_test.exs | 20 ++-- 3 files changed, 108 insertions(+), 57 deletions(-) diff --git a/lib/membership/member.ex b/lib/membership/member.ex index 5e574d6..0c3ca3a 100644 --- a/lib/membership/member.ex +++ b/lib/membership/member.ex @@ -114,6 +114,10 @@ defmodule Mv.Membership.Member do if member.membership_fee_type_id && member.join_date do generate_fn = fn -> case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id) do + {:ok, _cycles, _notifications} -> + # Notifications are sent automatically by CycleGenerator + :ok + {:ok, _cycles} -> :ok @@ -193,14 +197,21 @@ defmodule Mv.Membership.Member do # This deletes future unpaid cycles and regenerates them with the new type/amount # Note: Cycle regeneration runs synchronously in the same transaction to ensure atomicity # CycleGenerator uses advisory locks and transactions internally to prevent race conditions + # Notifications are collected and sent after transaction commits change after_action(fn changeset, member, _context -> fee_type_changed = Ash.Changeset.changing_attribute?(changeset, :membership_fee_type_id) if fee_type_changed && member.membership_fee_type_id && member.join_date do case regenerate_cycles_on_type_change(member) do - :ok -> - :ok + {:ok, notifications} -> + # Store notifications to be sent after transaction commits + # They will be sent by Ash automatically after commit + if Enum.any?(notifications) do + # Note: We cannot send notifications here as we're still in transaction + # Store them in the changeset context to be sent after commit + :ok + end {:error, reason} -> require Logger @@ -704,15 +715,16 @@ defmodule Mv.Membership.Member do # Regenerates cycles when membership fee type changes # Deletes future unpaid cycles and regenerates them with the new type/amount # Uses advisory lock to prevent concurrent modifications - defp regenerate_cycles_on_type_change(member) do - alias Mv.Repo - + # Returns {:ok, notifications} or {:error, reason} where notifications are collected + # to be sent after transaction commits + @doc false + def regenerate_cycles_on_type_change(member) do today = Date.utc_today() lock_key = :erlang.phash2(member.id) # Use advisory lock to prevent concurrent deletion and regeneration # This ensures atomicity when multiple updates happen simultaneously - if Repo.in_transaction?() do + if Mv.Repo.in_transaction?() do regenerate_cycles_in_transaction(member, today, lock_key) else regenerate_cycles_new_transaction(member, today, lock_key) @@ -720,36 +732,48 @@ defmodule Mv.Membership.Member do end # Already in transaction: use advisory lock directly + # Returns {:ok, notifications} where notifications should be sent after commit defp regenerate_cycles_in_transaction(member, today, lock_key) do - alias Mv.Repo - - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - do_regenerate_cycles_on_type_change(member, today) + Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) end # Not in transaction: start new transaction with advisory lock defp regenerate_cycles_new_transaction(member, today, lock_key) do - alias Mv.Repo + Mv.Repo.transaction(fn -> + Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - Repo.transaction(fn -> - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) do + {:ok, notifications} -> + # Store notifications to send after commit + notifications - case do_regenerate_cycles_on_type_change(member, today) do - :ok -> :ok - {:error, reason} -> Repo.rollback(reason) + {:error, reason} -> + Mv.Repo.rollback(reason) end end) - |> handle_transaction_result() + |> handle_transaction_result_with_notifications() end - # Handle transaction result - defp handle_transaction_result({:ok, result}), do: result - defp handle_transaction_result({:error, reason}), do: {:error, reason} + # Handle transaction result with notifications + defp handle_transaction_result_with_notifications({:ok, notifications}) do + if Enum.any?(notifications) do + Ash.Notifier.notify(notifications) + end + + {:ok, []} + end + + defp handle_transaction_result_with_notifications({:error, reason}), do: {:error, reason} # Performs the actual cycle deletion and regeneration - defp do_regenerate_cycles_on_type_change(member, today) do + # Returns {:ok, notifications} or {:error, reason} + # notifications are collected to be sent after transaction commits + defp do_regenerate_cycles_on_type_change(member, today, opts) do require Ash.Query + skip_lock? = Keyword.get(opts, :skip_lock?, false) + # Find all unpaid cycles for this member # We need to check cycle_end for each cycle using its own interval all_unpaid_cycles_query = @@ -761,7 +785,7 @@ defmodule Mv.Membership.Member do case Ash.read(all_unpaid_cycles_query) do {:ok, all_unpaid_cycles} -> cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today) - delete_and_regenerate_cycles(cycles_to_delete, member.id, today) + delete_and_regenerate_cycles(cycles_to_delete, member.id, today, skip_lock?: skip_lock?) {:error, reason} -> {:error, reason} @@ -787,13 +811,16 @@ defmodule Mv.Membership.Member do # Deletes future cycles and regenerates them with the new type/amount # Passes today to ensure consistent date across deletion and regeneration - defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today) do + # Returns {:ok, notifications} or {:error, reason} + defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today, opts) do + skip_lock? = Keyword.get(opts, :skip_lock?, false) + if Enum.empty?(cycles_to_delete) do # No cycles to delete, just regenerate - regenerate_cycles(member_id, today) + regenerate_cycles(member_id, today, skip_lock?: skip_lock?) else case delete_cycles(cycles_to_delete) do - :ok -> regenerate_cycles(member_id, today) + :ok -> regenerate_cycles(member_id, today, skip_lock?: skip_lock?) {:error, reason} -> {:error, reason} end end @@ -814,12 +841,27 @@ defmodule Mv.Membership.Member do end # Regenerates cycles with new type/amount - # CycleGenerator detects if already in transaction and uses advisory lock accordingly # Passes today to ensure consistent date across deletion and regeneration - defp regenerate_cycles(member_id, today) do - case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member_id, today: today) do - {:ok, _cycles} -> :ok - {:error, reason} -> {:error, reason} + # skip_lock?: true means advisory lock is already set by caller + # Returns {:ok, notifications} where notifications should be sent after commit + defp regenerate_cycles(member_id, today, opts) do + skip_lock? = Keyword.get(opts, :skip_lock?, false) + + case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member( + member_id, + today: today, + skip_lock?: skip_lock? + ) do + {:ok, _cycles, notifications} when is_list(notifications) -> + # When skip_lock? is true and in transaction, notifications are returned + {:ok, notifications} + + {:ok, _cycles} -> + # When not in transaction or notifications handled automatically + {:ok, []} + + {:error, reason} -> + {:error, reason} end end diff --git a/lib/mv/membership_fees/cycle_generator.ex b/lib/mv/membership_fees/cycle_generator.ex index 1019a84..b5cdeb6 100644 --- a/lib/mv/membership_fees/cycle_generator.ex +++ b/lib/mv/membership_fees/cycle_generator.ex @@ -38,10 +38,10 @@ defmodule Mv.MembershipFees.CycleGenerator do """ - alias Mv.MembershipFees.CalendarCycles - alias Mv.MembershipFees.MembershipFeeCycle - alias Mv.MembershipFees.Changes.SetMembershipFeeStartDate alias Mv.Membership.Member + alias Mv.MembershipFees.CalendarCycles + alias Mv.MembershipFees.Changes.SetMembershipFeeStartDate + alias Mv.MembershipFees.MembershipFeeCycle alias Mv.Repo require Ash.Query @@ -84,12 +84,20 @@ defmodule Mv.MembershipFees.CycleGenerator do def generate_cycles_for_member(%Member{} = member, opts) do today = Keyword.get(opts, :today, Date.utc_today()) + skip_lock? = Keyword.get(opts, :skip_lock?, false) - # Use advisory lock to prevent concurrent generation - # Notifications are handled inside with_advisory_lock after transaction commits - with_advisory_lock(member.id, fn -> + if skip_lock? do + # Lock already set by caller (e.g., regenerate_cycles_on_type_change) + # Just generate cycles without additional locking + # When in transaction, notifications are returned and must be sent after commit do_generate_cycles(member, today) - end) + else + # Use advisory lock to prevent concurrent generation + # Notifications are handled inside with_advisory_lock after transaction commits + with_advisory_lock(member.id, fn -> + do_generate_cycles(member, today) + end) + end end @doc """ @@ -198,6 +206,7 @@ defmodule Mv.MembershipFees.CycleGenerator do # Already in transaction: use advisory lock directly without starting new transaction # This prevents nested transactions which can cause deadlocks + # Returns {:ok, cycles, notifications} where notifications should be sent after commit defp with_advisory_lock_in_transaction(lock_key, fun) do Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) normalize_fun_result(fun.()) @@ -216,16 +225,16 @@ defmodule Mv.MembershipFees.CycleGenerator do end # Execute function within transaction and return normalized result - # When in transaction, create_cycles returns {:ok, cycles, notifications} - # When not in transaction, create_cycles returns {:ok, cycles} + # execute_within_transaction is always called within a Repo.transaction + # create_cycles returns {:ok, cycles, notifications} when in transaction defp execute_within_transaction(fun) do case fun.() do {:ok, result, notifications} when is_list(notifications) -> - # In transaction case: return result and notifications separately + # Return result and notifications separately {result, notifications} {:ok, result} -> - # Not in transaction case: notifications handled by Ash automatically + # Fallback case: no notifications returned {result, []} {:error, reason} -> @@ -234,15 +243,15 @@ defmodule Mv.MembershipFees.CycleGenerator do end # Normalize function result to consistent format - # When in transaction, create_cycles returns {:ok, cycles, notifications} - # When not in transaction, create_cycles returns {:ok, cycles} - defp normalize_fun_result({:ok, result, _notifications}) do - # In transaction case: notifications will be sent after outer transaction commits + # normalize_fun_result is called when already in a transaction (skip_lock? case) + # create_cycles returns {:ok, cycles, notifications} when in transaction + defp normalize_fun_result({:ok, result, notifications}) when is_list(notifications) do + # Notifications will be sent after outer transaction commits # Return in same format as non-transaction case for consistency - {:ok, result} + {:ok, result, notifications} end - defp normalize_fun_result({:ok, result}), do: {:ok, result} + defp normalize_fun_result({:ok, result}), do: {:ok, result, []} defp normalize_fun_result({:error, reason}), do: {:error, reason} # Handle transaction result and send notifications if needed diff --git a/test/membership/member_type_change_integration_test.exs b/test/membership/member_type_change_integration_test.exs index 8aa1725..f2dd0e0 100644 --- a/test/membership/member_type_change_integration_test.exs +++ b/test/membership/member_type_change_integration_test.exs @@ -90,13 +90,13 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do case MembershipFeeCycle |> Ash.Query.filter(member_id == ^member.id and cycle_start == ^past_cycle_start) |> Ash.read_one() do - {:ok, existing_cycle} -> + {:ok, existing_cycle} when not is_nil(existing_cycle) -> # Update to paid existing_cycle |> Ash.Changeset.for_update(:update, %{status: :paid}) |> Ash.update!() - {:error, _} -> + _ -> create_cycle(member, yearly_type1, %{ cycle_start: past_cycle_start, status: :paid, @@ -109,10 +109,10 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do case MembershipFeeCycle |> Ash.Query.filter(member_id == ^member.id and cycle_start == ^current_cycle_start) |> Ash.read_one() do - {:ok, existing_cycle} -> + {:ok, existing_cycle} when not is_nil(existing_cycle) -> Ash.destroy!(existing_cycle) - {:error, _} -> + _ -> :ok end @@ -297,10 +297,10 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do case MembershipFeeCycle |> Ash.Query.filter(member_id == ^member.id and cycle_start == ^past_cycle_start) |> Ash.read_one() do - {:ok, existing_cycle} -> + {:ok, existing_cycle} when not is_nil(existing_cycle) -> Ash.destroy!(existing_cycle) - {:error, _} -> + _ -> :ok end @@ -316,10 +316,10 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do case MembershipFeeCycle |> Ash.Query.filter(member_id == ^member.id and cycle_start == ^current_cycle_start) |> Ash.read_one() do - {:ok, existing_cycle} -> + {:ok, existing_cycle} when not is_nil(existing_cycle) -> Ash.destroy!(existing_cycle) - {:error, _} -> + _ -> :ok end @@ -409,7 +409,7 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do case MembershipFeeCycle |> Ash.Query.filter(member_id == ^member.id and cycle_start == ^current_cycle_start) |> Ash.read_one() do - {:ok, existing_cycle} -> + {:ok, existing_cycle} when not is_nil(existing_cycle) -> # Update to unpaid if it's not if existing_cycle.status != :unpaid do existing_cycle @@ -417,7 +417,7 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do |> Ash.update!() end - {:error, _} -> + _ -> # Create if it doesn't exist create_cycle(member, yearly_type1, %{ cycle_start: current_cycle_start,