From 15fc897f2a1964218753b266cd5aa600965b57ed Mon Sep 17 00:00:00 2001 From: Moritz Date: Mon, 15 Dec 2025 12:33:20 +0100 Subject: [PATCH 1/4] refactor: reduce complexity of with_advisory_lock function Split the complex with_advisory_lock function into smaller, focused functions to improve readability and reduce cyclomatic complexity --- lib/mv/membership_fees/cycle_generator.ex | 106 ++++++++++++---------- 1 file changed, 58 insertions(+), 48 deletions(-) diff --git a/lib/mv/membership_fees/cycle_generator.ex b/lib/mv/membership_fees/cycle_generator.ex index 4ed7ee7..7601380 100644 --- a/lib/mv/membership_fees/cycle_generator.ex +++ b/lib/mv/membership_fees/cycle_generator.ex @@ -190,58 +190,68 @@ defmodule Mv.MembershipFees.CycleGenerator do # Check if we're already in a transaction (e.g., called from Ash action) if Repo.in_transaction?() do - # Already in transaction: use advisory lock directly without starting new transaction - # This prevents nested transactions which can cause deadlocks - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - - case fun.() do - {:ok, result, notifications} when is_list(notifications) -> - # Notifications will be sent after the outer transaction commits - # Return in same format as non-transaction case for consistency - {:ok, result} - - {:ok, result} -> - {:ok, result} - - {:error, reason} -> - {:error, reason} - end + with_advisory_lock_in_transaction(lock_key, fun) else - # Not in transaction: start new transaction with advisory lock - result = - Repo.transaction(fn -> - # Acquire advisory lock for this transaction - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - - case fun.() do - {:ok, result, notifications} when is_list(notifications) -> - # Return result and notifications separately - {result, notifications} - - {:ok, result} -> - # Handle case where no notifications were returned (backward compatibility) - {result, []} - - {:error, reason} -> - Repo.rollback(reason) - end - end) - - # Extract result and notifications, send notifications after transaction - case result do - {:ok, {cycles, notifications}} -> - if Enum.any?(notifications) do - Ash.Notifier.notify(notifications) - end - - {:ok, cycles} - - {:error, reason} -> - {:error, reason} - end + with_advisory_lock_new_transaction(lock_key, fun) end end + # Already in transaction: use advisory lock directly without starting new transaction + # This prevents nested transactions which can cause deadlocks + defp with_advisory_lock_in_transaction(lock_key, fun) do + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + normalize_fun_result(fun.()) + end + + # Not in transaction: start new transaction with advisory lock + defp with_advisory_lock_new_transaction(lock_key, fun) do + result = + Repo.transaction(fn -> + # Acquire advisory lock for this transaction + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + execute_within_transaction(fun) + end) + + handle_transaction_result(result) + end + + # Execute function within transaction and return normalized result + defp execute_within_transaction(fun) do + case fun.() do + {:ok, result, notifications} when is_list(notifications) -> + # Return result and notifications separately + {result, notifications} + + {:ok, result} -> + # Handle case where no notifications were returned (backward compatibility) + {result, []} + + {:error, reason} -> + Repo.rollback(reason) + end + end + + # Normalize function result to consistent format + defp normalize_fun_result({:ok, result, _notifications}) do + # Notifications will be sent after the outer transaction commits + # Return in same format as non-transaction case for consistency + {:ok, result} + end + + defp normalize_fun_result({:ok, result}), do: {:ok, result} + defp normalize_fun_result({:error, reason}), do: {:error, reason} + + # Handle transaction result and send notifications if needed + defp handle_transaction_result({:ok, {cycles, notifications}}) do + if Enum.any?(notifications) do + Ash.Notifier.notify(notifications) + end + + {:ok, cycles} + end + + defp handle_transaction_result({:error, reason}), do: {:error, reason} + defp do_generate_cycles(member, today) do # Reload member with relationships to ensure fresh data case load_member(member.id) do From 2f83f35bcc4c87805136b94d1ba30fff2e9f51b6 Mon Sep 17 00:00:00 2001 From: Moritz Date: Mon, 15 Dec 2025 13:12:17 +0100 Subject: [PATCH 2/4] fix: address code review points for cycle regeneration 1. Fix critical notifications bug 2. Fix today inconsistency 3. Add advisory lock around deletion 4. Improve helper function documentation 5. Improve error message UX --- lib/membership/member.ex | 52 ++++++++++++++++--- .../changes/validate_same_interval.ex | 5 +- lib/mv/membership_fees/cycle_generator.ex | 39 ++++++++++---- 3 files changed, 79 insertions(+), 17 deletions(-) diff --git a/lib/membership/member.ex b/lib/membership/member.ex index 8e67570..234fc6f 100644 --- a/lib/membership/member.ex +++ b/lib/membership/member.ex @@ -588,8 +588,14 @@ defmodule Mv.Membership.Member do def show_in_overview?(_), do: true # Helper functions for cycle status calculations + # + # These functions expect membership_fee_cycles to be loaded with membership_fee_type + # preloaded. The calculations explicitly load this relationship, but if called + # directly, ensure membership_fee_type is loaded or the functions will return + # nil/[] when membership_fee_type is missing. @doc false + @spec get_current_cycle(Member.t()) :: MembershipFeeCycle.t() | nil def get_current_cycle(member) do today = Date.utc_today() @@ -619,6 +625,7 @@ defmodule Mv.Membership.Member do end @doc false + @spec get_last_completed_cycle(Member.t()) :: MembershipFeeCycle.t() | nil def get_last_completed_cycle(member) do today = Date.utc_today() @@ -664,6 +671,7 @@ defmodule Mv.Membership.Member do end @doc false + @spec get_overdue_cycles(Member.t()) :: [MembershipFeeCycle.t()] def get_overdue_cycles(member) do today = Date.utc_today() @@ -695,10 +703,40 @@ defmodule Mv.Membership.Member do # Regenerates cycles when membership fee type changes # Deletes future unpaid cycles and regenerates them with the new type/amount + # Uses advisory lock to prevent concurrent modifications defp regenerate_cycles_on_type_change(member) do require Ash.Query + alias Mv.Repo today = Date.utc_today() + lock_key = :erlang.phash2(member.id) + + # Use advisory lock to prevent concurrent deletion and regeneration + # This ensures atomicity when multiple updates happen simultaneously + if Repo.in_transaction?() do + # Already in transaction: use advisory lock directly + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + do_regenerate_cycles_on_type_change(member, today) + else + # Not in transaction: start new transaction with advisory lock + Repo.transaction(fn -> + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + + case do_regenerate_cycles_on_type_change(member, today) do + :ok -> :ok + {:error, reason} -> Repo.rollback(reason) + end + end) + |> case do + {:ok, result} -> result + {:error, reason} -> {:error, reason} + end + end + end + + # Performs the actual cycle deletion and regeneration + defp do_regenerate_cycles_on_type_change(member, today) do + require Ash.Query # Find all unpaid cycles for this member # We need to check cycle_end for each cycle using its own interval @@ -711,7 +749,7 @@ defmodule Mv.Membership.Member do case Ash.read(all_unpaid_cycles_query) do {:ok, all_unpaid_cycles} -> cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today) - delete_and_regenerate_cycles(cycles_to_delete, member.id) + delete_and_regenerate_cycles(cycles_to_delete, member.id, today) {:error, reason} -> {:error, reason} @@ -736,13 +774,14 @@ defmodule Mv.Membership.Member do end # Deletes future cycles and regenerates them with the new type/amount - defp delete_and_regenerate_cycles(cycles_to_delete, member_id) do + # Passes today to ensure consistent date across deletion and regeneration + defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today) do if Enum.empty?(cycles_to_delete) do # No cycles to delete, just regenerate - regenerate_cycles(member_id) + regenerate_cycles(member_id, today) else case delete_cycles(cycles_to_delete) do - :ok -> regenerate_cycles(member_id) + :ok -> regenerate_cycles(member_id, today) {:error, reason} -> {:error, reason} end end @@ -764,8 +803,9 @@ defmodule Mv.Membership.Member do # Regenerates cycles with new type/amount # CycleGenerator detects if already in transaction and uses advisory lock accordingly - defp regenerate_cycles(member_id) do - case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member_id) do + # Passes today to ensure consistent date across deletion and regeneration + defp regenerate_cycles(member_id, today) do + case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member_id, today: today) do {:ok, _cycles} -> :ok {:error, reason} -> {:error, reason} end diff --git a/lib/membership_fees/changes/validate_same_interval.ex b/lib/membership_fees/changes/validate_same_interval.ex index e41441d..8c1efb4 100644 --- a/lib/membership_fees/changes/validate_same_interval.ex +++ b/lib/membership_fees/changes/validate_same_interval.ex @@ -116,7 +116,10 @@ defmodule Mv.MembershipFees.Changes.ValidateSameInterval do # Add validation error when types cannot be loaded defp add_type_validation_error(changeset, _reason) do - message = "Could not validate membership fee type intervals: type not found" + message = + "Could not validate membership fee type intervals. " <> + "The current or new membership fee type no longer exists. " <> + "This may indicate a data consistency issue." Ash.Changeset.add_error( changeset, diff --git a/lib/mv/membership_fees/cycle_generator.ex b/lib/mv/membership_fees/cycle_generator.ex index 7601380..f2539c0 100644 --- a/lib/mv/membership_fees/cycle_generator.ex +++ b/lib/mv/membership_fees/cycle_generator.ex @@ -216,14 +216,16 @@ defmodule Mv.MembershipFees.CycleGenerator do end # Execute function within transaction and return normalized result + # When not in transaction, create_cycles returns {:ok, cycles, notifications} + # When in transaction, create_cycles returns {:ok, cycles} (notifications handled by Ash) defp execute_within_transaction(fun) do case fun.() do {:ok, result, notifications} when is_list(notifications) -> - # Return result and notifications separately + # Return result and notifications separately (not in transaction case) {result, notifications} {:ok, result} -> - # Handle case where no notifications were returned (backward compatibility) + # In transaction case: notifications handled by Ash automatically {result, []} {:error, reason} -> @@ -232,9 +234,11 @@ defmodule Mv.MembershipFees.CycleGenerator do end # Normalize function result to consistent format + # When in transaction, create_cycles returns {:ok, cycles} (notifications handled by Ash) + # When not in transaction, create_cycles returns {:ok, cycles, notifications} defp normalize_fun_result({:ok, result, _notifications}) do - # Notifications will be sent after the outer transaction commits - # Return in same format as non-transaction case for consistency + # This case should not occur when in transaction (create_cycles handles it) + # But handle it for safety {:ok, result} end @@ -384,6 +388,10 @@ defmodule Mv.MembershipFees.CycleGenerator do end defp create_cycles(cycle_starts, member_id, fee_type_id, amount) do + # If already in a transaction, let Ash handle notifications automatically + # Otherwise, return notifications to send them after transaction commits + return_notifications? = not Repo.in_transaction?() + results = Enum.map(cycle_starts, fn cycle_start -> attrs = %{ @@ -394,10 +402,15 @@ defmodule Mv.MembershipFees.CycleGenerator do status: :unpaid } - # Return notifications to avoid warnings when creating within a transaction - case Ash.create(MembershipFeeCycle, attrs, return_notifications?: true) do - {:ok, cycle, notifications} -> {:ok, cycle, notifications} - {:error, reason} -> {:error, {cycle_start, reason}} + case Ash.create(MembershipFeeCycle, attrs, return_notifications?: return_notifications?) do + {:ok, cycle, notifications} when is_list(notifications) -> + {:ok, cycle, notifications} + + {:ok, cycle} -> + {:ok, cycle, []} + + {:error, reason} -> + {:error, {cycle_start, reason}} end end) @@ -408,8 +421,14 @@ defmodule Mv.MembershipFees.CycleGenerator do if Enum.empty?(errors) do successful_cycles = Enum.map(successes, fn {:ok, cycle, _notifications} -> cycle end) - # Return cycles and notifications to be sent after transaction commits - {:ok, successful_cycles, all_notifications} + + if return_notifications? do + # Return cycles and notifications to be sent after transaction commits + {:ok, successful_cycles, all_notifications} + else + # Notifications are handled automatically by Ash when in transaction + {:ok, successful_cycles} + end else Logger.warning("Some cycles failed to create: #{inspect(errors)}") # Return partial failure with errors From 3fad9120259b85f667604697bcdc17e43b352de3 Mon Sep 17 00:00:00 2001 From: Moritz Date: Mon, 15 Dec 2025 13:15:31 +0100 Subject: [PATCH 3/4] refactor: reduce nesting depth in regenerate_cycles_on_type_change Split the function into smaller, focused functions to reduce nesting depth --- lib/membership/member.ex | 46 +++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/lib/membership/member.ex b/lib/membership/member.ex index 234fc6f..5e574d6 100644 --- a/lib/membership/member.ex +++ b/lib/membership/member.ex @@ -705,7 +705,6 @@ defmodule Mv.Membership.Member do # Deletes future unpaid cycles and regenerates them with the new type/amount # Uses advisory lock to prevent concurrent modifications defp regenerate_cycles_on_type_change(member) do - require Ash.Query alias Mv.Repo today = Date.utc_today() @@ -714,26 +713,39 @@ defmodule Mv.Membership.Member do # Use advisory lock to prevent concurrent deletion and regeneration # This ensures atomicity when multiple updates happen simultaneously if Repo.in_transaction?() do - # Already in transaction: use advisory lock directly - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - do_regenerate_cycles_on_type_change(member, today) + regenerate_cycles_in_transaction(member, today, lock_key) else - # Not in transaction: start new transaction with advisory lock - Repo.transaction(fn -> - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - - case do_regenerate_cycles_on_type_change(member, today) do - :ok -> :ok - {:error, reason} -> Repo.rollback(reason) - end - end) - |> case do - {:ok, result} -> result - {:error, reason} -> {:error, reason} - end + regenerate_cycles_new_transaction(member, today, lock_key) end end + # Already in transaction: use advisory lock directly + defp regenerate_cycles_in_transaction(member, today, lock_key) do + alias Mv.Repo + + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + do_regenerate_cycles_on_type_change(member, today) + end + + # Not in transaction: start new transaction with advisory lock + defp regenerate_cycles_new_transaction(member, today, lock_key) do + alias Mv.Repo + + Repo.transaction(fn -> + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + + case do_regenerate_cycles_on_type_change(member, today) do + :ok -> :ok + {:error, reason} -> Repo.rollback(reason) + end + end) + |> handle_transaction_result() + end + + # Handle transaction result + defp handle_transaction_result({:ok, result}), do: result + defp handle_transaction_result({:error, reason}), do: {:error, reason} + # Performs the actual cycle deletion and regeneration defp do_regenerate_cycles_on_type_change(member, today) do require Ash.Query From 65d156180391bb54622a4337134d6c451e137b8a Mon Sep 17 00:00:00 2001 From: Moritz Date: Mon, 15 Dec 2025 13:22:35 +0100 Subject: [PATCH 4/4] fix: correct return_notifications? logic to prevent missed notifications Fix the logic for return_notifications? in create_cycles --- lib/mv/membership_fees/cycle_generator.ex | 25 ++++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/lib/mv/membership_fees/cycle_generator.ex b/lib/mv/membership_fees/cycle_generator.ex index f2539c0..1019a84 100644 --- a/lib/mv/membership_fees/cycle_generator.ex +++ b/lib/mv/membership_fees/cycle_generator.ex @@ -216,16 +216,16 @@ defmodule Mv.MembershipFees.CycleGenerator do end # Execute function within transaction and return normalized result - # When not in transaction, create_cycles returns {:ok, cycles, notifications} - # When in transaction, create_cycles returns {:ok, cycles} (notifications handled by Ash) + # When in transaction, create_cycles returns {:ok, cycles, notifications} + # When not in transaction, create_cycles returns {:ok, cycles} defp execute_within_transaction(fun) do case fun.() do {:ok, result, notifications} when is_list(notifications) -> - # Return result and notifications separately (not in transaction case) + # In transaction case: return result and notifications separately {result, notifications} {:ok, result} -> - # In transaction case: notifications handled by Ash automatically + # Not in transaction case: notifications handled by Ash automatically {result, []} {:error, reason} -> @@ -234,11 +234,11 @@ defmodule Mv.MembershipFees.CycleGenerator do end # Normalize function result to consistent format - # When in transaction, create_cycles returns {:ok, cycles} (notifications handled by Ash) - # When not in transaction, create_cycles returns {:ok, cycles, notifications} + # When in transaction, create_cycles returns {:ok, cycles, notifications} + # When not in transaction, create_cycles returns {:ok, cycles} defp normalize_fun_result({:ok, result, _notifications}) do - # This case should not occur when in transaction (create_cycles handles it) - # But handle it for safety + # In transaction case: notifications will be sent after outer transaction commits + # Return in same format as non-transaction case for consistency {:ok, result} end @@ -388,9 +388,10 @@ defmodule Mv.MembershipFees.CycleGenerator do end defp create_cycles(cycle_starts, member_id, fee_type_id, amount) do - # If already in a transaction, let Ash handle notifications automatically - # Otherwise, return notifications to send them after transaction commits - return_notifications? = not Repo.in_transaction?() + # Always return notifications when in a transaction (required by Ash) + # When not in transaction, Ash handles notifications automatically + # When in transaction, we must return notifications and send them after commit + return_notifications? = Repo.in_transaction?() results = Enum.map(cycle_starts, fn cycle_start -> @@ -426,7 +427,7 @@ defmodule Mv.MembershipFees.CycleGenerator do # Return cycles and notifications to be sent after transaction commits {:ok, successful_cycles, all_notifications} else - # Notifications are handled automatically by Ash when in transaction + # Not in transaction: Ash handles notifications automatically {:ok, successful_cycles} end else