fix: resolve notification handling and maintain after_action for cycle regeneration
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
65d1561803
commit
bb5851bc23
3 changed files with 108 additions and 57 deletions
|
|
@ -114,6 +114,10 @@ defmodule Mv.Membership.Member do
|
||||||
if member.membership_fee_type_id && member.join_date do
|
if member.membership_fee_type_id && member.join_date do
|
||||||
generate_fn = fn ->
|
generate_fn = fn ->
|
||||||
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id) do
|
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member.id) do
|
||||||
|
{:ok, _cycles, _notifications} ->
|
||||||
|
# Notifications are sent automatically by CycleGenerator
|
||||||
|
:ok
|
||||||
|
|
||||||
{:ok, _cycles} ->
|
{:ok, _cycles} ->
|
||||||
:ok
|
:ok
|
||||||
|
|
||||||
|
|
@ -193,14 +197,21 @@ defmodule Mv.Membership.Member do
|
||||||
# This deletes future unpaid cycles and regenerates them with the new type/amount
|
# This deletes future unpaid cycles and regenerates them with the new type/amount
|
||||||
# Note: Cycle regeneration runs synchronously in the same transaction to ensure atomicity
|
# Note: Cycle regeneration runs synchronously in the same transaction to ensure atomicity
|
||||||
# CycleGenerator uses advisory locks and transactions internally to prevent race conditions
|
# CycleGenerator uses advisory locks and transactions internally to prevent race conditions
|
||||||
|
# Notifications are collected and sent after transaction commits
|
||||||
change after_action(fn changeset, member, _context ->
|
change after_action(fn changeset, member, _context ->
|
||||||
fee_type_changed =
|
fee_type_changed =
|
||||||
Ash.Changeset.changing_attribute?(changeset, :membership_fee_type_id)
|
Ash.Changeset.changing_attribute?(changeset, :membership_fee_type_id)
|
||||||
|
|
||||||
if fee_type_changed && member.membership_fee_type_id && member.join_date do
|
if fee_type_changed && member.membership_fee_type_id && member.join_date do
|
||||||
case regenerate_cycles_on_type_change(member) do
|
case regenerate_cycles_on_type_change(member) do
|
||||||
:ok ->
|
{:ok, notifications} ->
|
||||||
:ok
|
# Store notifications to be sent after transaction commits
|
||||||
|
# They will be sent by Ash automatically after commit
|
||||||
|
if Enum.any?(notifications) do
|
||||||
|
# Note: We cannot send notifications here as we're still in transaction
|
||||||
|
# Store them in the changeset context to be sent after commit
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
{:error, reason} ->
|
{:error, reason} ->
|
||||||
require Logger
|
require Logger
|
||||||
|
|
@ -704,15 +715,16 @@ defmodule Mv.Membership.Member do
|
||||||
# Regenerates cycles when membership fee type changes
|
# Regenerates cycles when membership fee type changes
|
||||||
# Deletes future unpaid cycles and regenerates them with the new type/amount
|
# Deletes future unpaid cycles and regenerates them with the new type/amount
|
||||||
# Uses advisory lock to prevent concurrent modifications
|
# Uses advisory lock to prevent concurrent modifications
|
||||||
defp regenerate_cycles_on_type_change(member) do
|
# Returns {:ok, notifications} or {:error, reason} where notifications are collected
|
||||||
alias Mv.Repo
|
# to be sent after transaction commits
|
||||||
|
@doc false
|
||||||
|
def regenerate_cycles_on_type_change(member) do
|
||||||
today = Date.utc_today()
|
today = Date.utc_today()
|
||||||
lock_key = :erlang.phash2(member.id)
|
lock_key = :erlang.phash2(member.id)
|
||||||
|
|
||||||
# Use advisory lock to prevent concurrent deletion and regeneration
|
# Use advisory lock to prevent concurrent deletion and regeneration
|
||||||
# This ensures atomicity when multiple updates happen simultaneously
|
# This ensures atomicity when multiple updates happen simultaneously
|
||||||
if Repo.in_transaction?() do
|
if Mv.Repo.in_transaction?() do
|
||||||
regenerate_cycles_in_transaction(member, today, lock_key)
|
regenerate_cycles_in_transaction(member, today, lock_key)
|
||||||
else
|
else
|
||||||
regenerate_cycles_new_transaction(member, today, lock_key)
|
regenerate_cycles_new_transaction(member, today, lock_key)
|
||||||
|
|
@ -720,36 +732,48 @@ defmodule Mv.Membership.Member do
|
||||||
end
|
end
|
||||||
|
|
||||||
# Already in transaction: use advisory lock directly
|
# Already in transaction: use advisory lock directly
|
||||||
|
# Returns {:ok, notifications} where notifications should be sent after commit
|
||||||
defp regenerate_cycles_in_transaction(member, today, lock_key) do
|
defp regenerate_cycles_in_transaction(member, today, lock_key) do
|
||||||
alias Mv.Repo
|
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
|
||||||
|
do_regenerate_cycles_on_type_change(member, today, skip_lock?: true)
|
||||||
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
|
|
||||||
do_regenerate_cycles_on_type_change(member, today)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Not in transaction: start new transaction with advisory lock
|
# Not in transaction: start new transaction with advisory lock
|
||||||
defp regenerate_cycles_new_transaction(member, today, lock_key) do
|
defp regenerate_cycles_new_transaction(member, today, lock_key) do
|
||||||
alias Mv.Repo
|
Mv.Repo.transaction(fn ->
|
||||||
|
Ecto.Adapters.SQL.query!(Mv.Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
|
||||||
|
|
||||||
Repo.transaction(fn ->
|
case do_regenerate_cycles_on_type_change(member, today, skip_lock?: true) do
|
||||||
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
|
{:ok, notifications} ->
|
||||||
|
# Store notifications to send after commit
|
||||||
|
notifications
|
||||||
|
|
||||||
case do_regenerate_cycles_on_type_change(member, today) do
|
{:error, reason} ->
|
||||||
:ok -> :ok
|
Mv.Repo.rollback(reason)
|
||||||
{:error, reason} -> Repo.rollback(reason)
|
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|> handle_transaction_result()
|
|> handle_transaction_result_with_notifications()
|
||||||
end
|
end
|
||||||
|
|
||||||
# Handle transaction result
|
# Handle transaction result with notifications
|
||||||
defp handle_transaction_result({:ok, result}), do: result
|
defp handle_transaction_result_with_notifications({:ok, notifications}) do
|
||||||
defp handle_transaction_result({:error, reason}), do: {:error, reason}
|
if Enum.any?(notifications) do
|
||||||
|
Ash.Notifier.notify(notifications)
|
||||||
|
end
|
||||||
|
|
||||||
|
{:ok, []}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_transaction_result_with_notifications({:error, reason}), do: {:error, reason}
|
||||||
|
|
||||||
# Performs the actual cycle deletion and regeneration
|
# Performs the actual cycle deletion and regeneration
|
||||||
defp do_regenerate_cycles_on_type_change(member, today) do
|
# Returns {:ok, notifications} or {:error, reason}
|
||||||
|
# notifications are collected to be sent after transaction commits
|
||||||
|
defp do_regenerate_cycles_on_type_change(member, today, opts) do
|
||||||
require Ash.Query
|
require Ash.Query
|
||||||
|
|
||||||
|
skip_lock? = Keyword.get(opts, :skip_lock?, false)
|
||||||
|
|
||||||
# Find all unpaid cycles for this member
|
# Find all unpaid cycles for this member
|
||||||
# We need to check cycle_end for each cycle using its own interval
|
# We need to check cycle_end for each cycle using its own interval
|
||||||
all_unpaid_cycles_query =
|
all_unpaid_cycles_query =
|
||||||
|
|
@ -761,7 +785,7 @@ defmodule Mv.Membership.Member do
|
||||||
case Ash.read(all_unpaid_cycles_query) do
|
case Ash.read(all_unpaid_cycles_query) do
|
||||||
{:ok, all_unpaid_cycles} ->
|
{:ok, all_unpaid_cycles} ->
|
||||||
cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today)
|
cycles_to_delete = filter_future_cycles(all_unpaid_cycles, today)
|
||||||
delete_and_regenerate_cycles(cycles_to_delete, member.id, today)
|
delete_and_regenerate_cycles(cycles_to_delete, member.id, today, skip_lock?: skip_lock?)
|
||||||
|
|
||||||
{:error, reason} ->
|
{:error, reason} ->
|
||||||
{:error, reason}
|
{:error, reason}
|
||||||
|
|
@ -787,13 +811,16 @@ defmodule Mv.Membership.Member do
|
||||||
|
|
||||||
# Deletes future cycles and regenerates them with the new type/amount
|
# Deletes future cycles and regenerates them with the new type/amount
|
||||||
# Passes today to ensure consistent date across deletion and regeneration
|
# Passes today to ensure consistent date across deletion and regeneration
|
||||||
defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today) do
|
# Returns {:ok, notifications} or {:error, reason}
|
||||||
|
defp delete_and_regenerate_cycles(cycles_to_delete, member_id, today, opts) do
|
||||||
|
skip_lock? = Keyword.get(opts, :skip_lock?, false)
|
||||||
|
|
||||||
if Enum.empty?(cycles_to_delete) do
|
if Enum.empty?(cycles_to_delete) do
|
||||||
# No cycles to delete, just regenerate
|
# No cycles to delete, just regenerate
|
||||||
regenerate_cycles(member_id, today)
|
regenerate_cycles(member_id, today, skip_lock?: skip_lock?)
|
||||||
else
|
else
|
||||||
case delete_cycles(cycles_to_delete) do
|
case delete_cycles(cycles_to_delete) do
|
||||||
:ok -> regenerate_cycles(member_id, today)
|
:ok -> regenerate_cycles(member_id, today, skip_lock?: skip_lock?)
|
||||||
{:error, reason} -> {:error, reason}
|
{:error, reason} -> {:error, reason}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
@ -814,12 +841,27 @@ defmodule Mv.Membership.Member do
|
||||||
end
|
end
|
||||||
|
|
||||||
# Regenerates cycles with new type/amount
|
# Regenerates cycles with new type/amount
|
||||||
# CycleGenerator detects if already in transaction and uses advisory lock accordingly
|
|
||||||
# Passes today to ensure consistent date across deletion and regeneration
|
# Passes today to ensure consistent date across deletion and regeneration
|
||||||
defp regenerate_cycles(member_id, today) do
|
# skip_lock?: true means advisory lock is already set by caller
|
||||||
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(member_id, today: today) do
|
# Returns {:ok, notifications} where notifications should be sent after commit
|
||||||
{:ok, _cycles} -> :ok
|
defp regenerate_cycles(member_id, today, opts) do
|
||||||
{:error, reason} -> {:error, reason}
|
skip_lock? = Keyword.get(opts, :skip_lock?, false)
|
||||||
|
|
||||||
|
case Mv.MembershipFees.CycleGenerator.generate_cycles_for_member(
|
||||||
|
member_id,
|
||||||
|
today: today,
|
||||||
|
skip_lock?: skip_lock?
|
||||||
|
) do
|
||||||
|
{:ok, _cycles, notifications} when is_list(notifications) ->
|
||||||
|
# When skip_lock? is true and in transaction, notifications are returned
|
||||||
|
{:ok, notifications}
|
||||||
|
|
||||||
|
{:ok, _cycles} ->
|
||||||
|
# When not in transaction or notifications handled automatically
|
||||||
|
{:ok, []}
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
{:error, reason}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -38,10 +38,10 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
alias Mv.MembershipFees.CalendarCycles
|
|
||||||
alias Mv.MembershipFees.MembershipFeeCycle
|
|
||||||
alias Mv.MembershipFees.Changes.SetMembershipFeeStartDate
|
|
||||||
alias Mv.Membership.Member
|
alias Mv.Membership.Member
|
||||||
|
alias Mv.MembershipFees.CalendarCycles
|
||||||
|
alias Mv.MembershipFees.Changes.SetMembershipFeeStartDate
|
||||||
|
alias Mv.MembershipFees.MembershipFeeCycle
|
||||||
alias Mv.Repo
|
alias Mv.Repo
|
||||||
|
|
||||||
require Ash.Query
|
require Ash.Query
|
||||||
|
|
@ -84,12 +84,20 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
||||||
|
|
||||||
def generate_cycles_for_member(%Member{} = member, opts) do
|
def generate_cycles_for_member(%Member{} = member, opts) do
|
||||||
today = Keyword.get(opts, :today, Date.utc_today())
|
today = Keyword.get(opts, :today, Date.utc_today())
|
||||||
|
skip_lock? = Keyword.get(opts, :skip_lock?, false)
|
||||||
|
|
||||||
# Use advisory lock to prevent concurrent generation
|
if skip_lock? do
|
||||||
# Notifications are handled inside with_advisory_lock after transaction commits
|
# Lock already set by caller (e.g., regenerate_cycles_on_type_change)
|
||||||
with_advisory_lock(member.id, fn ->
|
# Just generate cycles without additional locking
|
||||||
|
# When in transaction, notifications are returned and must be sent after commit
|
||||||
do_generate_cycles(member, today)
|
do_generate_cycles(member, today)
|
||||||
end)
|
else
|
||||||
|
# Use advisory lock to prevent concurrent generation
|
||||||
|
# Notifications are handled inside with_advisory_lock after transaction commits
|
||||||
|
with_advisory_lock(member.id, fn ->
|
||||||
|
do_generate_cycles(member, today)
|
||||||
|
end)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
|
|
@ -198,6 +206,7 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
||||||
|
|
||||||
# Already in transaction: use advisory lock directly without starting new transaction
|
# Already in transaction: use advisory lock directly without starting new transaction
|
||||||
# This prevents nested transactions which can cause deadlocks
|
# This prevents nested transactions which can cause deadlocks
|
||||||
|
# Returns {:ok, cycles, notifications} where notifications should be sent after commit
|
||||||
defp with_advisory_lock_in_transaction(lock_key, fun) do
|
defp with_advisory_lock_in_transaction(lock_key, fun) do
|
||||||
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
|
Ecto.Adapters.SQL.query!(Repo, "SELECT pg_advisory_xact_lock($1)", [lock_key])
|
||||||
normalize_fun_result(fun.())
|
normalize_fun_result(fun.())
|
||||||
|
|
@ -216,16 +225,16 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
||||||
end
|
end
|
||||||
|
|
||||||
# Execute function within transaction and return normalized result
|
# Execute function within transaction and return normalized result
|
||||||
# When in transaction, create_cycles returns {:ok, cycles, notifications}
|
# execute_within_transaction is always called within a Repo.transaction
|
||||||
# When not in transaction, create_cycles returns {:ok, cycles}
|
# create_cycles returns {:ok, cycles, notifications} when in transaction
|
||||||
defp execute_within_transaction(fun) do
|
defp execute_within_transaction(fun) do
|
||||||
case fun.() do
|
case fun.() do
|
||||||
{:ok, result, notifications} when is_list(notifications) ->
|
{:ok, result, notifications} when is_list(notifications) ->
|
||||||
# In transaction case: return result and notifications separately
|
# Return result and notifications separately
|
||||||
{result, notifications}
|
{result, notifications}
|
||||||
|
|
||||||
{:ok, result} ->
|
{:ok, result} ->
|
||||||
# Not in transaction case: notifications handled by Ash automatically
|
# Fallback case: no notifications returned
|
||||||
{result, []}
|
{result, []}
|
||||||
|
|
||||||
{:error, reason} ->
|
{:error, reason} ->
|
||||||
|
|
@ -234,15 +243,15 @@ defmodule Mv.MembershipFees.CycleGenerator do
|
||||||
end
|
end
|
||||||
|
|
||||||
# Normalize function result to consistent format
|
# Normalize function result to consistent format
|
||||||
# When in transaction, create_cycles returns {:ok, cycles, notifications}
|
# normalize_fun_result is called when already in a transaction (skip_lock? case)
|
||||||
# When not in transaction, create_cycles returns {:ok, cycles}
|
# create_cycles returns {:ok, cycles, notifications} when in transaction
|
||||||
defp normalize_fun_result({:ok, result, _notifications}) do
|
defp normalize_fun_result({:ok, result, notifications}) when is_list(notifications) do
|
||||||
# In transaction case: notifications will be sent after outer transaction commits
|
# Notifications will be sent after outer transaction commits
|
||||||
# Return in same format as non-transaction case for consistency
|
# Return in same format as non-transaction case for consistency
|
||||||
{:ok, result}
|
{:ok, result, notifications}
|
||||||
end
|
end
|
||||||
|
|
||||||
defp normalize_fun_result({:ok, result}), do: {:ok, result}
|
defp normalize_fun_result({:ok, result}), do: {:ok, result, []}
|
||||||
defp normalize_fun_result({:error, reason}), do: {:error, reason}
|
defp normalize_fun_result({:error, reason}), do: {:error, reason}
|
||||||
|
|
||||||
# Handle transaction result and send notifications if needed
|
# Handle transaction result and send notifications if needed
|
||||||
|
|
|
||||||
|
|
@ -90,13 +90,13 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do
|
||||||
case MembershipFeeCycle
|
case MembershipFeeCycle
|
||||||
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^past_cycle_start)
|
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^past_cycle_start)
|
||||||
|> Ash.read_one() do
|
|> Ash.read_one() do
|
||||||
{:ok, existing_cycle} ->
|
{:ok, existing_cycle} when not is_nil(existing_cycle) ->
|
||||||
# Update to paid
|
# Update to paid
|
||||||
existing_cycle
|
existing_cycle
|
||||||
|> Ash.Changeset.for_update(:update, %{status: :paid})
|
|> Ash.Changeset.for_update(:update, %{status: :paid})
|
||||||
|> Ash.update!()
|
|> Ash.update!()
|
||||||
|
|
||||||
{:error, _} ->
|
_ ->
|
||||||
create_cycle(member, yearly_type1, %{
|
create_cycle(member, yearly_type1, %{
|
||||||
cycle_start: past_cycle_start,
|
cycle_start: past_cycle_start,
|
||||||
status: :paid,
|
status: :paid,
|
||||||
|
|
@ -109,10 +109,10 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do
|
||||||
case MembershipFeeCycle
|
case MembershipFeeCycle
|
||||||
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^current_cycle_start)
|
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^current_cycle_start)
|
||||||
|> Ash.read_one() do
|
|> Ash.read_one() do
|
||||||
{:ok, existing_cycle} ->
|
{:ok, existing_cycle} when not is_nil(existing_cycle) ->
|
||||||
Ash.destroy!(existing_cycle)
|
Ash.destroy!(existing_cycle)
|
||||||
|
|
||||||
{:error, _} ->
|
_ ->
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -297,10 +297,10 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do
|
||||||
case MembershipFeeCycle
|
case MembershipFeeCycle
|
||||||
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^past_cycle_start)
|
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^past_cycle_start)
|
||||||
|> Ash.read_one() do
|
|> Ash.read_one() do
|
||||||
{:ok, existing_cycle} ->
|
{:ok, existing_cycle} when not is_nil(existing_cycle) ->
|
||||||
Ash.destroy!(existing_cycle)
|
Ash.destroy!(existing_cycle)
|
||||||
|
|
||||||
{:error, _} ->
|
_ ->
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -316,10 +316,10 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do
|
||||||
case MembershipFeeCycle
|
case MembershipFeeCycle
|
||||||
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^current_cycle_start)
|
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^current_cycle_start)
|
||||||
|> Ash.read_one() do
|
|> Ash.read_one() do
|
||||||
{:ok, existing_cycle} ->
|
{:ok, existing_cycle} when not is_nil(existing_cycle) ->
|
||||||
Ash.destroy!(existing_cycle)
|
Ash.destroy!(existing_cycle)
|
||||||
|
|
||||||
{:error, _} ->
|
_ ->
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -409,7 +409,7 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do
|
||||||
case MembershipFeeCycle
|
case MembershipFeeCycle
|
||||||
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^current_cycle_start)
|
|> Ash.Query.filter(member_id == ^member.id and cycle_start == ^current_cycle_start)
|
||||||
|> Ash.read_one() do
|
|> Ash.read_one() do
|
||||||
{:ok, existing_cycle} ->
|
{:ok, existing_cycle} when not is_nil(existing_cycle) ->
|
||||||
# Update to unpaid if it's not
|
# Update to unpaid if it's not
|
||||||
if existing_cycle.status != :unpaid do
|
if existing_cycle.status != :unpaid do
|
||||||
existing_cycle
|
existing_cycle
|
||||||
|
|
@ -417,7 +417,7 @@ defmodule Mv.Membership.MemberTypeChangeIntegrationTest do
|
||||||
|> Ash.update!()
|
|> Ash.update!()
|
||||||
end
|
end
|
||||||
|
|
||||||
{:error, _} ->
|
_ ->
|
||||||
# Create if it doesn't exist
|
# Create if it doesn't exist
|
||||||
create_cycle(member, yearly_type1, %{
|
create_cycle(member, yearly_type1, %{
|
||||||
cycle_start: current_cycle_start,
|
cycle_start: current_cycle_start,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue