diff --git a/lib/mv/membership_fees/cycle_generator.ex b/lib/mv/membership_fees/cycle_generator.ex index 4ed7ee7..7601380 100644 --- a/lib/mv/membership_fees/cycle_generator.ex +++ b/lib/mv/membership_fees/cycle_generator.ex @@ -190,58 +190,68 @@ defmodule Mv.MembershipFees.CycleGenerator do # Check if we're already in a transaction (e.g., called from Ash action) if Repo.in_transaction?() do - # Already in transaction: use advisory lock directly without starting new transaction - # This prevents nested transactions which can cause deadlocks - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - - case fun.() do - {:ok, result, notifications} when is_list(notifications) -> - # Notifications will be sent after the outer transaction commits - # Return in same format as non-transaction case for consistency - {:ok, result} - - {:ok, result} -> - {:ok, result} - - {:error, reason} -> - {:error, reason} - end + with_advisory_lock_in_transaction(lock_key, fun) else - # Not in transaction: start new transaction with advisory lock - result = - Repo.transaction(fn -> - # Acquire advisory lock for this transaction - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - - case fun.() do - {:ok, result, notifications} when is_list(notifications) -> - # Return result and notifications separately - {result, notifications} - - {:ok, result} -> - # Handle case where no notifications were returned (backward compatibility) - {result, []} - - {:error, reason} -> - Repo.rollback(reason) - end - end) - - # Extract result and notifications, send notifications after transaction - case result do - {:ok, {cycles, notifications}} -> - if Enum.any?(notifications) do - Ash.Notifier.notify(notifications) - end - - {:ok, cycles} - - {:error, reason} -> - {:error, reason} - end + with_advisory_lock_new_transaction(lock_key, fun) end end + # Already in transaction: use advisory lock directly without starting new transaction + # This prevents nested transactions which can cause deadlocks + defp with_advisory_lock_in_transaction(lock_key, fun) do + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + normalize_fun_result(fun.()) + end + + # Not in transaction: start new transaction with advisory lock + defp with_advisory_lock_new_transaction(lock_key, fun) do + result = + Repo.transaction(fn -> + # Acquire advisory lock for this transaction + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + execute_within_transaction(fun) + end) + + handle_transaction_result(result) + end + + # Execute function within transaction and return normalized result + defp execute_within_transaction(fun) do + case fun.() do + {:ok, result, notifications} when is_list(notifications) -> + # Return result and notifications separately + {result, notifications} + + {:ok, result} -> + # Handle case where no notifications were returned (backward compatibility) + {result, []} + + {:error, reason} -> + Repo.rollback(reason) + end + end + + # Normalize function result to consistent format + defp normalize_fun_result({:ok, result, _notifications}) do + # Notifications will be sent after the outer transaction commits + # Return in same format as non-transaction case for consistency + {:ok, result} + end + + defp normalize_fun_result({:ok, result}), do: {:ok, result} + defp normalize_fun_result({:error, reason}), do: {:error, reason} + + # Handle transaction result and send notifications if needed + defp handle_transaction_result({:ok, {cycles, notifications}}) do + if Enum.any?(notifications) do + Ash.Notifier.notify(notifications) + end + + {:ok, cycles} + end + + defp handle_transaction_result({:error, reason}), do: {:error, reason} + defp do_generate_cycles(member, today) do # Reload member with relationships to ensure fresh data case load_member(member.id) do