From e6ac5d1ab15c9dcbf024bfefb3ba8d2af075b53f Mon Sep 17 00:00:00 2001 From: Moritz Date: Fri, 12 Dec 2025 16:53:57 +0100 Subject: [PATCH] fix: handle Ash notifications in CycleGenerator transactions - Use return_notifications?: true when creating cycles within transaction - Collect notifications and send them after transaction commits - Prevents 'Missed notifications' warnings in test output - Notifications are now properly sent via Ash.Notifier.notify/1 --- lib/mv/membership_fees/cycle_generator.ex | 56 +++++++++++++++++------ 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/lib/mv/membership_fees/cycle_generator.ex b/lib/mv/membership_fees/cycle_generator.ex index b80d3c8..2162b9e 100644 --- a/lib/mv/membership_fees/cycle_generator.ex +++ b/lib/mv/membership_fees/cycle_generator.ex @@ -86,6 +86,7 @@ defmodule Mv.MembershipFees.CycleGenerator do today = Keyword.get(opts, :today, Date.utc_today()) # Use advisory lock to prevent concurrent generation + # Notifications are handled inside with_advisory_lock after transaction commits with_advisory_lock(member.id, fn -> do_generate_cycles(member, today) end) @@ -187,15 +188,37 @@ defmodule Mv.MembershipFees.CycleGenerator do # Convert UUID to integer for advisory lock (use hash) lock_key = :erlang.phash2(member_id) - Repo.transaction(fn -> - # Acquire advisory lock for this transaction - Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) + result = + Repo.transaction(fn -> + # Acquire advisory lock for this transaction + Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key]) - case fun.() do - {:ok, result} -> result - {:error, reason} -> Repo.rollback(reason) - end - end) + case fun.() do + {:ok, result, notifications} when is_list(notifications) -> + # Return result and notifications separately + {result, notifications} + + {:ok, result} -> + # Handle case where no notifications were returned (backward compatibility) + {result, []} + + {:error, reason} -> + Repo.rollback(reason) + end + end) + + # Extract result and notifications, send notifications after transaction + case result do + {:ok, {cycles, notifications}} -> + if Enum.any?(notifications) do + Ash.Notifier.notify(notifications) + end + + {:ok, cycles} + + {:error, reason} -> + {:error, reason} + end end defp do_generate_cycles(member, today) do @@ -236,7 +259,7 @@ defmodule Mv.MembershipFees.CycleGenerator do cycle_starts = generate_cycle_starts(start_date, end_date, interval) create_cycles(cycle_starts, member.id, fee_type.id, amount) else - {:ok, []} + {:ok, [], []} end end @@ -340,17 +363,22 @@ defmodule Mv.MembershipFees.CycleGenerator do status: :unpaid } - case Ash.create(MembershipFeeCycle, attrs) do - {:ok, cycle} -> {:ok, cycle} + # Return notifications to avoid warnings when creating within a transaction + case Ash.create(MembershipFeeCycle, attrs, return_notifications?: true) do + {:ok, cycle, notifications} -> {:ok, cycle, notifications} {:error, reason} -> {:error, {cycle_start, reason}} end end) - {successes, errors} = Enum.split_with(results, &match?({:ok, _}, &1)) - successful_cycles = Enum.map(successes, fn {:ok, cycle} -> cycle end) + {successes, errors} = Enum.split_with(results, &match?({:ok, _, _}, &1)) + successful_cycles = Enum.map(successes, fn {:ok, cycle, _notifications} -> cycle end) + + all_notifications = + Enum.flat_map(successes, fn {:ok, _cycle, notifications} -> notifications end) if Enum.empty?(errors) do - {:ok, successful_cycles} + # Return cycles and notifications to be sent after transaction commits + {:ok, successful_cycles, all_notifications} else Logger.warning("Some cycles failed to create: #{inspect(errors)}") # Return partial failure with both successful and failed cycles