diff --git a/lib/membership/member.ex b/lib/membership/member.ex index 0c3ca3a..2665b0e 100644 --- a/lib/membership/member.ex +++ b/lib/membership/member.ex @@ -112,14 +112,16 @@ defmodule Mv.Membership.Member do # but in test environment it runs synchronously for DB sandbox compatibility change after_action(fn _changeset, member, _context -> if member.membership_fee_type_id && member.join_date do - generate_fn = fn -> - case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id) do - {:ok, _cycles, _notifications} -> - # Notifications are sent automatically by CycleGenerator - :ok - - {:ok, _cycles} -> - :ok + if Application.get_env(:mv, :sql_sandbox, false) do + # Run synchronously in test environment for DB sandbox compatibility + # Return notifications to Ash so they are sent after commit + case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member( + member.id, + today: Date.utc_today(), + skip_lock?: false + ) do + {:ok, _cycles, notifications} -> + {:ok, member, notifications} {:error, reason} -> require Logger @@ -127,19 +129,35 @@ defmodule Mv.Membership.Member do Logger.warning( "Failed to generate cycles for member #{member.id}: #{inspect(reason)}" ) - end - end - if Application.get_env(:mv, :sql_sandbox, false) do - # Run synchronously in test environment for DB sandbox compatibility - generate_fn.() + {:ok, member} + end else # Run asynchronously in other environments - Task.start(generate_fn) - end - end + # Notifications cannot be returned in async case, so they will be lost + # This is acceptable as cycle generation is not critical for member creation + Task.start(fn -> + case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id) do + {:ok, _cycles, notifications} -> + # Send notifications manually for async case + if Enum.any?(notifications) do + Ash.Notifier.notify(notifications) + end - {:ok, member} + {:error, reason} -> + require Logger + + Logger.warning( + "Failed to generate cycles for member #{member.id}: #{inspect(reason)}" + ) + end + end) + + {:ok, member} + end + else + {:ok, member} + end end) end @@ -197,7 +215,7 @@ defmodule Mv.Membership.Member do # This deletes future unpaid cycles and regenerates them with the new type/amount # Note: Cycle regeneration runs synchronously in the same transaction to ensure atomicity # CycleGenerator uses advisory locks and transactions internally to prevent race conditions - # Notifications are collected and sent after transaction commits + # Notifications are returned to Ash and sent automatically after commit change after_action(fn changeset, member, _context -> fee_type_changed = Ash.Changeset.changing_attribute?(changeset, :membership_fee_type_id) @@ -205,13 +223,8 @@ defmodule Mv.Membership.Member do if fee_type_changed && member.membership_fee_type_id && member.join_date do case regenerate_cycles_on_type_change(member) do {:ok, notifications} -> - # Store notifications to be sent after transaction commits - # They will be sent by Ash automatically after commit - if Enum.any?(notifications) do - # Note: We cannot send notifications here as we're still in transaction - # Store them in the changeset context to be sent after commit - :ok - end + # Return notifications to Ash - they will be sent automatically after commit + {:ok, member, notifications} {:error, reason} -> require Logger @@ -219,10 +232,12 @@ defmodule Mv.Membership.Member do Logger.warning( "Failed to regenerate cycles for member #{member.id}: #{inspect(reason)}" ) - end - end - {:ok, member} + {:ok, member} + end + else + {:ok, member} + end end) end @@ -732,40 +747,33 @@ defmodule Mv.Membership.Member do end # Already in transaction: use advisory lock directly - # Returns {:ok, notifications} where notifications should be sent after commit + # Returns {:ok, notifications} - notifications should be returned to after_action hook defp regenerate_cycles_in_transaction(member, today, lock_key) do Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) end # Not in transaction: start new transaction with advisory lock + # Returns {:ok, notifications} - notifications should be sent by caller (e.g., via after_action) defp regenerate_cycles_new_transaction(member, today, lock_key) do Mv.Repo.transaction(fn -> Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) do {:ok, notifications} -> - # Store notifications to send after commit + # Return notifications - they will be sent by the caller notifications {:error, reason} -> Mv.Repo.rollback(reason) end end) - |> handle_transaction_result_with_notifications() - end - - # Handle transaction result with notifications - defp handle_transaction_result_with_notifications({:ok, notifications}) do - if Enum.any?(notifications) do - Ash.Notifier.notify(notifications) + |> case do + {:ok, notifications} -> {:ok, notifications} + {:error, reason} -> {:error, reason} end - - {:ok, []} end - defp handle_transaction_result_with_notifications({:error, reason}), do: {:error, reason} - # Performs the actual cycle deletion and regeneration # Returns {:ok, notifications} or {:error, reason} # notifications are collected to be sent after transaction commits @@ -843,7 +851,7 @@ defmodule Mv.Membership.Member do # Regenerates cycles with new type/amount # Passes today to ensure consistent date across deletion and regeneration # skip_lock?: true means advisory lock is already set by caller - # Returns {:ok, notifications} where notifications should be sent after commit + # Returns {:ok, notifications} - notifications should be returned to after_action hook defp regenerate_cycles(member_id, today, opts) do skip_lock? = Keyword.get(opts, :skip_lock?, false) @@ -853,13 +861,8 @@ defmodule Mv.Membership.Member do skip_lock?: skip_lock? ) do {:ok, _cycles, notifications} when is_list(notifications) -> - # When skip_lock? is true and in transaction, notifications are returned {:ok, notifications} - {:ok, _cycles} -> - # When not in transaction or notifications handled automatically - {:ok, []} - {:error, reason} -> {:error, reason} end diff --git a/lib/mv/membership_fees/cycle_generator.ex b/lib/mv/membership_fees/cycle_generator.ex index b5cdeb6..2ddae91 100644 --- a/lib/mv/membership_fees/cycle_generator.ex +++ b/lib/mv/membership_fees/cycle_generator.ex @@ -47,7 +47,8 @@ defmodule Mv.MembershipFees.CycleGenerator do require Ash.Query require Logger - @type generate_result :: {:ok, [MembershipFeeCycle.t()]} | {:error, term()} + @type generate_result :: + {:ok, [MembershipFeeCycle.t()], [Ash.Notifier.Notification.t()]} | {:error, term()} @doc """ Generates membership fee cycles for a single member. @@ -62,14 +63,14 @@ defmodule Mv.MembershipFees.CycleGenerator do ## Returns - - `{:ok, cycles}` - List of newly created cycles + - `{:ok, cycles, notifications}` - List of newly created cycles and notifications - `{:error, reason}` - Error with reason ## Examples - {:ok, cycles} = CycleGenerator.generate_cycles_for_member(member) - {:ok, cycles} = CycleGenerator.generate_cycles_for_member(member_id) - {:ok, cycles} = CycleGenerator.generate_cycles_for_member(member, today: ~D[2024-12-31]) + {:ok, cycles, notifications} = CycleGenerator.generate_cycles_for_member(member) + {:ok, cycles, notifications} = CycleGenerator.generate_cycles_for_member(member_id) + {:ok, cycles, notifications} = CycleGenerator.generate_cycles_for_member(member, today: ~D[2024-12-31]) """ @spec generate_cycles_for_member(Member.t() | String.t(), keyword()) :: generate_result() @@ -86,17 +87,37 @@ defmodule Mv.MembershipFees.CycleGenerator do today = Keyword.get(opts, :today, Date.utc_today()) skip_lock? = Keyword.get(opts, :skip_lock?, false) - if skip_lock? do - # Lock already set by caller (e.g., regenerate_cycles_on_type_change) - # Just generate cycles without additional locking - # When in transaction, notifications are returned and must be sent after commit - do_generate_cycles(member, today) - else - # Use advisory lock to prevent concurrent generation - # Notifications are handled inside with_advisory_lock after transaction commits - with_advisory_lock(member.id, fn -> - do_generate_cycles(member, today) - end) + do_generate_cycles_with_lock(member, today, skip_lock?) + end + + # Generate cycles with lock handling + # Returns {:ok, cycles, notifications} - notifications are never sent here, + # they should be returned to the caller (e.g., via after_action hook) + defp do_generate_cycles_with_lock(member, today, true = _skip_lock?) do + # Lock already set by caller (e.g., regenerate_cycles_on_type_change) + # Just generate cycles without additional locking + do_generate_cycles(member, today) + end + + defp do_generate_cycles_with_lock(member, today, false) do + lock_key = :erlang.phash2(member.id) + + Repo.transaction(fn -> + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + + case do_generate_cycles(member, today) do + {:ok, cycles, notifications} -> + # Return cycles and notifications - do NOT send notifications here + # They will be sent by the caller (e.g., via after_action hook) + {cycles, notifications} + + {:error, reason} -> + Repo.rollback(reason) + end + end) + |> case do + {:ok, {cycles, notifications}} -> {:ok, cycles, notifications} + {:error, reason} -> {:error, reason} end end @@ -192,79 +213,6 @@ defmodule Mv.MembershipFees.CycleGenerator do end end - defp with_advisory_lock(member_id, fun) do - # Convert UUID to integer for advisory lock (use hash) - lock_key = :erlang.phash2(member_id) - - # Check if we're already in a transaction (e.g., called from Ash action) - if Repo.in_transaction?() do - with_advisory_lock_in_transaction(lock_key, fun) - else - with_advisory_lock_new_transaction(lock_key, fun) - end - end - - # Already in transaction: use advisory lock directly without starting new transaction - # This prevents nested transactions which can cause deadlocks - # Returns {:ok, cycles, notifications} where notifications should be sent after commit - defp with_advisory_lock_in_transaction(lock_key, fun) do - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - normalize_fun_result(fun.()) - end - - # Not in transaction: start new transaction with advisory lock - defp with_advisory_lock_new_transaction(lock_key, fun) do - result = - Repo.transaction(fn -> - # Acquire advisory lock for this transaction - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - execute_within_transaction(fun) - end) - - handle_transaction_result(result) - end - - # Execute function within transaction and return normalized result - # execute_within_transaction is always called within a Repo.transaction - # create_cycles returns {:ok, cycles, notifications} when in transaction - defp execute_within_transaction(fun) do - case fun.() do - {:ok, result, notifications} when is_list(notifications) -> - # Return result and notifications separately - {result, notifications} - - {:ok, result} -> - # Fallback case: no notifications returned - {result, []} - - {:error, reason} -> - Repo.rollback(reason) - end - end - - # Normalize function result to consistent format - # normalize_fun_result is called when already in a transaction (skip_lock? case) - # create_cycles returns {:ok, cycles, notifications} when in transaction - defp normalize_fun_result({:ok, result, notifications}) when is_list(notifications) do - # Notifications will be sent after outer transaction commits - # Return in same format as non-transaction case for consistency - {:ok, result, notifications} - end - - defp normalize_fun_result({:ok, result}), do: {:ok, result, []} - defp normalize_fun_result({:error, reason}), do: {:error, reason} - - # Handle transaction result and send notifications if needed - defp handle_transaction_result({:ok, {cycles, notifications}}) do - if Enum.any?(notifications) do - Ash.Notifier.notify(notifications) - end - - {:ok, cycles} - end - - defp handle_transaction_result({:error, reason}), do: {:error, reason} - defp do_generate_cycles(member, today) do # Reload member with relationships to ensure fresh data case load_member(member.id) do @@ -397,11 +345,9 @@ defmodule Mv.MembershipFees.CycleGenerator do end defp create_cycles(cycle_starts, member_id, fee_type_id, amount) do - # Always return notifications when in a transaction (required by Ash) - # When not in transaction, Ash handles notifications automatically - # When in transaction, we must return notifications and send them after commit - return_notifications? = Repo.in_transaction?() - + # Always use return_notifications?: true to collect notifications + # Notifications will be returned to the caller, who is responsible for + # sending them (e.g., via after_action hook returning {:ok, result, notifications}) results = Enum.map(cycle_starts, fn cycle_start -> attrs = %{ @@ -412,7 +358,7 @@ defmodule Mv.MembershipFees.CycleGenerator do status: :unpaid } - case Ash.create(MembershipFeeCycle, attrs, return_notifications?: return_notifications?) do + case Ash.create(MembershipFeeCycle, attrs, return_notifications?: true) do {:ok, cycle, notifications} when is_list(notifications) -> {:ok, cycle, notifications} @@ -431,14 +377,7 @@ defmodule Mv.MembershipFees.CycleGenerator do if Enum.empty?(errors) do successful_cycles = Enum.map(successes, fn {:ok, cycle, _notifications} -> cycle end) - - if return_notifications? do - # Return cycles and notifications to be sent after transaction commits - {:ok, successful_cycles, all_notifications} - else - # Not in transaction: Ash handles notifications automatically - {:ok, successful_cycles} - end + {:ok, successful_cycles, all_notifications} else Logger.warning("Some cycles failed to create: #{inspect(errors)}") # Return partial failure with errors diff --git a/test/membership_fees/member_cycle_integration_test.exs b/test/membership_fees/member_cycle_integration_test.exs index 7cbfbff..5d1cf28 100644 --- a/test/membership_fees/member_cycle_integration_test.exs +++ b/test/membership_fees/member_cycle_integration_test.exs @@ -198,7 +198,7 @@ defmodule Mv.MembershipFees.MemberCycleIntegrationTest do today = ~D[2025-12-31] # Manually trigger generation again with fixed "today" date - {:ok, _} = + {:ok, _, _} = Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id, today: today) final_cycles = get_member_cycles(member.id) diff --git a/test/mv/membership_fees/cycle_generator_edge_cases_test.exs b/test/mv/membership_fees/cycle_generator_edge_cases_test.exs index adca77a..85eb406 100644 --- a/test/mv/membership_fees/cycle_generator_edge_cases_test.exs +++ b/test/mv/membership_fees/cycle_generator_edge_cases_test.exs @@ -84,7 +84,7 @@ defmodule Mv.MembershipFees.CycleGeneratorEdgeCasesTest do Enum.each(existing_cycles, &Ash.destroy!(&1)) # Generate cycles with fixed "today" date - {:ok, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) + {:ok, _, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) end member @@ -128,7 +128,7 @@ defmodule Mv.MembershipFees.CycleGeneratorEdgeCasesTest do |> Ash.update!() # Explicitly generate cycles with fixed "today" date - {:ok, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) + {:ok, _, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) # Check all cycles cycles = get_member_cycles(member.id) @@ -158,7 +158,7 @@ defmodule Mv.MembershipFees.CycleGeneratorEdgeCasesTest do |> Ash.update!() # Explicitly generate cycles with fixed "today" date - {:ok, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) + {:ok, _, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) # Check all cycles cycles = get_member_cycles(member.id) @@ -333,7 +333,7 @@ defmodule Mv.MembershipFees.CycleGeneratorEdgeCasesTest do # Explicitly generate cycles with fixed "today" date today = ~D[2024-06-15] - {:ok, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) + {:ok, _, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) # Check all cycles all_cycles = get_member_cycles(member.id) diff --git a/test/mv/membership_fees/cycle_generator_test.exs b/test/mv/membership_fees/cycle_generator_test.exs index 06dd59e..e6988da 100644 --- a/test/mv/membership_fees/cycle_generator_test.exs +++ b/test/mv/membership_fees/cycle_generator_test.exs @@ -78,7 +78,7 @@ defmodule Mv.MembershipFees.CycleGeneratorTest do # Explicitly generate cycles with fixed "today" date to avoid date dependency today = ~D[2024-06-15] - {:ok, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) + {:ok, _, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) # Verify cycles were generated all_cycles = get_member_cycles(member.id) @@ -122,7 +122,7 @@ defmodule Mv.MembershipFees.CycleGeneratorTest do # Generate cycles with specific "today" date today = ~D[2024-06-15] - {:ok, new_cycles} = CycleGenerator.generate_cycles_for_member(member.id, today: today) + {:ok, new_cycles, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) # Should generate only 2023 and 2024 (2022 already exists) new_cycle_years = Enum.map(new_cycles, & &1.cycle_start.year) |> Enum.sort() @@ -144,7 +144,7 @@ defmodule Mv.MembershipFees.CycleGeneratorTest do # Generate cycles with specific "today" date far in the future today = ~D[2025-06-15] - {:ok, cycles} = CycleGenerator.generate_cycles_for_member(member.id, today: today) + {:ok, cycles, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) # With exit_date in 2023, should only generate 2022 and 2023 cycles cycle_years = Enum.map(cycles, & &1.cycle_start.year) |> Enum.sort() @@ -168,10 +168,10 @@ defmodule Mv.MembershipFees.CycleGeneratorTest do today = ~D[2024-06-15] # First generation - {:ok, _first_cycles} = CycleGenerator.generate_cycles_for_member(member.id, today: today) + {:ok, _first_cycles, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) # Second generation (should be idempotent) - {:ok, second_cycles} = CycleGenerator.generate_cycles_for_member(member.id, today: today) + {:ok, second_cycles, _} = CycleGenerator.generate_cycles_for_member(member.id, today: today) # Second call should return empty list (no new cycles) assert second_cycles == [] @@ -411,12 +411,12 @@ defmodule Mv.MembershipFees.CycleGeneratorTest do result2 = Task.await(task2) # Both should succeed - assert match?({:ok, _}, result1) - assert match?({:ok, _}, result2) + assert match?({:ok, _, _}, result1) + assert match?({:ok, _, _}, result2) # One should have created cycles, the other should have empty list (idempotent) - {:ok, cycles1} = result1 - {:ok, cycles2} = result2 + {:ok, cycles1, _} = result1 + {:ok, cycles2, _} = result2 # Combined should not have duplicates all_cycles = cycles1 ++ cycles2