Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 81 additions & 4 deletions integration_test/myxql/upsert_all_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,87 @@ defmodule Ecto.Integration.UpsertAllTest do

test "on conflict ignore" do
post = [title: "first", uuid: "6fa459ea-ee8a-3ca4-894e-db77e160355e"]
assert TestRepo.insert_all(Post, [post], on_conflict: :nothing) ==
{1, nil}
assert TestRepo.insert_all(Post, [post], on_conflict: :nothing) ==
{1, nil}
# Default :nothing behavior uses ON DUPLICATE KEY UPDATE x = x workaround
# which always reports rows as affected
assert TestRepo.insert_all(Post, [post], on_conflict: :nothing) == {1, nil}
assert TestRepo.insert_all(Post, [post], on_conflict: :nothing) == {1, nil}
end

test "insert_mode: :ignore_errors" do
post = [title: "first", uuid: "6fa459ea-ee8a-3ca4-894e-db77e160355e"]
# First insert succeeds - 1 row inserted
assert TestRepo.insert_all(Post, [post],
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {1, nil}

# Second insert is ignored due to duplicate - 0 rows inserted (INSERT IGNORE behavior)
assert TestRepo.insert_all(Post, [post],
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {0, nil}
end

test "insert_mode: :ignore_errors with mixed records (some conflicts, some new)" do
# Insert an existing post
existing_uuid = "6fa459ea-ee8a-3ca4-894e-db77e160355e"
existing_post = [title: "existing", uuid: existing_uuid]

assert TestRepo.insert_all(Post, [existing_post],
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {1, nil}

# Now insert a batch with one duplicate and two new records
new_uuid1 = "7fa459ea-ee8a-3ca4-894e-db77e160355f"
new_uuid2 = "8fa459ea-ee8a-3ca4-894e-db77e160355a"

posts = [
[title: "new post 1", uuid: new_uuid1],
[title: "duplicate", uuid: existing_uuid],
[title: "new post 2", uuid: new_uuid2]
]

# With INSERT IGNORE, only 2 rows should be inserted (the non-duplicates)
assert TestRepo.insert_all(Post, posts,
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {2, nil}

# Verify the data - should have 3 posts total (1 existing + 2 new)
assert length(TestRepo.all(Post)) == 3

# Verify the existing post was not modified
[original] = TestRepo.all(from p in Post, where: p.uuid == ^existing_uuid)
assert original.title == "existing"

# Verify new posts were inserted
assert TestRepo.exists?(from p in Post, where: p.uuid == ^new_uuid1)
assert TestRepo.exists?(from p in Post, where: p.uuid == ^new_uuid2)
end

test "insert_mode: :ignore_errors with all duplicates" do
# Insert initial posts
uuid1 = "1fa459ea-ee8a-3ca4-894e-db77e160355e"
uuid2 = "2fa459ea-ee8a-3ca4-894e-db77e160355e"
initial_posts = [[title: "first", uuid: uuid1], [title: "second", uuid: uuid2]]

assert TestRepo.insert_all(Post, initial_posts,
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {2, nil}

# Try to insert all duplicates
duplicate_posts = [[title: "dup1", uuid: uuid1], [title: "dup2", uuid: uuid2]]

# All are duplicates, so 0 rows inserted
assert TestRepo.insert_all(Post, duplicate_posts,
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {0, nil}

# Verify count unchanged
assert length(TestRepo.all(Post)) == 2
end

test "on conflict keyword list" do
Expand Down
66 changes: 62 additions & 4 deletions lib/ecto/adapters/myxql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,55 @@ defmodule Ecto.Adapters.MyXQL do
automatically commits after some commands like CREATE TABLE.
Therefore MySQL migrations does not run inside transactions.

### Insert Mode

The `:insert_mode` option controls the type of INSERT statement generated.
When set to `:ignore_errors`, the adapter uses MySQL's `INSERT IGNORE`
syntax:

Repo.insert_all(Post, posts, insert_mode: :ignore_errors)

With `INSERT IGNORE`, MySQL silently ignores errors that would normally
cause the statement to fail, such as duplicate key violations and certain
type conversion errors. When a row is ignored, the affected row count
will be 0 instead of 1.

See [MySQL documentation](https://dev.mysql.com/doc/refman/8.4/en/insert.html) for more details.

### Upserts

When using `on_conflict: :nothing`, the adapter uses the
`ON DUPLICATE KEY UPDATE x = x` workaround to simulate "do nothing"
behavior. This always reports 1 affected row regardless of whether
the row was actually inserted or ignored.

If you need accurate row counts (0 when ignored, 1 when inserted) at the expense of error handling,
you can combine `on_conflict: :nothing` with [`insert_mode: :ignore_errors`](#module-insert-mode):

Repo.insert_all(Post, posts,
on_conflict: :nothing,
insert_mode: :ignore_errors)

When both options are used together, `INSERT IGNORE` handles the conflict
resolution and the `ON DUPLICATE KEY UPDATE` clause is omitted.

Examples:

```elixir
Repo.insert_all(Post, posts,
on_conflict: :nothing)

# SQL: INSERT INTO `posts` (`title`, `uuid`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id` = VALUES(`id`)
```

```elixir
Repo.insert_all(Post, posts,
on_conflict: :nothing,
insert_mode: :ignore_errors)

# SQL: INSERT IGNORE INTO `posts` (`title`, `uuid`) VALUES (?,?)
```

## Old MySQL versions

### JSON support
Expand Down Expand Up @@ -319,7 +368,10 @@ defmodule Ecto.Adapters.MyXQL do

key = primary_key!(schema_meta, returning)
{fields, values} = :lists.unzip(params)
sql = @conn.insert(prefix, source, fields, [fields], on_conflict, [], [])

# Extract insert_mode and pass it to the connection's insert function
insert_opts = if opts[:insert_mode], do: [insert_mode: opts[:insert_mode]], else: []
sql = @conn.insert(prefix, source, fields, [fields], on_conflict, [], [], insert_opts)

opts =
if is_nil(Keyword.get(opts, :cache_statement)) do
Expand All @@ -330,9 +382,15 @@ defmodule Ecto.Adapters.MyXQL do

case Ecto.Adapters.SQL.query(adapter_meta, sql, values ++ query_params, opts) do
{:ok, %{num_rows: 0}} ->
raise "insert operation failed to insert any row in the database. " <>
"This may happen if you have trigger or other database conditions rejecting operations. " <>
"The emitted SQL was: #{sql}"
# With INSERT IGNORE (insert_mode: :ignore_errors), 0 rows means the row
# was ignored due to a conflict, which is expected behavior
if opts[:insert_mode] == :ignore_errors do
{:ok, []}
else
raise "insert operation failed to insert any row in the database. " <>
"This may happen if you have trigger or other database conditions rejecting operations. " <>
"The emitted SQL was: #{sql}"
end

# We were used to check if num_rows was 1 or 2 (in case of upserts)
# but MariaDB supports tables with System Versioning, and in those
Expand Down
46 changes: 33 additions & 13 deletions lib/ecto/adapters/myxql/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -179,41 +179,61 @@ if Code.ensure_loaded?(MyXQL) do
end

@impl true
def insert(prefix, table, header, rows, on_conflict, [], []) do
def insert(prefix, table, header, rows, on_conflict, returning, placeholders, opts \\ [])

def insert(prefix, table, header, rows, on_conflict, [], [], opts) do
fields = quote_names(header)
insert_mode = Keyword.get(opts, :insert_mode)
insert_keyword = insert_keyword(insert_mode)

[
"INSERT INTO ",
insert_keyword,
quote_table(prefix, table),
" (",
fields,
") ",
insert_all(rows) | on_conflict(on_conflict, header)
insert_all(rows) | on_conflict(on_conflict, header, opts)
]
end

def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, []) do
def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, [], _opts) do
error!(nil, ":returning is not supported in insert/insert_all by MySQL")
end

def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, _placeholders) do
def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, _placeholders, _opts) do
error!(nil, ":placeholders is not supported by MySQL")
end

defp on_conflict({_, _, [_ | _]}, _header) do
# INSERT IGNORE when insert_mode: :ignore_errors is passed, independent of on_conflict
defp insert_keyword(:ignore_errors) do
"INSERT IGNORE INTO "
end

defp insert_keyword(_) do
"INSERT INTO "
end

defp on_conflict({_, _, [_ | _]}, _header, _opts) do
error!(nil, ":conflict_target is not supported in insert/insert_all by MySQL")
end

defp on_conflict({:raise, _, []}, _header) do
defp on_conflict({:raise, _, []}, _header, _opts) do
[]
end

defp on_conflict({:nothing, _, []}, [field | _]) do
quoted = quote_name(field)
[" ON DUPLICATE KEY UPDATE ", quoted, " = " | quoted]
# When insert_mode: :ignore_errors is used with on_conflict: :nothing,
# INSERT IGNORE already handles conflicts - no ON DUPLICATE KEY UPDATE needed
defp on_conflict({:nothing, _, []}, [field | _], opts) do
if Keyword.get(opts, :insert_mode) == :ignore_errors do
[]
else
# Default :nothing without INSERT IGNORE - uses workaround to simulate "do nothing" behavior
quoted = quote_name(field)
[" ON DUPLICATE KEY UPDATE ", quoted, " = " | quoted]
end
end

defp on_conflict({fields, _, []}, _header) when is_list(fields) do
defp on_conflict({fields, _, []}, _header, _opts) when is_list(fields) do
[
" ON DUPLICATE KEY UPDATE "
| Enum.map_intersperse(fields, ?,, fn field ->
Expand All @@ -223,11 +243,11 @@ if Code.ensure_loaded?(MyXQL) do
]
end

defp on_conflict({%{wheres: []} = query, _, []}, _header) do
defp on_conflict({%{wheres: []} = query, _, []}, _header, _opts) do
[" ON DUPLICATE KEY " | update_all(query, "UPDATE ")]
end

defp on_conflict({_query, _, []}, _header) do
defp on_conflict({_query, _, []}, _header, _opts) do
error!(
nil,
"Using a query with :where in combination with the :on_conflict option is not supported by MySQL"
Expand Down
2 changes: 1 addition & 1 deletion lib/ecto/adapters/postgres/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ if Code.ensure_loaded?(Postgrex) do
end

@impl true
def insert(prefix, table, header, rows, on_conflict, returning, placeholders) do
def insert(prefix, table, header, rows, on_conflict, returning, placeholders, _opts \\ []) do
counter_offset = length(placeholders) + 1

values =
Expand Down
2 changes: 1 addition & 1 deletion lib/ecto/adapters/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ defmodule Ecto.Adapters.SQL do
rows -> unzip_inserts(header, rows)
end

sql = conn.insert(prefix, source, header, rows, on_conflict, returning, placeholders)
sql = conn.insert(prefix, source, header, rows, on_conflict, returning, placeholders, opts)

opts =
if is_nil(Keyword.get(opts, :cache_statement)) do
Expand Down
3 changes: 2 additions & 1 deletion lib/ecto/adapters/sql/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ defmodule Ecto.Adapters.SQL.Connection do
rows :: [[atom | nil]],
on_conflict :: Ecto.Adapter.Schema.on_conflict(),
returning :: [atom],
placeholders :: [term]
placeholders :: [term],
opts :: Keyword.t()
) :: iodata

@doc """
Expand Down
2 changes: 1 addition & 1 deletion lib/ecto/adapters/tds/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ if Code.ensure_loaded?(Tds) do
end

@impl true
def insert(prefix, table, header, rows, on_conflict, returning, placeholders) do
def insert(prefix, table, header, rows, on_conflict, returning, placeholders, _opts \\ []) do
counter_offset = length(placeholders) + 1
[] = on_conflict(on_conflict, header)
returning = returning(returning, "INSERTED")
Expand Down
21 changes: 19 additions & 2 deletions test/ecto/adapters/myxql_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ defmodule Ecto.Adapters.MyXQLTest do
defp delete_all(query), do: query |> SQL.delete_all() |> IO.iodata_to_binary()
defp execute_ddl(query), do: query |> SQL.execute_ddl() |> Enum.map(&IO.iodata_to_binary/1)

defp insert(prefx, table, header, rows, on_conflict, returning) do
IO.iodata_to_binary(SQL.insert(prefx, table, header, rows, on_conflict, returning, []))
defp insert(prefx, table, header, rows, on_conflict, returning, opts \\ []) do
IO.iodata_to_binary(SQL.insert(prefx, table, header, rows, on_conflict, returning, [], opts))
end

defp update(prefx, table, fields, filter, returning) do
Expand Down Expand Up @@ -1466,6 +1466,7 @@ defmodule Ecto.Adapters.MyXQLTest do
end

test "insert with on duplicate key" do
# Default :nothing uses ON DUPLICATE KEY UPDATE workaround
query = insert(nil, "schema", [:x, :y], [[:x, :y]], {:nothing, [], []}, [])

assert query ==
Expand Down Expand Up @@ -1499,6 +1500,22 @@ defmodule Ecto.Adapters.MyXQLTest do
end
end

test "insert with insert_mode: :ignore_errors" do
# INSERT IGNORE via insert_mode: :ignore_errors option
query =
insert(
nil,
"schema",
[:x, :y],
[[:x, :y]],
{:nothing, [], []},
[],
insert_mode: :ignore_errors
)

assert query == ~s{INSERT IGNORE INTO `schema` (`x`,`y`) VALUES (?,?)}
end

test "insert with query" do
select_query = from("schema", select: [:id]) |> plan(:all)

Expand Down
Loading