refactor: implement proper notification handling via after_action hooks
Refactor notification handling according to Ash best practices
This commit is contained in:
parent
f7c33bfc7d
commit
4997493139
5 changed files with 106 additions and 164 deletions
|
|
@ -47,7 +47,8 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
|||
require Ash.Query
|
||||
require Logger
|
||||
|
||||
@type generate_result :: {:ok, [MembershipFeeCycle.t()]} | {:error, term()}
|
||||
@type generate_result ::
|
||||
{:ok, [MembershipFeeCycle.t()], [Ash.Notifier.Notification.t()]} | {:error, term()}
|
||||
|
||||
@doc """
|
||||
Generates membership fee cycles for a single member.
|
||||
|
|
@ -62,14 +63,14 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
|||
|
||||
## Returns
|
||||
|
||||
- `{:ok, cycles}` - List of newly created cycles
|
||||
- `{:ok, cycles, notifications}` - List of newly created cycles and notifications
|
||||
- `{:error, reason}` - Error with reason
|
||||
|
||||
## Examples
|
||||
|
||||
{:ok, cycles} = CycleGenerator.generate_cycles_for_member(member)
|
||||
{:ok, cycles} = CycleGenerator.generate_cycles_for_member(member_id)
|
||||
{:ok, cycles} = CycleGenerator.generate_cycles_for_member(member, today: ~D[2024-12-31])
|
||||
{:ok, cycles, notifications} = CycleGenerator.generate_cycles_for_member(member)
|
||||
{:ok, cycles, notifications} = CycleGenerator.generate_cycles_for_member(member_id)
|
||||
{:ok, cycles, notifications} = CycleGenerator.generate_cycles_for_member(member, today: ~D[2024-12-31])
|
||||
|
||||
"""
|
||||
@spec generate_cycles_for_member(Member.t() | String.t(), keyword()) :: generate_result()
|
||||
|
|
@ -86,17 +87,37 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
|||
today = Keyword.get(opts, :today, Date.utc_today())
|
||||
skip_lock? = Keyword.get(opts, :skip_lock?, false)
|
||||
|
||||
if skip_lock? do
|
||||
# Lock already set by caller (e.g., regenerate_cycles_on_type_change)
|
||||
# Just generate cycles without additional locking
|
||||
# When in transaction, notifications are returned and must be sent after commit
|
||||
do_generate_cycles(member, today)
|
||||
else
|
||||
# Use advisory lock to prevent concurrent generation
|
||||
# Notifications are handled inside with_advisory_lock after transaction commits
|
||||
with_advisory_lock(member.id, fn ->
|
||||
do_generate_cycles(member, today)
|
||||
end)
|
||||
do_generate_cycles_with_lock(member, today, skip_lock?)
|
||||
end
|
||||
|
||||
# Generate cycles with lock handling
|
||||
# Returns {:ok, cycles, notifications} - notifications are never sent here,
|
||||
# they should be returned to the caller (e.g., via after_action hook)
|
||||
defp do_generate_cycles_with_lock(member, today, true = _skip_lock?) do
|
||||
# Lock already set by caller (e.g., regenerate_cycles_on_type_change)
|
||||
# Just generate cycles without additional locking
|
||||
do_generate_cycles(member, today)
|
||||
end
|
||||
|
||||
defp do_generate_cycles_with_lock(member, today, false) do
|
||||
lock_key = :erlang.phash2(member.id)
|
||||
|
||||
Repo.transaction(fn ->
|
||||
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
|
||||
|
||||
case do_generate_cycles(member, today) do
|
||||
{:ok, cycles, notifications} ->
|
||||
# Return cycles and notifications - do NOT send notifications here
|
||||
# They will be sent by the caller (e.g., via after_action hook)
|
||||
{cycles, notifications}
|
||||
|
||||
{:error, reason} ->
|
||||
Repo.rollback(reason)
|
||||
end
|
||||
end)
|
||||
|> case do
|
||||
{:ok, {cycles, notifications}} -> {:ok, cycles, notifications}
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -192,79 +213,6 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
|||
end
|
||||
end
|
||||
|
||||
defp with_advisory_lock(member_id, fun) do
|
||||
# Convert UUID to integer for advisory lock (use hash)
|
||||
lock_key = :erlang.phash2(member_id)
|
||||
|
||||
# Check if we're already in a transaction (e.g., called from Ash action)
|
||||
if Repo.in_transaction?() do
|
||||
with_advisory_lock_in_transaction(lock_key, fun)
|
||||
else
|
||||
with_advisory_lock_new_transaction(lock_key, fun)
|
||||
end
|
||||
end
|
||||
|
||||
# Already in transaction: use advisory lock directly without starting new transaction
|
||||
# This prevents nested transactions which can cause deadlocks
|
||||
# Returns {:ok, cycles, notifications} where notifications should be sent after commit
|
||||
defp with_advisory_lock_in_transaction(lock_key, fun) do
|
||||
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
|
||||
normalize_fun_result(fun.())
|
||||
end
|
||||
|
||||
# Not in transaction: start new transaction with advisory lock
|
||||
defp with_advisory_lock_new_transaction(lock_key, fun) do
|
||||
result =
|
||||
Repo.transaction(fn ->
|
||||
# Acquire advisory lock for this transaction
|
||||
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
|
||||
execute_within_transaction(fun)
|
||||
end)
|
||||
|
||||
handle_transaction_result(result)
|
||||
end
|
||||
|
||||
# Execute function within transaction and return normalized result
|
||||
# execute_within_transaction is always called within a Repo.transaction
|
||||
# create_cycles returns {:ok, cycles, notifications} when in transaction
|
||||
defp execute_within_transaction(fun) do
|
||||
case fun.() do
|
||||
{:ok, result, notifications} when is_list(notifications) ->
|
||||
# Return result and notifications separately
|
||||
{result, notifications}
|
||||
|
||||
{:ok, result} ->
|
||||
# Fallback case: no notifications returned
|
||||
{result, []}
|
||||
|
||||
{:error, reason} ->
|
||||
Repo.rollback(reason)
|
||||
end
|
||||
end
|
||||
|
||||
# Normalize function result to consistent format
|
||||
# normalize_fun_result is called when already in a transaction (skip_lock? case)
|
||||
# create_cycles returns {:ok, cycles, notifications} when in transaction
|
||||
defp normalize_fun_result({:ok, result, notifications}) when is_list(notifications) do
|
||||
# Notifications will be sent after outer transaction commits
|
||||
# Return in same format as non-transaction case for consistency
|
||||
{:ok, result, notifications}
|
||||
end
|
||||
|
||||
defp normalize_fun_result({:ok, result}), do: {:ok, result, []}
|
||||
defp normalize_fun_result({:error, reason}), do: {:error, reason}
|
||||
|
||||
# Handle transaction result and send notifications if needed
|
||||
defp handle_transaction_result({:ok, {cycles, notifications}}) do
|
||||
if Enum.any?(notifications) do
|
||||
Ash.Notifier.notify(notifications)
|
||||
end
|
||||
|
||||
{:ok, cycles}
|
||||
end
|
||||
|
||||
defp handle_transaction_result({:error, reason}), do: {:error, reason}
|
||||
|
||||
defp do_generate_cycles(member, today) do
|
||||
# Reload member with relationships to ensure fresh data
|
||||
case load_member(member.id) do
|
||||
|
|
@ -397,11 +345,9 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
|||
end
|
||||
|
||||
defp create_cycles(cycle_starts, member_id, fee_type_id, amount) do
|
||||
# Always return notifications when in a transaction (required by Ash)
|
||||
# When not in transaction, Ash handles notifications automatically
|
||||
# When in transaction, we must return notifications and send them after commit
|
||||
return_notifications? = Repo.in_transaction?()
|
||||
|
||||
# Always use return_notifications?: true to collect notifications
|
||||
# Notifications will be returned to the caller, who is responsible for
|
||||
# sending them (e.g., via after_action hook returning {:ok, result, notifications})
|
||||
results =
|
||||
Enum.map(cycle_starts, fn cycle_start ->
|
||||
attrs = %{
|
||||
|
|
@ -412,7 +358,7 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
|||
status: :unpaid
|
||||
}
|
||||
|
||||
case Ash.create(MembershipFeeCycle, attrs, return_notifications?: return_notifications?) do
|
||||
case Ash.create(MembershipFeeCycle, attrs, return_notifications?: true) do
|
||||
{:ok, cycle, notifications} when is_list(notifications) ->
|
||||
{:ok, cycle, notifications}
|
||||
|
||||
|
|
@ -431,14 +377,7 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
|||
|
||||
if Enum.empty?(errors) do
|
||||
successful_cycles = Enum.map(successes, fn {:ok, cycle, _notifications} -> cycle end)
|
||||
|
||||
if return_notifications? do
|
||||
# Return cycles and notifications to be sent after transaction commits
|
||||
{:ok, successful_cycles, all_notifications}
|
||||
else
|
||||
# Not in transaction: Ash handles notifications automatically
|
||||
{:ok, successful_cycles}
|
||||
end
|
||||
{:ok, successful_cycles, all_notifications}
|
||||
else
|
||||
Logger.warning("Some cycles failed to create: #{inspect(errors)}")
|
||||
# Return partial failure with errors
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue