Cycle Management & Member Integration closes #279 #294

Open
moritz wants to merge 49 commits from feature/279_cycle_management into main
Showing only changes of commit 15fc897f2a - Show all commits

View file

@ -190,58 +190,68 @@ defmodule Mv.MembershipFees.CycleGenerator do
# Check if we're already in a transaction (e.g., called from Ash action)
if Repo.in_transaction?() do
# Already in transaction: use advisory lock directly without starting new transaction
# This prevents nested transactions which can cause deadlocks
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
case fun.() do
{:ok, result, notifications} when is_list(notifications) ->
# Notifications will be sent after the outer transaction commits
# Return in same format as non-transaction case for consistency
{:ok, result}
{:ok, result} ->
{:ok, result}
{:error, reason} ->
{:error, reason}
end
with_advisory_lock_in_transaction(lock_key, fun)
else
# Not in transaction: start new transaction with advisory lock
result =
Repo.transaction(fn ->
# Acquire advisory lock for this transaction
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
case fun.() do
{:ok, result, notifications} when is_list(notifications) ->
# Return result and notifications separately
{result, notifications}
{:ok, result} ->
# Handle case where no notifications were returned (backward compatibility)
{result, []}
{:error, reason} ->
Repo.rollback(reason)
end
end)
# Extract result and notifications, send notifications after transaction
case result do
{:ok, {cycles, notifications}} ->
if Enum.any?(notifications) do
Ash.Notifier.notify(notifications)
end
{:ok, cycles}
{:error, reason} ->
{:error, reason}
end
with_advisory_lock_new_transaction(lock_key, fun)
end
end
# Already in transaction: use advisory lock directly without starting new transaction
# This prevents nested transactions which can cause deadlocks
defp with_advisory_lock_in_transaction(lock_key, fun) do
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
normalize_fun_result(fun.())
end
# Not in transaction: start new transaction with advisory lock
defp with_advisory_lock_new_transaction(lock_key, fun) do
result =
Repo.transaction(fn ->
# Acquire advisory lock for this transaction
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
execute_within_transaction(fun)
end)
handle_transaction_result(result)
end
# Execute function within transaction and return normalized result
defp execute_within_transaction(fun) do
case fun.() do
{:ok, result, notifications} when is_list(notifications) ->
# Return result and notifications separately
{result, notifications}
{:ok, result} ->
# Handle case where no notifications were returned (backward compatibility)
{result, []}
{:error, reason} ->
Repo.rollback(reason)
end
end
# Normalize function result to consistent format
defp normalize_fun_result({:ok, result, _notifications}) do
# Notifications will be sent after the outer transaction commits
# Return in same format as non-transaction case for consistency
{:ok, result}
end
defp normalize_fun_result({:ok, result}), do: {:ok, result}
defp normalize_fun_result({:error, reason}), do: {:error, reason}
# Handle transaction result and send notifications if needed
defp handle_transaction_result({:ok, {cycles, notifications}}) do
if Enum.any?(notifications) do
Ash.Notifier.notify(notifications)
end
{:ok, cycles}
end
defp handle_transaction_result({:error, reason}), do: {:error, reason}
defp do_generate_cycles(member, today) do
# Reload member with relationships to ensure fresh data
case load_member(member.id) do