170 lines
5.7 KiB
Elixir
170 lines
5.7 KiB
Elixir
defmodule Mv.Membership.Import.ImportRunner do
|
|
@moduledoc """
|
|
Orchestrates CSV member import: file reading, progress tracking, chunk processing,
|
|
and error formatting. Used by `MvWeb.ImportExportLive` to keep LiveView thin.
|
|
|
|
This module does not depend on Phoenix or LiveView. It provides pure functions for
|
|
progress/merge and side-effectful helpers (read_file_entry, process_chunk) that
|
|
are called from the LiveView or from tasks started by it.
|
|
"""
|
|
|
|
use Gettext, backend: MvWeb.Gettext
|
|
|
|
alias Mv.Membership.Import.MemberCSV
|
|
|
|
@default_max_errors 50
|
|
|
|
@doc """
Reads file content from a Phoenix LiveView upload entry (path).

Used as the callback for `consume_uploaded_entries/3`. Returns `{:ok, content}` or
`{:error, reason}`, where `reason` is always a user-friendly binary string.

## Parameters

  * first argument - upload metadata map; only its `:path` key is read
  * `_entry` - the upload entry; ignored
"""
@spec read_file_entry(map(), map()) :: {:ok, String.t()} | {:error, String.t()}
def read_file_entry(%{path: path}, _entry) do
  case File.read(path) do
    {:ok, content} -> {:ok, content}
    {:error, reason} -> {:error, describe_read_error(reason)}
  end
end

# Converts a read failure into a binary message.
# `:file.format_error/1` returns a charlist, so it is converted explicitly to
# honour the `String.t()` contract of `read_file_entry/2`.
defp describe_read_error(reason) when is_atom(reason) do
  reason |> :file.format_error() |> List.to_string()
end

defp describe_read_error(%File.Error{reason: reason}) when is_atom(reason) do
  describe_read_error(reason)
end

defp describe_read_error(reason) when is_exception(reason), do: Exception.message(reason)

# Last resort for terms that are neither POSIX atoms nor exceptions; never raises.
defp describe_read_error(reason), do: inspect(reason)
|
|
|
|
@doc """
Turns the list returned by `consume_uploaded_entries/3` into a single tagged tuple.

Accepted shapes:

  * `[{:ok, content}]` with binary `content` - returns `{:ok, content}`
  * `[content]` with binary `content` (as test helpers may return) - returns `{:ok, content}`
  * `[{:error, reason}]` - returns `{:error, message}` embedding `reason`
  * `[]` - returns `{:error, message}` stating that no file was uploaded
  * anything else - returns `{:error, message}` about an unexpected format

All error messages are translated via Gettext.
"""
@spec parse_consume_result(list()) :: {:ok, String.t()} | {:error, String.t()}
def parse_consume_result([{:ok, content}]) when is_binary(content), do: {:ok, content}

def parse_consume_result([content]) when is_binary(content), do: {:ok, content}

def parse_consume_result([{:error, reason}]) do
  {:error, gettext("Failed to read file: %{reason}", reason: reason)}
end

def parse_consume_result([]) do
  {:error, gettext("No file was uploaded")}
end

def parse_consume_result(_unexpected) do
  {:error, gettext("Failed to read uploaded file: unexpected format")}
end
|
|
|
|
@doc """
Builds the initial progress map from a prepared import_state.

Reads `import_state.chunks` (its length becomes `:total_chunks`) and
`import_state.warnings` (a `nil` value is replaced by `[]`). All counters start at
zero, `:status` is `:running` and `:errors_truncated?` is `false`.

The second argument (options) is currently ignored; it stays in the signature so
existing two-argument calls keep working.
"""
@spec initial_progress(map(), keyword()) :: map()
def initial_progress(import_state, _opts \\ []) do
  %{
    inserted: 0,
    failed: 0,
    errors: [],
    warnings: import_state.warnings || [],
    status: :running,
    current_chunk: 0,
    total_chunks: length(import_state.chunks),
    errors_truncated?: false
  }
end
|
|
|
|
@doc """
Merges a chunk result into the current progress and returns updated progress.

Caps errors at `max_errors` (default 50). Sets `status` to `:done` when all chunks
have been processed.

## Parameters

  * `progress` - current progress map (`:inserted`, `:failed`, `:errors`, `:warnings`,
    `:total_chunks`, and optionally `:errors_truncated?`)
  * `chunk_result` - map with `:inserted`, `:failed`, `:errors` and optionally
    `:warnings` / `:errors_truncated?`
  * `current_chunk_idx` - zero-based index of the chunk that just finished
  * `opts` - `:max_errors` overrides the error cap

`:errors_truncated?` is sticky: once it is `true` in `progress` it stays `true`,
because errors that were dropped earlier cannot be recovered by later chunks.
"""
@spec merge_progress(map(), map(), non_neg_integer(), keyword()) :: map()
def merge_progress(progress, chunk_result, current_chunk_idx, opts \\ []) do
  max_errors = Keyword.get(opts, :max_errors, @default_max_errors)

  all_errors = progress.errors ++ chunk_result.errors

  # Truncated if it was already truncated, if the chunk producer reports truncation,
  # or if this merge itself overflows the cap.
  errors_truncated? =
    Map.get(progress, :errors_truncated?, false) ||
      Map.get(chunk_result, :errors_truncated?, false) ||
      length(all_errors) > max_errors

  # The index is zero-based, so the number of finished chunks is index + 1.
  chunks_processed = current_chunk_idx + 1
  status = if chunks_processed >= progress.total_chunks, do: :done, else: :running

  %{
    inserted: progress.inserted + chunk_result.inserted,
    failed: progress.failed + chunk_result.failed,
    errors: Enum.take(all_errors, max_errors),
    warnings: progress.warnings ++ Map.get(chunk_result, :warnings, []),
    status: status,
    current_chunk: chunks_processed,
    total_chunks: progress.total_chunks,
    errors_truncated?: errors_truncated?
  }
end
|
|
|
|
@doc """
Decides what to do once the chunk at `current_idx` has been handled.

Returns `{:send_chunk, next_idx}` while `current_idx + 1` is still below
`total_chunks`, and `:done` otherwise.
"""
@spec next_chunk_action(non_neg_integer(), non_neg_integer()) ::
        {:send_chunk, non_neg_integer()} | :done
def next_chunk_action(current_idx, total_chunks) do
  candidate = current_idx + 1

  case candidate < total_chunks do
    true -> {:send_chunk, candidate}
    false -> :done
  end
end
|
|
|
|
@doc """
Runs `MemberCSV.process_chunk/4` for one chunk and reports the outcome to
`live_view_pid`.

On `{:ok, chunk_result}` the message `{:chunk_done, idx, chunk_result}` is sent; on
`{:error, reason}` the message `{:chunk_error, idx, reason}` is sent. Raised
exceptions, exits and throws are converted into `{:error, reason}` first, so they
are reported as `:chunk_error` as well. Always returns `:ok`.

Options: `:custom_field_lookup`, `:existing_error_count`, `:max_errors`, `:actor`.
"""
@spec process_chunk(
        list(),
        map(),
        map(),
        keyword(),
        pid(),
        non_neg_integer()
      ) :: :ok
def process_chunk(chunk, column_map, custom_field_map, opts, live_view_pid, idx) do
  message =
    case run_member_csv_chunk(chunk, column_map, custom_field_map, opts) do
      {:ok, chunk_result} -> {:chunk_done, idx, chunk_result}
      {:error, reason} -> {:chunk_error, idx, reason}
    end

  send(live_view_pid, message)

  :ok
end

# Calls MemberCSV and turns any raise/exit/throw into an `{:error, binary}` tuple.
defp run_member_csv_chunk(chunk, column_map, custom_field_map, opts) do
  MemberCSV.process_chunk(chunk, column_map, custom_field_map, opts)
rescue
  exception -> {:error, Exception.message(exception)}
catch
  :exit, exit_reason -> {:error, inspect(exit_reason)}
  :throw, thrown -> {:error, inspect(thrown)}
end
|
|
|
|
@doc """
Returns a user-facing error message for chunk failures (invalid index, missing state,
or processing failure).

## Parameters

  * first argument - `:invalid_index`, `:missing_state` or `:processing_failed`
  * `idx` - index of the affected chunk, embedded in the message
  * `reason` - only used for `:processing_failed`

For `:processing_failed`, a `reason` that is already a valid UTF-8 string is embedded
verbatim; any other term is rendered with `inspect/1`. This keeps string reasons from
being shown with surrounding quotes and escape sequences.
"""
@spec format_chunk_error(
        :invalid_index | :missing_state | :processing_failed,
        non_neg_integer(),
        any()
      ) :: String.t()
def format_chunk_error(:invalid_index, idx, _reason) do
  gettext("Invalid chunk index: %{idx}", idx: idx)
end

def format_chunk_error(:missing_state, idx, _reason) do
  gettext("Import state is missing. Cannot process chunk %{idx}.", idx: idx)
end

def format_chunk_error(:processing_failed, idx, reason) do
  gettext("Failed to process chunk %{idx}: %{reason}",
    idx: idx,
    reason: printable_chunk_reason(reason)
  )
end

# Strings are shown as they are; everything else (and invalid UTF-8) is inspected.
defp printable_chunk_reason(reason) when is_binary(reason) do
  if String.valid?(reason), do: reason, else: inspect(reason)
end

defp printable_chunk_reason(reason), do: inspect(reason)
|
|
end
|