perf(import): reuse auto-created groups across import chunks
This commit is contained in:
parent
68a1a9530a
commit
118b9f8d57
5 changed files with 90 additions and 3 deletions
|
|
@ -97,6 +97,20 @@ defmodule Mv.Membership.Import.ImportRunner do
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Carries the in-memory group snapshot grown by a chunk back into `import_state`
|
||||||
|
so the next chunk reuses groups created earlier instead of re-reading the
|
||||||
|
Group table. When the chunk result omits `groups_found`, the state is returned
|
||||||
|
unchanged.
|
||||||
|
"""
|
||||||
|
@spec carry_groups_forward(map(), map()) :: map()
|
||||||
|
def carry_groups_forward(import_state, chunk_result) do
|
||||||
|
case Map.fetch(chunk_result, :groups_found) do
|
||||||
|
{:ok, groups_found} -> Map.put(import_state, :groups_found, groups_found)
|
||||||
|
:error -> import_state
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Returns the next action after processing a chunk: send the next chunk index or done.
|
Returns the next action after processing a chunk: send the next chunk index or done.
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,9 @@ defmodule Mv.Membership.Import.MemberCSV do
|
||||||
- `inserted` - Number of successfully created members
|
- `inserted` - Number of successfully created members
|
||||||
- `failed` - Number of failed member creations
|
- `failed` - Number of failed member creations
|
||||||
- `errors` - List of `%MemberCSV.Error{}` structs (capped at 50 per import)
|
- `errors` - List of `%MemberCSV.Error{}` structs (capped at 50 per import)
|
||||||
|
- `groups_found` - The in-memory group snapshot grown while processing this
|
||||||
|
chunk; thread it into the next chunk's `:groups_found` opt so groups created
|
||||||
|
in an earlier chunk are reused without re-reading the Group table
|
||||||
|
|
||||||
## Examples
|
## Examples
|
||||||
|
|
||||||
|
|
@ -94,7 +97,8 @@ defmodule Mv.Membership.Import.MemberCSV do
|
||||||
failed: non_neg_integer(),
|
failed: non_neg_integer(),
|
||||||
errors: list(Error.t()),
|
errors: list(Error.t()),
|
||||||
errors_truncated?: boolean(),
|
errors_truncated?: boolean(),
|
||||||
warnings: list(String.t())
|
warnings: list(String.t()),
|
||||||
|
groups_found: list(Mv.Membership.Group.t() | %{id: String.t(), name: String.t()})
|
||||||
}
|
}
|
||||||
|
|
||||||
alias Mv.Membership.Import.ColumnResolver
|
alias Mv.Membership.Import.ColumnResolver
|
||||||
|
|
@ -374,7 +378,7 @@ defmodule Mv.Membership.Import.MemberCSV do
|
||||||
actor: actor
|
actor: actor
|
||||||
}
|
}
|
||||||
|
|
||||||
{inserted, failed, errors, _collected_error_count, truncated?, warnings, _groups_acc} =
|
{inserted, failed, errors, _collected_error_count, truncated?, warnings, groups_acc} =
|
||||||
Enum.reduce(chunk_rows_with_lines, {0, 0, [], 0, false, [], groups_found}, fn {line_number,
|
Enum.reduce(chunk_rows_with_lines, {0, 0, [], 0, false, [], groups_found}, fn {line_number,
|
||||||
row_map},
|
row_map},
|
||||||
{acc_inserted,
|
{acc_inserted,
|
||||||
|
|
@ -417,7 +421,8 @@ defmodule Mv.Membership.Import.MemberCSV do
|
||||||
failed: failed,
|
failed: failed,
|
||||||
errors: Enum.reverse(errors),
|
errors: Enum.reverse(errors),
|
||||||
errors_truncated?: truncated?,
|
errors_truncated?: truncated?,
|
||||||
warnings: warnings
|
warnings: warnings,
|
||||||
|
groups_found: groups_acc
|
||||||
}}
|
}}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -367,8 +367,11 @@ defmodule MvWeb.ImportLive do
|
||||||
new_progress =
|
new_progress =
|
||||||
ImportRunner.merge_progress(progress, chunk_result, idx, max_errors: @max_errors)
|
ImportRunner.merge_progress(progress, chunk_result, idx, max_errors: @max_errors)
|
||||||
|
|
||||||
|
new_import_state = ImportRunner.carry_groups_forward(import_state, chunk_result)
|
||||||
|
|
||||||
socket =
|
socket =
|
||||||
socket
|
socket
|
||||||
|
|> assign(:import_state, new_import_state)
|
||||||
|> assign(:import_progress, new_progress)
|
|> assign(:import_progress, new_progress)
|
||||||
|> assign(:import_status, new_progress.status)
|
|> assign(:import_status, new_progress.status)
|
||||||
|> maybe_send_next_chunk(idx, length(import_state.chunks))
|
|> maybe_send_next_chunk(idx, length(import_state.chunks))
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,24 @@ defmodule Mv.Membership.Import.ImportRunnerTest do
|
||||||
|
|
||||||
alias Mv.Membership.Import.ImportRunner
|
alias Mv.Membership.Import.ImportRunner
|
||||||
|
|
||||||
|
describe "carry_groups_forward/2" do
|
||||||
|
test "replaces import_state groups_found with the chunk's grown snapshot" do
|
||||||
|
import_state = %{groups_found: [%{id: "1", name: "A"}]}
|
||||||
|
chunk_result = %{groups_found: [%{id: "1", name: "A"}, %{id: "2", name: "B"}]}
|
||||||
|
|
||||||
|
assert ImportRunner.carry_groups_forward(import_state, chunk_result) == %{
|
||||||
|
groups_found: [%{id: "1", name: "A"}, %{id: "2", name: "B"}]
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "leaves import_state unchanged when the chunk result omits groups_found" do
|
||||||
|
import_state = %{groups_found: [%{id: "1", name: "A"}], other: :kept}
|
||||||
|
chunk_result = %{inserted: 1}
|
||||||
|
|
||||||
|
assert ImportRunner.carry_groups_forward(import_state, chunk_result) == import_state
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "read_file_entry/2" do
|
describe "read_file_entry/2" do
|
||||||
test "returns {:ok, content} for a readable file" do
|
test "returns {:ok, content} for a readable file" do
|
||||||
path =
|
path =
|
||||||
|
|
|
||||||
|
|
@ -1116,6 +1116,53 @@ defmodule Mv.Membership.Import.MemberCSVTest do
|
||||||
"Expected the group table read at most a few times, got #{reads} reads for 10 rows (N+1)."
|
"Expected the group table read at most a few times, got #{reads} reads for 10 rows (N+1)."
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "returns the grown group snapshot so later chunks skip the table read",
|
||||||
|
%{actor: actor} do
|
||||||
|
chunk1 = [
|
||||||
|
{2, %{member: %{email: "g-xchunk-1@example.com"}, custom: %{}, groups: "Shared X"}}
|
||||||
|
]
|
||||||
|
|
||||||
|
chunk2 = [
|
||||||
|
{3, %{member: %{email: "g-xchunk-2@example.com"}, custom: %{}, groups: "Shared X"}}
|
||||||
|
]
|
||||||
|
|
||||||
|
assert {:ok, result1} =
|
||||||
|
MemberCSV.process_chunk(chunk1, %{email: 0}, %{}, actor: actor, groups_found: [])
|
||||||
|
|
||||||
|
# The chunk result must expose the accumulated snapshot, including the group
|
||||||
|
# auto-created while processing this chunk, so the LiveView can thread it
|
||||||
|
# into the next chunk's opts.
|
||||||
|
assert is_list(result1.groups_found)
|
||||||
|
assert Enum.any?(result1.groups_found, &(&1.name == "Shared X"))
|
||||||
|
|
||||||
|
group_read_count = Agent.start_link(fn -> 0 end) |> elem(1)
|
||||||
|
test_pid = self()
|
||||||
|
|
||||||
|
handler = fn _event, _measurements, metadata, _config ->
|
||||||
|
if self() == test_pid and metadata[:source] == "groups" and
|
||||||
|
is_binary(metadata[:query]) and String.starts_with?(metadata.query, "SELECT") do
|
||||||
|
Agent.update(group_read_count, &(&1 + 1))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
handler_id = "test-xchunk-group-read-#{System.unique_integer([:positive])}"
|
||||||
|
:telemetry.attach(handler_id, [:mv, :repo, :query], handler, nil)
|
||||||
|
|
||||||
|
assert {:ok, %{inserted: 1}} =
|
||||||
|
MemberCSV.process_chunk(chunk2, %{email: 0}, %{},
|
||||||
|
actor: actor,
|
||||||
|
groups_found: result1.groups_found
|
||||||
|
)
|
||||||
|
|
||||||
|
reads = Agent.get(group_read_count, & &1)
|
||||||
|
:telemetry.detach(handler_id)
|
||||||
|
|
||||||
|
# The second chunk receives the snapshot grown by the first, so the shared
|
||||||
|
# group resolves from memory without any full-table read.
|
||||||
|
assert reads == 0,
|
||||||
|
"Expected no group table read in the second chunk, got #{reads} (snapshot not threaded across chunks)."
|
||||||
|
end
|
||||||
|
|
||||||
test "empty groups cell leaves the member without group assignment", %{actor: actor} do
|
test "empty groups cell leaves the member without group assignment", %{actor: actor} do
|
||||||
chunk = [{2, %{member: %{email: "g-empty@example.com"}, custom: %{}, groups: " "}}]
|
chunk = [{2, %{member: %{email: "g-empty@example.com"}, custom: %{}, groups: " "}}]
|
||||||
opts = [actor: actor, groups_found: []]
|
opts = [actor: actor, groups_found: []]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue