476 lines
15 KiB
Elixir
476 lines
15 KiB
Elixir
defmodule Mv.Membership.Import.MemberCSV do
  @moduledoc """
  Service module for importing members from CSV files.

  This module provides the core API for CSV member import functionality:

  - `prepare/2` - Parses and validates CSV content, returns import state
  - `process_chunk/4` - Processes a chunk of rows and creates members

  ## Error Handling

  Errors are returned as `%MemberCSV.Error{}` structs containing:

  - `csv_line_number` - The physical line number in the CSV file (or `nil` for general errors)
  - `field` - The field name (atom) or `nil` if not field-specific
  - `message` - Human-readable error message

  ## Import State

  The `import_state` returned by `prepare/2` contains:

  - `chunks` - List of row chunks ready for processing
  - `column_map` - Map of canonical member field names (atoms) to column indices
  - `custom_field_map` - Map of custom field IDs (as strings) to column indices
  - `custom_field_lookup` - Map of custom field IDs (as strings) to
    `%{id: ..., value_type: ...}`; pass it to `process_chunk/4` via the
    `:custom_field_lookup` option, otherwise custom field values are skipped
  - `warnings` - List of warning messages (e.g., unknown custom field columns)

  ## Chunk Results

  The `chunk_result` returned by `process_chunk/4` contains:

  - `inserted` - Number of successfully created members
  - `failed` - Number of failed member creations
  - `errors` - List of `%MemberCSV.Error{}` structs, one per failed row. This module
    applies no cap; any per-import limit must be enforced by the caller.

  ## Examples

      # Prepare CSV for import
      {:ok, import_state} = MemberCSV.prepare(csv_content)

      # Process first chunk
      chunk = Enum.at(import_state.chunks, 0)

      {:ok, result} =
        MemberCSV.process_chunk(
          chunk,
          import_state.column_map,
          import_state.custom_field_map,
          custom_field_lookup: import_state.custom_field_lookup
        )
  """

  alias Mv.Membership.Import.CsvParser
  alias Mv.Membership.Import.HeaderMapper

  defmodule Error do
    @moduledoc """
    Error struct for CSV import errors.

    ## Fields

    - `csv_line_number` - The physical line number in the CSV file (1-based, header is line 1)
    - `field` - The field name as an atom (e.g., `:email`) or `nil` if not field-specific
    - `message` - Human-readable error message
    """
    defstruct csv_line_number: nil, field: nil, message: nil

    @type t :: %__MODULE__{
            csv_line_number: pos_integer() | nil,
            field: atom() | nil,
            message: String.t() | nil
          }
  end

  @type import_state :: %{
          chunks: list(list({pos_integer(), map()})),
          column_map: %{atom() => non_neg_integer()},
          custom_field_map: %{String.t() => non_neg_integer()},
          custom_field_lookup: %{String.t() => %{id: String.t(), value_type: atom()}},
          warnings: list(String.t())
        }

  @type chunk_result :: %{
          inserted: non_neg_integer(),
          failed: non_neg_integer(),
          errors: list(Error.t())
        }

  @doc """
  Prepares CSV content for import by parsing, mapping headers, and validating limits.

  This function:

  1. Strips UTF-8 BOM if present
  2. Detects CSV delimiter (semicolon or comma)
  3. Parses headers and data rows
  4. Maps headers to canonical member fields
  5. Maps custom field columns by name
  6. Validates row count limits
  7. Chunks rows for processing

  Steps 1-3 are delegated to `CsvParser.parse/1`; steps 4-5 to `HeaderMapper.build_maps/2`.

  ## Parameters

  - `file_content` - The raw CSV file content as a string
  - `opts` - Optional keyword list:
    - `:max_rows` - Maximum number of data rows allowed (default: 1000)
    - `:chunk_size` - Number of rows per chunk (default: 200)

  ## Returns

  - `{:ok, import_state}` - Successfully prepared import state
  - `{:error, reason}` - Error reason (string or error struct)

  ## Examples

      iex> MemberCSV.prepare("email\\njohn@example.com")
      {:ok, %{chunks: [...], column_map: %{email: 0}, ...}}

      iex> MemberCSV.prepare("")
      {:error, "CSV file is empty"}
  """
  @spec prepare(String.t(), keyword()) :: {:ok, import_state()} | {:error, String.t()}
  def prepare(file_content, opts \\ []) do
    max_rows = Keyword.get(opts, :max_rows, 1000)
    chunk_size = Keyword.get(opts, :chunk_size, 200)

    with {:ok, headers, rows} <- CsvParser.parse(file_content),
         {:ok, custom_fields} <- load_custom_fields(),
         {:ok, maps, warnings} <- build_header_maps(headers, custom_fields),
         :ok <- validate_row_count(rows, max_rows) do
      {:ok,
       %{
         chunks: chunk_rows(rows, maps, chunk_size),
         column_map: maps.member,
         custom_field_map: maps.custom,
         # Lookup by stringified ID so process_chunk/4 can type-convert custom values.
         custom_field_lookup: build_custom_field_lookup(custom_fields),
         warnings: warnings
       }}
    end
  end

  # Loads all custom fields from the database.
  # `Ash.read!/1` raises on failure; the rescue turns any exception into an
  # `{:error, message}` tuple so `prepare/2`'s `with` chain can return it.
  defp load_custom_fields do
    {:ok, Ash.read!(Mv.Membership.CustomField)}
  rescue
    e -> {:error, "Failed to load custom fields: #{Exception.message(e)}"}
  end

  # Builds a map of stringified custom field ID => %{id: id, value_type: value_type}.
  defp build_custom_field_lookup(custom_fields) do
    Map.new(custom_fields, fn cf ->
      {to_string(cf.id), %{id: cf.id, value_type: cf.value_type}}
    end)
  end

  # Maps CSV headers to member/custom columns via HeaderMapper and turns the
  # unknown headers into user-facing warnings.
  defp build_header_maps(headers, custom_fields) do
    custom_field_maps = Enum.map(custom_fields, &%{id: to_string(&1.id), name: &1.name})

    case HeaderMapper.build_maps(headers, custom_field_maps) do
      {:ok, %{member: member_map, custom: custom_map, unknown: unknown}} ->
        {:ok, %{member: member_map, custom: custom_map}, unknown_column_warnings(unknown)}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # One warning per unknown, non-empty header that would not map to a member field.
  defp unknown_column_warnings(unknown_headers) do
    unknown_headers
    |> Enum.filter(fn header ->
      normalized = HeaderMapper.normalize_header(header)
      normalized != "" and not member_field?(normalized)
    end)
    |> Enum.map(fn header ->
      "Unknown column '#{header}' will be ignored. " <>
        "If this is a custom field, create it in Mila before importing."
    end)
  end

  # True when a single normalized header maps to a member field, determined by
  # asking HeaderMapper to map just that header with no custom fields.
  defp member_field?(normalized) do
    case HeaderMapper.build_maps([normalized], []) do
      {:ok, %{member: member_map}} -> map_size(member_map) > 0
      _ -> false
    end
  end

  # Returns :ok unless the number of data rows exceeds `max_rows`.
  defp validate_row_count(rows, max_rows) do
    if length(rows) > max_rows do
      {:error, "CSV file exceeds maximum row limit of #{max_rows} rows"}
    else
      :ok
    end
  end

  # Converts each `{line_number, row_values}` into `{line_number, row_map}` and
  # groups the result into chunks of `chunk_size`.
  defp chunk_rows(rows, maps, chunk_size) do
    rows
    |> Enum.map(fn {line_number, row_values} ->
      {line_number, build_row_map(row_values, maps)}
    end)
    |> Enum.chunk_every(chunk_size)
  end

  # Builds `%{member: %{field => value}, custom: %{custom_field_id => value}}` from raw
  # row values; columns missing from a short row default to "".
  defp build_row_map(row_values, maps) do
    %{
      member: pick_columns(row_values, maps.member),
      custom: pick_columns(row_values, maps.custom)
    }
  end

  # Picks the value at each mapped column index, defaulting to "" when out of range.
  defp pick_columns(row_values, column_map) do
    Map.new(column_map, fn {key, index} -> {key, Enum.at(row_values, index, "")} end)
  end

  @doc """
  Processes a chunk of CSV rows and creates members.

  This function:

  1. Validates each row
  2. Creates members via Ash resource
  3. Creates custom field values for each member
  4. Collects errors with correct CSV line numbers
  5. Returns chunk processing results

  ## Parameters

  - `chunk_rows_with_lines` - List of tuples `{csv_line_number, row_map}` where:
    - `csv_line_number` - Physical line number in CSV (1-based)
    - `row_map` - Map with `:member` and `:custom` keys containing field values
  - `column_map` - Map of canonical field names (atoms) to column indices (for reference; unused)
  - `custom_field_map` - Map of custom field IDs (strings) to column indices (for reference; unused)
  - `opts` - Optional keyword list:
    - `:custom_field_lookup` - Map of custom field IDs (strings) to
      `%{id: ..., value_type: ...}`, as returned in `import_state.custom_field_lookup`.
      Defaults to `%{}`, in which case all custom field values are skipped.

  ## Returns

  - `{:ok, chunk_result}` - Chunk processing results; `errors` holds one entry per
    failed row, in input order, with no cap applied
  - `{:error, reason}` - Error reason (string)

  ## Examples

      iex> chunk = [{2, %{member: %{email: "john@example.com"}, custom: %{}}}]
      iex> column_map = %{email: 0}
      iex> custom_field_map = %{}
      iex> MemberCSV.process_chunk(chunk, column_map, custom_field_map)
      {:ok, %{inserted: 1, failed: 0, errors: []}}
  """
  @spec process_chunk(
          list({pos_integer(), map()}),
          %{atom() => non_neg_integer()},
          %{String.t() => non_neg_integer()},
          keyword()
        ) :: {:ok, chunk_result()} | {:error, String.t()}
  def process_chunk(chunk_rows_with_lines, _column_map, _custom_field_map, opts \\ []) do
    custom_field_lookup = Keyword.get(opts, :custom_field_lookup, %{})

    {inserted, failed, errors} =
      Enum.reduce(chunk_rows_with_lines, {0, 0, []}, fn {line_number, row_map},
                                                         {acc_inserted, acc_failed, acc_errors} ->
        case process_row(row_map, line_number, custom_field_lookup) do
          {:ok, _member} -> {acc_inserted + 1, acc_failed, acc_errors}
          {:error, error} -> {acc_inserted, acc_failed + 1, [error | acc_errors]}
        end
      end)

    # Errors were prepended while reducing; reverse once to restore input order.
    {:ok, %{inserted: inserted, failed: failed, errors: Enum.reverse(errors)}}
  end

  # Creates one member (with its custom field values) from a row map.
  # Any exception raised while building or creating the member is rescued and
  # reported as a row-level error, so one bad row cannot abort the whole chunk.
  defp process_row(
         %{member: member_attrs, custom: custom_attrs},
         line_number,
         custom_field_lookup
       ) do
    attrs =
      member_attrs
      |> trim_string_values()
      |> put_custom_field_values(prepare_custom_field_values(custom_attrs, custom_field_lookup))

    case Mv.Membership.create_member(attrs) do
      {:ok, member} ->
        {:ok, member}

      {:error, %Ash.Error.Invalid{} = error} ->
        {:error, format_ash_error(error, line_number)}

      {:error, error} ->
        {:error, %Error{csv_line_number: line_number, field: nil, message: inspect(error)}}
    end
  rescue
    e ->
      {:error, %Error{csv_line_number: line_number, field: nil, message: Exception.message(e)}}
  end

  # Only sends `:custom_field_values` to Ash when there is at least one value.
  defp put_custom_field_values(attrs, []), do: attrs
  defp put_custom_field_values(attrs, values), do: Map.put(attrs, :custom_field_values, values)

  # Converts `%{custom_field_id_str => raw_value}` into the list of maps Ash expects.
  # Blank cells (nil, "" or whitespace-only) and IDs missing from the lookup are skipped.
  defp prepare_custom_field_values(custom_attrs, custom_field_lookup) when is_map(custom_attrs) do
    for {custom_field_id_str, value} <- custom_attrs,
        not blank?(value),
        # Single-element generator: a nil lookup result does not match and is skipped.
        %{id: custom_field_id, value_type: value_type} <-
          [Map.get(custom_field_lookup, custom_field_id_str)] do
      %{
        "custom_field_id" => to_string(custom_field_id),
        "value" => format_custom_field_value(value, value_type)
      }
    end
  end

  defp prepare_custom_field_values(_, _), do: []

  # A CSV cell counts as blank when it is nil or contains only whitespace.
  defp blank?(nil), do: true
  defp blank?(value) when is_binary(value), do: String.trim(value) == ""
  defp blank?(_), do: false

  # Formats a custom field value according to its type, using the
  # `_union_type` / `_union_value` map shape passed to Ash.
  # Values that cannot be parsed for their type fall back to the "string" union type.
  defp format_custom_field_value(value, :string) when is_binary(value) do
    %{"_union_type" => "string", "_union_value" => String.trim(value)}
  end

  # Only a complete integer literal (after trimming) is accepted, so e.g. "12.5"
  # is not silently truncated to 12.
  defp format_custom_field_value(value, :integer) when is_binary(value) do
    trimmed = String.trim(value)

    case Integer.parse(trimmed) do
      {int_value, ""} -> %{"_union_type" => "integer", "_union_value" => int_value}
      _ -> %{"_union_type" => "string", "_union_value" => trimmed}
    end
  end

  # "true", "1", "yes", "ja" (case-insensitive, trimmed) are true; anything else is false.
  defp format_custom_field_value(value, :boolean) when is_binary(value) do
    bool_value = String.downcase(String.trim(value)) in ["true", "1", "yes", "ja"]
    %{"_union_type" => "boolean", "_union_value" => bool_value}
  end

  # Dates must be ISO 8601 (YYYY-MM-DD).
  defp format_custom_field_value(value, :date) when is_binary(value) do
    trimmed = String.trim(value)

    case Date.from_iso8601(trimmed) do
      {:ok, date} -> %{"_union_type" => "date", "_union_value" => date}
      {:error, _} -> %{"_union_type" => "string", "_union_value" => trimmed}
    end
  end

  defp format_custom_field_value(value, :email) when is_binary(value) do
    %{"_union_type" => "email", "_union_value" => String.trim(value)}
  end

  # Unknown value types default to string.
  defp format_custom_field_value(value, _type) when is_binary(value) do
    %{"_union_type" => "string", "_union_value" => String.trim(value)}
  end

  # Trims every binary value in the attribute map; other values are left as-is.
  defp trim_string_values(attrs) do
    Map.new(attrs, fn
      {key, value} when is_binary(value) -> {key, String.trim(value)}
      pair -> pair
    end)
  end

  # Reduces an Ash.Error.Invalid to a single MemberCSV.Error, preferring an
  # email-related error (for clearer duplicate-email messages) over the first one.
  # NOTE(review): Ash error `message` fields may contain `%{var}` placeholders that
  # are only interpolated by `Exception.message/1` — confirm raw messages read well.
  defp format_ash_error(%Ash.Error.Invalid{errors: errors}, line_number) do
    email_error = Enum.find(errors, &match?(%{field: :email}, &1))

    case email_error || List.first(errors) do
      %{field: field, message: message} when is_atom(field) ->
        %Error{
          csv_line_number: line_number,
          field: field,
          message: format_error_message(message, field)
        }

      %{message: message} ->
        %Error{
          csv_line_number: line_number,
          field: nil,
          message: format_error_message(message, nil)
        }

      _ ->
        %Error{csv_line_number: line_number, field: nil, message: "Validation failed"}
    end
  end

  # Normalizes email-uniqueness failures to a single friendly message; other
  # messages pass through (non-binaries via `to_string/1`).
  defp format_error_message(message, field) when is_binary(message) do
    if email_uniqueness_error?(message, field) do
      "email has already been taken"
    else
      message
    end
  end

  defp format_error_message(message, _field), do: to_string(message)

  # Heuristic: an :email error whose message mentions any of these markers is
  # treated as a uniqueness violation.
  # NOTE(review): "constraint" is broad and may also match non-uniqueness email errors.
  defp email_uniqueness_error?(message, :email) do
    message
    |> String.downcase()
    |> String.contains?([
      "unique",
      "constraint",
      "duplicate",
      "already been taken",
      "already exists",
      "violates unique constraint"
    ])
  end

  defp email_uniqueness_error?(_message, _field), do: false
end
|