diff --git a/lib/membership/member.ex b/lib/membership/member.ex index 8e67570..5e574d6 100644 --- a/lib/membership/member.ex +++ b/lib/membership/member.ex @@ -588,8 +588,14 @@ defmodule Mv.Membership.Member do def show_in_overview?(_), do: true # Helper functions for cycle status calculations + # + # These functions expect membership_fee_cycles to be loaded with membership_fee_type + # preloaded. The calculations explicitly load this relationship, but if called + # directly, ensure membership_fee_type is loaded or the functions will return + # nil/[] when membership_fee_type is missing. @doc false + @spec get_current_cycle(Member.t()) :: MembershipFeeCycle.t() | nil def get_current_cycle(member) do today = Date.utc_today() @@ -619,6 +625,7 @@ defmodule Mv.Membership.Member do end @doc false + @spec get_last_completed_cycle(Member.t()) :: MembershipFeeCycle.t() | nil def get_last_completed_cycle(member) do today = Date.utc_today() @@ -664,6 +671,7 @@ defmodule Mv.Membership.Member do end @doc false + @spec get_overdue_cycles(Member.t()) :: [MembershipFeeCycle.t()] def get_overdue_cycles(member) do today = Date.utc_today() @@ -695,10 +703,52 @@ defmodule Mv.Membership.Member do # Regenerates cycles when membership fee type changes # Deletes future unpaid cycles and regenerates them with the new type/amount + # Uses advisory lock to prevent concurrent modifications defp regenerate_cycles_on_type_change(member) do - require Ash.Query + alias Mv.Repo today = Date.utc_today() + lock_key = :erlang.phash2(member.id) + + # Use advisory lock to prevent concurrent deletion and regeneration + # This ensures atomicity when multiple updates happen simultaneously + if Repo.in_transaction?() do + regenerate_cycles_in_transaction(member, today, lock_key) + else + regenerate_cycles_new_transaction(member, today, lock_key) + end + end + + # Already in transaction: use advisory lock directly + defp regenerate_cycles_in_transaction(member, today, lock_key) do + alias Mv.Repo + + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + do_regenerate_cycles_on_type_change(member, today) + end + + # Not in transaction: start new transaction with advisory lock + defp regenerate_cycles_new_transaction(member, today, lock_key) do + alias Mv.Repo + + Repo.transaction(fn -> + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + + case do_regenerate_cycles_on_type_change(member, today) do + :ok -> :ok + {:error, reason} -> Repo.rollback(reason) + end + end) + |> handle_transaction_result() + end + + # Handle transaction result + defp handle_transaction_result({:ok, result}), do: result + defp handle_transaction_result({:error, reason}), do: {:error, reason} + + # Performs the actual cycle deletion and regeneration + defp do_regenerate_cycles_on_type_change(member, today) do + require Ash.Query # Find all unpaid cycles for this member # We need to check cycle_end for each cycle using its own interval @@ -711,7 +761,7 @@ defmodule Mv.Membership.Member do case Ash.read(all_unpaid_cycles_query) do {:ok, all_unpaid_cycles} -> cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today) - delete_and_regenerate_cycles(cycles_to_delete, member.id) + delete_and_regenerate_cycles(cycles_to_delete, member.id, today) {:error, reason} -> {:error, reason} @@ -736,13 +786,14 @@ defmodule Mv.Membership.Member do end # Deletes future cycles and regenerates them with the new type/amount - defp delete_and_regenerate_cycles(cycles_to_delete, member_id) do + # Passes today to ensure consistent date across deletion and regeneration + defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today) do if Enum.empty?(cycles_to_delete) do # No cycles to delete, just regenerate - regenerate_cycles(member_id) + regenerate_cycles(member_id, today) else case delete_cycles(cycles_to_delete) do - :ok -> regenerate_cycles(member_id) + :ok -> regenerate_cycles(member_id, today) {:error, reason} -> {:error, reason} end end @@ -764,8 +815,9 @@ defmodule Mv.Membership.Member do # Regenerates cycles with new type/amount # CycleGenerator detects if already in transaction and uses advisory lock accordingly - defp regenerate_cycles(member_id) do - case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member_id) do + # Passes today to ensure consistent date across deletion and regeneration + defp regenerate_cycles(member_id, today) do + case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member_id, today: today) do {:ok, _cycles} -> :ok {:error, reason} -> {:error, reason} end diff --git a/lib/membership_fees/changes/validate_same_interval.ex b/lib/membership_fees/changes/validate_same_interval.ex index e41441d..8c1efb4 100644 --- a/lib/membership_fees/changes/validate_same_interval.ex +++ b/lib/membership_fees/changes/validate_same_interval.ex @@ -116,7 +116,10 @@ defmodule Mv.MembershipFees.Changes.ValidateSameInterval do # Add validation error when types cannot be loaded defp add_type_validation_error(changeset, _reason) do - message = "Could not validate membership fee type intervals: type not found" + message = + "Could not validate membership fee type intervals. " <> + "The current or new membership fee type no longer exists. " <> + "This may indicate a data consistency issue." Ash.Changeset.add_error( changeset, diff --git a/lib/mv/membership_fees/cycle_generator.ex b/lib/mv/membership_fees/cycle_generator.ex index 4ed7ee7..1019a84 100644 --- a/lib/mv/membership_fees/cycle_generator.ex +++ b/lib/mv/membership_fees/cycle_generator.ex @@ -190,58 +190,72 @@ defmodule Mv.MembershipFees.CycleGenerator do # Check if we're already in a transaction (e.g., called from Ash action) if Repo.in_transaction?() do - # Already in transaction: use advisory lock directly without starting new transaction - # This prevents nested transactions which can cause deadlocks - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - - case fun.() do - {:ok, result, notifications} when is_list(notifications) -> - # Notifications will be sent after the outer transaction commits - # Return in same format as non-transaction case for consistency - {:ok, result} - - {:ok, result} -> - {:ok, result} - - {:error, reason} -> - {:error, reason} - end + with_advisory_lock_in_transaction(lock_key, fun) else - # Not in transaction: start new transaction with advisory lock - result = - Repo.transaction(fn -> - # Acquire advisory lock for this transaction - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - - case fun.() do - {:ok, result, notifications} when is_list(notifications) -> - # Return result and notifications separately - {result, notifications} - - {:ok, result} -> - # Handle case where no notifications were returned (backward compatibility) - {result, []} - - {:error, reason} -> - Repo.rollback(reason) - end - end) - - # Extract result and notifications, send notifications after transaction - case result do - {:ok, {cycles, notifications}} -> - if Enum.any?(notifications) do - Ash.Notifier.notify(notifications) - end - - {:ok, cycles} - - {:error, reason} -> - {:error, reason} - end + with_advisory_lock_new_transaction(lock_key, fun) end end + # Already in transaction: use advisory lock directly without starting new transaction + # This prevents nested transactions which can cause deadlocks + defp with_advisory_lock_in_transaction(lock_key, fun) do + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + normalize_fun_result(fun.()) + end + + # Not in transaction: start new transaction with advisory lock + defp with_advisory_lock_new_transaction(lock_key, fun) do + result = + Repo.transaction(fn -> + # Acquire advisory lock for this transaction + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + execute_within_transaction(fun) + end) + + handle_transaction_result(result) + end + + # Execute function within transaction and return normalized result + # When in transaction, create_cycles returns {:ok, cycles, notifications} + # When not in transaction, create_cycles returns {:ok, cycles} + defp execute_within_transaction(fun) do + case fun.() do + {:ok, result, notifications} when is_list(notifications) -> + # In transaction case: return result and notifications separately + {result, notifications} + + {:ok, result} -> + # Not in transaction case: notifications handled by Ash automatically + {result, []} + + {:error, reason} -> + Repo.rollback(reason) + end + end + + # Normalize function result to consistent format + # When in transaction, create_cycles returns {:ok, cycles, notifications} + # When not in transaction, create_cycles returns {:ok, cycles} + defp normalize_fun_result({:ok, result, _notifications}) do + # In transaction case: notifications will be sent after outer transaction commits + # Return in same format as non-transaction case for consistency + {:ok, result} + end + + defp normalize_fun_result({:ok, result}), do: {:ok, result} + defp normalize_fun_result({:error, reason}), do: {:error, reason} + + # Handle transaction result and send notifications if needed + defp handle_transaction_result({:ok, {cycles, notifications}}) do + if Enum.any?(notifications) do + Ash.Notifier.notify(notifications) + end + + {:ok, cycles} + end + + defp handle_transaction_result({:error, reason}), do: {:error, reason} + defp do_generate_cycles(member, today) do # Reload member with relationships to ensure fresh data case load_member(member.id) do @@ -374,6 +388,11 @@ defmodule Mv.MembershipFees.CycleGenerator do end defp create_cycles(cycle_starts, member_id, fee_type_id, amount) do + # Always return notifications when in a transaction (required by Ash) + # When not in transaction, Ash handles notifications automatically + # When in transaction, we must return notifications and send them after commit + return_notifications? = Repo.in_transaction?() + results = Enum.map(cycle_starts, fn cycle_start -> attrs = %{ @@ -384,10 +403,15 @@ defmodule Mv.MembershipFees.CycleGenerator do status: :unpaid } - # Return notifications to avoid warnings when creating within a transaction - case Ash.create(MembershipFeeCycle, attrs, return_notifications?: true) do - {:ok, cycle, notifications} -> {:ok, cycle, notifications} - {:error, reason} -> {:error, {cycle_start, reason}} + case Ash.create(MembershipFeeCycle, attrs, return_notifications?: return_notifications?) do + {:ok, cycle, notifications} when is_list(notifications) -> + {:ok, cycle, notifications} + + {:ok, cycle} -> + {:ok, cycle, []} + + {:error, reason} -> + {:error, {cycle_start, reason}} end end) @@ -398,8 +422,14 @@ defmodule Mv.MembershipFees.CycleGenerator do if Enum.empty?(errors) do successful_cycles = Enum.map(successes, fn {:ok, cycle, _notifications} -> cycle end) - # Return cycles and notifications to be sent after transaction commits - {:ok, successful_cycles, all_notifications} + + if return_notifications? do + # Return cycles and notifications to be sent after transaction commits + {:ok, successful_cycles, all_notifications} + else + # Not in transaction: Ash handles notifications automatically + {:ok, successful_cycles} + end else Logger.warning("Some cycles failed to create: #{inspect(errors)}") # Return partial failure with errors