diff --git a/src/Database/Adapter.php b/src/Database/Adapter.php index a7b385cce..1678024ee 100644 --- a/src/Database/Adapter.php +++ b/src/Database/Adapter.php @@ -33,6 +33,8 @@ abstract class Adapter protected bool $alterLocks = false; + protected bool $skipDuplicates = false; + /** * @var array */ @@ -392,6 +394,27 @@ public function inTransaction(): bool return $this->inTransaction > 0; } + /** + * Run a callback with skipDuplicates enabled. + * Duplicate key errors during createDocuments() will be silently skipped + * instead of thrown. Nestable — saves and restores previous state. + * + * @template T + * @param callable(): T $callback + * @return T + */ + public function skipDuplicates(callable $callback): mixed + { + $previous = $this->skipDuplicates; + $this->skipDuplicates = true; + + try { + return $callback(); + } finally { + $this->skipDuplicates = $previous; + } + } + /** * @template T * @param callable(): T $callback diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index b654b436e..311b60476 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -122,6 +122,11 @@ public function withTransaction(callable $callback): mixed return $callback(); } + // upsert + $setOnInsert hits WriteConflict (E112) under txn snapshot isolation. + if ($this->skipDuplicates) { + return $callback(); + } + try { $this->startTransaction(); $result = $callback(); @@ -1492,6 +1497,42 @@ public function createDocuments(Document $collection, array $documents): array $records[] = $record; } + // insertMany aborts the txn on any duplicate; upsert + $setOnInsert no-ops instead. + if ($this->skipDuplicates) { + if (empty($records)) { + return []; + } + + $operations = []; + foreach ($records as $record) { + $filter = ['_uid' => $record['_uid'] ?? '']; + if ($this->sharedTables) { + $filter['_tenant'] = $record['_tenant'] ?? $this->getTenant(); + } + + // Filter fields can't reappear in $setOnInsert (mongo path-conflict error). + $setOnInsert = $record; + unset($setOnInsert['_uid'], $setOnInsert['_tenant']); + + if (empty($setOnInsert)) { + continue; + } + + $operations[] = [ + 'filter' => $filter, + 'update' => ['$setOnInsert' => $setOnInsert], + ]; + } + + try { + $this->client->upsert($name, $operations, $options); + } catch (MongoException $e) { + throw $this->processException($e); + } + + return $documents; + } + try { $documents = $this->client->insertMany($name, $records, $options); } catch (MongoException $e) { diff --git a/src/Database/Adapter/Pool.php b/src/Database/Adapter/Pool.php index 668753387..7bbfb98f2 100644 --- a/src/Database/Adapter/Pool.php +++ b/src/Database/Adapter/Pool.php @@ -43,6 +43,11 @@ public function __construct(UtopiaPool $pool) public function delegate(string $method, array $args): mixed { if ($this->pinnedAdapter !== null) { + if ($this->skipDuplicates) { + return $this->pinnedAdapter->skipDuplicates( + fn () => $this->pinnedAdapter->{$method}(...$args) + ); + } return $this->pinnedAdapter->{$method}(...$args); } @@ -66,6 +71,11 @@ public function delegate(string $method, array $args): mixed $adapter->setMetadata($key, $value); } + if ($this->skipDuplicates) { + return $adapter->skipDuplicates( + fn () => $adapter->{$method}(...$args) + ); + } return $adapter->{$method}(...$args); }); } @@ -146,6 +156,11 @@ public function withTransaction(callable $callback): mixed $this->pinnedAdapter = $adapter; try { + if ($this->skipDuplicates) { + return $adapter->skipDuplicates( + fn () => $adapter->withTransaction($callback) + ); + } return $adapter->withTransaction($callback); } finally { $this->pinnedAdapter = null; diff --git a/src/Database/Adapter/Postgres.php b/src/Database/Adapter/Postgres.php index 8dcf72025..2c27e08e7 100644 --- a/src/Database/Adapter/Postgres.php +++ b/src/Database/Adapter/Postgres.php @@ -2350,6 +2350,35 @@ public function getSupportForOptionalSpatialAttributeWithExistingRows(): bool return false; } + protected function getInsertKeyword(): string + { + return 'INSERT INTO'; + } + + protected function getInsertSuffix(string $table): string + { + if (!$this->skipDuplicates) { + return ''; + } + + $conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + + protected function getInsertPermissionsSuffix(): string + { + if (!$this->skipDuplicates) { + return ''; + } + + $conflictTarget = $this->sharedTables + ? '("_type", "_permission", "_document", "_tenant")' + : '("_type", "_permission", "_document")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + public function decodePoint(string $wkb): array { if (str_starts_with(strtoupper($wkb), 'POINT(')) { diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index 6864e6aee..3fe2696db 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -1029,6 +1029,33 @@ public function getSupportForHostname(): bool return true; } + /** + * Returns the INSERT keyword, optionally with IGNORE for duplicate handling. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertKeyword(): string + { + return $this->skipDuplicates ? 'INSERT IGNORE INTO' : 'INSERT INTO'; + } + + /** + * Returns a suffix appended after VALUES clause for duplicate handling. + * Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING). + */ + protected function getInsertSuffix(string $table): string + { + return ''; + } + + /** + * Returns a suffix for the permissions INSERT statement when ignoring duplicates. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertPermissionsSuffix(): string + { + return ''; + } + /** * Get current attribute count from collection document * @@ -2476,6 +2503,7 @@ public function createDocuments(Document $collection, array $documents): array if (empty($documents)) { return $documents; } + $spatialAttributes = $this->getSpatialAttributes($collection); $collection = $collection->getId(); try { @@ -2573,8 +2601,9 @@ public function createDocuments(Document $collection, array $documents): array $batchKeys = \implode(', ', $batchKeys); $stmt = $this->getPDO()->prepare(" - INSERT INTO {$this->getSQLTable($name)} {$columns} + {$this->getInsertKeyword()} {$this->getSQLTable($name)} {$columns} VALUES {$batchKeys} + {$this->getInsertSuffix($name)} "); foreach ($bindValues as $key => $value) { @@ -2588,8 +2617,9 @@ public function createDocuments(Document $collection, array $documents): array $permissions = \implode(', ', $permissions); $sqlPermissions = " - INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) - VALUES {$permissions}; + {$this->getInsertKeyword()} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) + VALUES {$permissions} + {$this->getInsertPermissionsSuffix()} "; $stmtPermissions = $this->getPDO()->prepare($sqlPermissions); diff --git a/src/Database/Adapter/SQLite.php b/src/Database/Adapter/SQLite.php index 3c25987eb..33f370775 100644 --- a/src/Database/Adapter/SQLite.php +++ b/src/Database/Adapter/SQLite.php @@ -1936,4 +1936,9 @@ public function getSupportForTTLIndexes(): bool { return false; } + + protected function getInsertKeyword(): string + { + return $this->skipDuplicates ? 'INSERT OR IGNORE INTO' : 'INSERT INTO'; + } } diff --git a/src/Database/Database.php b/src/Database/Database.php index bae99ae79..f4dcede34 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -417,6 +417,8 @@ class Database protected bool $preserveDates = false; + protected bool $skipDuplicates = false; + protected bool $preserveSequence = false; protected int $maxQueryValues = 5000; @@ -842,6 +844,29 @@ public function skipRelationshipsExistCheck(callable $callback): mixed } } + public function skipDuplicates(callable $callback): mixed + { + $previous = $this->skipDuplicates; + $this->skipDuplicates = true; + + try { + return $callback(); + } finally { + $this->skipDuplicates = $previous; + } + } + + /** + * Build a tenant-aware identity key for a document. + * Returns ":" in tenant-per-document shared-table mode, otherwise just the id. + */ + private function tenantKey(Document $document): string + { + return ($this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); + } + /** * Trigger callback for events * @@ -5700,9 +5725,11 @@ public function createDocuments( } foreach (\array_chunk($documents, $batchSize) as $chunk) { - $batch = $this->withTransaction(function () use ($collection, $chunk) { - return $this->adapter->createDocuments($collection, $chunk); - }); + $insert = fn () => $this->withTransaction(fn () => $this->adapter->createDocuments($collection, $chunk)); + // Set adapter flag before withTransaction so Mongo can opt out of a real txn. + $batch = $this->skipDuplicates + ? $this->adapter->skipDuplicates($insert) + : $insert(); $batch = $this->adapter->getSequences($collection->getId(), $batch); @@ -7116,18 +7143,57 @@ public function upsertDocumentsWithIncrease( $created = 0; $updated = 0; $seenIds = []; - foreach ($documents as $key => $document) { - if ($this->getSharedTables() && $this->getTenantPerDocument()) { - $old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), - )))); - } else { - $old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), - ))); + + // Batch-fetch existing documents in one query instead of N individual getDocument() calls. + // tenantPerDocument: group ids by tenant and run one find() per tenant under withTenant, + // so cross-tenant batches (e.g. StatsUsage worker) don't get silently scoped to the + // session tenant and miss rows belonging to other tenants. + $existingDocs = []; + + if ($this->getSharedTables() && $this->getTenantPerDocument()) { + $idsByTenant = []; + foreach ($documents as $doc) { + if ($doc->getId() !== '') { + $idsByTenant[$doc->getTenant()][] = $doc->getId(); + } + } + foreach ($idsByTenant as $tenant => $tenantIds) { + $tenantIds = \array_values(\array_unique($tenantIds)); + foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $chunk) { + $found = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent( + fn () => $this->find($collection->getId(), [ + Query::equal('$id', $chunk), + Query::limit(PHP_INT_MAX), + ]) + ))); + foreach ($found as $doc) { + $existingDocs[$tenant . ':' . $doc->getId()] = $doc; + } + } + } + } else { + $docIds = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getId(), $documents), + fn ($id) => $id !== '' + ))); + + if (!empty($docIds)) { + foreach (\array_chunk($docIds, \max(1, $this->maxQueryValues)) as $chunk) { + $existing = $this->authorization->skip(fn () => $this->silent( + fn () => $this->find($collection->getId(), [ + Query::equal('$id', $chunk), + Query::limit(PHP_INT_MAX), + ]) + )); + foreach ($existing as $doc) { + $existingDocs[$this->tenantKey($doc)] = $doc; + } + } } + } + + foreach ($documents as $key => $document) { + $old = $existingDocs[$this->tenantKey($document)] ?? new Document(); // Extract operators early to avoid comparison issues $documentArray = $document->getArrayCopy(); @@ -7294,7 +7360,7 @@ public function upsertDocumentsWithIncrease( $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); } - $seenIds[] = $document->getId(); + $seenIds[] = $this->tenantKey($document); $old = $this->adapter->castingBefore($collection, $old); $document = $this->adapter->castingBefore($collection, $document); diff --git a/src/Database/Mirror.php b/src/Database/Mirror.php index f740cab3e..2f799eb88 100644 --- a/src/Database/Mirror.php +++ b/src/Database/Mirror.php @@ -601,13 +601,36 @@ public function createDocuments( ?callable $onNext = null, ?callable $onError = null, ): int { - $modified = $this->source->createDocuments( - $collection, - $documents, - $batchSize, - $onNext, - $onError, - ); + // In skipDuplicates mode, identify which input ids already exist on source. + // These will be silently no-oped by the adapter's INSERT IGNORE and must + // not propagate to destination — a skipped duplicate is not a user write. + $existingIds = []; + if ($this->skipDuplicates) { + $ids = \array_values(\array_filter( + \array_map(fn (Document $d) => $d->getId(), $documents), + fn ($id) => $id !== '' + )); + + if (!empty($ids)) { + foreach (\array_chunk(\array_unique($ids), \max(1, $this->maxQueryValues)) as $chunk) { + $existing = $this->source->silent( + fn () => $this->source->find($collection, [ + Query::equal('$id', $chunk), + Query::limit(PHP_INT_MAX), + ]) + ); + foreach ($existing as $doc) { + $existingIds[$doc->getId()] = true; + } + } + } + } + + $modified = $this->skipDuplicates + ? $this->source->skipDuplicates( + fn () => $this->source->createDocuments($collection, $documents, $batchSize, $onNext, $onError) + ) + : $this->source->createDocuments($collection, $documents, $batchSize, $onNext, $onError); if ( \in_array($collection, self::SOURCE_ONLY_COLLECTIONS) @@ -621,12 +644,23 @@ public function createDocuments( return $modified; } + // In skipDuplicates mode, drop pre-existing ids so their no-op writes + // don't propagate. Non-skip mode forwards everything as before. + $toForward = $this->skipDuplicates + ? \array_values(\array_filter( + $documents, + fn (Document $d) => $d->getId() === '' || !isset($existingIds[$d->getId()]) + )) + : $documents; + + if (empty($toForward)) { + return $modified; + } + try { $clones = []; - - foreach ($documents as $document) { + foreach ($toForward as $document) { $clone = clone $document; - foreach ($this->writeFilters as $filter) { $clone = $filter->beforeCreateDocument( source: $this->source, @@ -635,18 +669,25 @@ public function createDocuments( document: $clone, ); } - $clones[] = $clone; } - $this->destination->withPreserveDates( - fn () => - $this->destination->createDocuments( - $collection, - $clones, - $batchSize, - ) - ); + if ($this->skipDuplicates) { + $this->destination->skipDuplicates( + fn () => $this->destination->withPreserveDates( + fn () => $this->destination->createDocuments($collection, $clones, $batchSize) + ) + ); + } else { + $this->destination->withPreserveDates( + fn () => + $this->destination->createDocuments( + $collection, + $clones, + $batchSize, + ) + ); + } foreach ($clones as $clone) { foreach ($this->writeFilters as $filter) { diff --git a/tests/e2e/Adapter/MirrorTest.php b/tests/e2e/Adapter/MirrorTest.php index 31bf3f3b6..c513ddfcf 100644 --- a/tests/e2e/Adapter/MirrorTest.php +++ b/tests/e2e/Adapter/MirrorTest.php @@ -313,6 +313,83 @@ public function testDeleteMirroredDocument(): void $this->assertTrue($database->getDestination()->getDocument('testDeleteMirroredDocument', $document->getId())->isEmpty()); } + public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): void + { + $database = $this->getDatabase(); + $collection = 'mirrorSkipDup'; + + $database->createCollection($collection, attributes: [ + new Document([ + '$id' => 'name', + 'type' => Database::VAR_STRING, + 'required' => true, + 'size' => Database::LENGTH_KEY, + ]), + ], permissions: [ + Permission::create(Role::any()), + Permission::read(Role::any()), + ], documentSecurity: false); + + // Seed the SOURCE only (bypass the mirror) with the row we want to + // skipDuplicates over later. Destination intentionally does NOT have it. + $database->getSource()->createDocument($collection, new Document([ + '$id' => 'dup', + 'name' => 'Original', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ])); + + // Sanity check setup + $this->assertSame( + 'Original', + $database->getSource()->getDocument($collection, 'dup')->getAttribute('name') + ); + $this->assertTrue( + $database->getDestination()->getDocument($collection, 'dup')->isEmpty() + ); + + $database->skipDuplicates(fn () => $database->createDocuments($collection, [ + new Document([ + '$id' => 'dup', + 'name' => 'WouldBe', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'fresh', + 'name' => 'Fresh', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ])); + + $this->assertSame( + 'Original', + $database->getSource()->getDocument($collection, 'dup')->getAttribute('name') + ); + $this->assertSame( + 'Fresh', + $database->getSource()->getDocument($collection, 'fresh')->getAttribute('name') + ); + + // A source-skipped duplicate is a no-op and must not propagate to + // destination. Only the genuinely-inserted 'fresh' row should mirror. + $this->assertTrue( + $database->getDestination()->getDocument($collection, 'dup')->isEmpty(), + 'Source-skipped doc must not be inserted into destination' + ); + $this->assertSame( + 'Fresh', + $database->getDestination()->getDocument($collection, 'fresh')->getAttribute('name') + ); + } + protected function deleteColumn(string $collection, string $column): bool { $sqlTable = "`" . self::$source->getDatabase() . "`.`" . self::$source->getNamespace() . "_" . $collection . "`"; diff --git a/tests/e2e/Adapter/Scopes/DocumentTests.php b/tests/e2e/Adapter/Scopes/DocumentTests.php index 49d75e4b6..c45610c28 100644 --- a/tests/e2e/Adapter/Scopes/DocumentTests.php +++ b/tests/e2e/Adapter/Scopes/DocumentTests.php @@ -7855,4 +7855,409 @@ public function testRegexInjection(): void // } // $database->deleteCollection($collectionName); // } + + public function testCreateDocumentsIgnoreDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial documents + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Original A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc2', + 'name' => 'Original B', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // Without ignore, duplicates should throw + try { + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + $this->fail('Expected DuplicateException'); + } catch (DuplicateException $e) { + $this->assertNotEmpty($e->getMessage()); + } + + // With skipDuplicates, duplicates should be silently skipped + $emittedIds = []; + $collection = __FUNCTION__; + $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + return $database->createDocuments($collection, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc3', + 'name' => 'New C', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + + $this->assertSame(2, $count); + $this->assertCount(2, $emittedIds); + \sort($emittedIds); + $this->assertSame(['doc1', 'doc3'], $emittedIds); + + $doc1 = $database->getDocument(__FUNCTION__, 'doc1'); + $this->assertSame('Original A', $doc1->getAttribute('name')); + + $doc3 = $database->getDocument(__FUNCTION__, 'doc3'); + $this->assertSame('New C', $doc3->getAttribute('name')); + + // Total should be 3 (doc1, doc2, doc3) + $all = $database->find(__FUNCTION__); + $this->assertCount(3, $all); + } + + public function testCreateDocumentsIgnoreAllDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial document + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'existing', + 'name' => 'Original', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // With skipDuplicates, inserting only duplicates should succeed with no new rows + $emittedIds = []; + $collection = __FUNCTION__; + $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + return $database->createDocuments($collection, [ + new Document([ + '$id' => 'existing', + 'name' => 'Duplicate', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + + $this->assertSame(1, $count); + $this->assertSame(['existing'], $emittedIds); + + $doc = $database->getDocument(__FUNCTION__, 'existing'); + $this->assertSame('Original', $doc->getAttribute('name')); + + // Still only 1 document + $all = $database->find(__FUNCTION__); + $this->assertCount(1, $all); + } + + public function testCreateDocumentsSkipDuplicatesEmptyBatch(): void + { + $database = $this->getDatabase(); + + $collection = 'skipDupEmpty'; + $database->createCollection($collection); + $database->createAttribute($collection, 'name', Database::VAR_STRING, 128, true); + + $count = $database->skipDuplicates(fn () => $database->createDocuments($collection, [])); + + $this->assertSame(0, $count); + $this->assertCount(0, $database->find($collection)); + } + + public function testCreateDocumentsSkipDuplicatesNestedScope(): void + { + $database = $this->getDatabase(); + + $collection = 'skipDupNested'; + $database->createCollection($collection); + $database->createAttribute($collection, 'name', Database::VAR_STRING, 128, true); + + $makeDoc = fn (string $id, string $name) => new Document([ + '$id' => $id, + 'name' => $name, + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]); + + // Seed an existing doc + $database->createDocuments($collection, [$makeDoc('seed', 'Seed')]); + + // Nested scope — inner scope runs inside outer scope. + // After inner exits, outer state should still be "skip enabled". + // After outer exits, state should restore to "skip disabled". + $countOuter = $database->skipDuplicates(function () use ($database, $collection, $makeDoc) { + // Inner scope: add dup + new + $countInner = $database->skipDuplicates(function () use ($database, $collection, $makeDoc) { + return $database->createDocuments($collection, [ + $makeDoc('seed', 'Dup'), + $makeDoc('innerNew', 'InnerNew'), + ]); + }); + $this->assertSame(2, $countInner); + + // Still inside outer scope — skip flag should still be on + return $database->createDocuments($collection, [ + $makeDoc('seed', 'Dup2'), + $makeDoc('outerNew', 'OuterNew'), + ]); + }); + $this->assertSame(2, $countOuter); + + // After both scopes exit, skip flag is off again — a plain createDocuments + // call with a duplicate should throw. + $thrown = null; + try { + $database->createDocuments($collection, [$makeDoc('seed', 'ShouldThrow')]); + } catch (DuplicateException $e) { + $thrown = $e; + } + $this->assertNotNull($thrown, 'Plain createDocuments after nested scopes should throw on duplicate'); + + // Final state: seed + innerNew + outerNew + $all = $database->find($collection); + $ids = \array_map(fn (Document $d) => $d->getId(), $all); + \sort($ids); + $this->assertSame(['innerNew', 'outerNew', 'seed'], $ids); + } + + public function testCreateDocumentsSkipDuplicatesLargeBatch(): void + { + $database = $this->getDatabase(); + + $collection = 'skipDupLarge'; + $database->createCollection($collection); + $database->createAttribute($collection, 'idx', Database::VAR_INTEGER, 0, true); + + // Seed 50 docs + $seed = []; + for ($i = 0; $i < 50; $i++) { + $seed[] = new Document([ + '$id' => 'doc_' . $i, + 'idx' => $i, + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]); + } + $database->createDocuments($collection, $seed); + + // Now call skipDuplicates with 300 docs: 50 existing (0-49) + 250 new (50-299). + // 300 > default INSERT_BATCH_SIZE, so this exercises the chunk loop. + $batch = []; + for ($i = 0; $i < 300; $i++) { + $batch[] = new Document([ + '$id' => 'doc_' . $i, + 'idx' => $i + 1000, // different value so we can detect if existing got overwritten + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]); + } + + $emittedIds = []; + $count = $database->skipDuplicates(function () use ($database, $collection, $batch, &$emittedIds) { + return $database->createDocuments($collection, $batch, onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + + $this->assertSame(300, $count); + $this->assertCount(300, $emittedIds); + + $seedDoc = $database->getDocument($collection, 'doc_25'); + $this->assertSame(25, $seedDoc->getAttribute('idx')); + + $newDoc = $database->getDocument($collection, 'doc_100'); + $this->assertSame(1100, $newDoc->getAttribute('idx')); + + $total = $database->count($collection); + $this->assertSame(300, $total); + } + + public function testCreateDocumentsSkipDuplicatesSecondCallSkipsAll(): void + { + $database = $this->getDatabase(); + + $collection = 'skipDupSecond'; + $database->createCollection($collection); + $database->createAttribute($collection, 'name', Database::VAR_STRING, 128, true); + + $makeBatch = fn (string $name) => \array_map( + fn (string $id) => new Document([ + '$id' => $id, + 'name' => $name, + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ['a', 'b', 'c'] + ); + + // First call — all new + $firstCount = $database->skipDuplicates( + fn () => $database->createDocuments($collection, $makeBatch('First')) + ); + $this->assertSame(3, $firstCount); + + $emittedIds = []; + $secondCount = $database->skipDuplicates(function () use ($database, $collection, $makeBatch, &$emittedIds) { + return $database->createDocuments($collection, $makeBatch('Second'), onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + $this->assertSame(3, $secondCount); + \sort($emittedIds); + $this->assertSame(['a', 'b', 'c'], $emittedIds); + + // All three should retain the First values + foreach (['a', 'b', 'c'] as $id) { + $doc = $database->getDocument($collection, $id); + $this->assertSame('First', $doc->getAttribute('name'), "Doc {$id} should not have been overwritten"); + } + } + + public function testCreateDocumentsSkipDuplicatesRelationships(): void + { + $database = $this->getDatabase(); + + if (!$database->getAdapter()->getSupportForRelationships()) { + $this->expectNotToPerformAssertions(); + return; + } + + $parent = 'skipDupParent'; + $child = 'skipDupChild'; + $permissions = [ + Permission::read(Role::any()), + Permission::create(Role::any()), + Permission::update(Role::any()), + Permission::delete(Role::any()), + ]; + + $database->createCollection($parent); + $database->createCollection($child); + $database->createAttribute($parent, 'name', Database::VAR_STRING, 128, true); + $database->createAttribute($child, 'name', Database::VAR_STRING, 128, true); + $database->createRelationship( + collection: $parent, + relatedCollection: $child, + type: Database::RELATION_ONE_TO_MANY, + id: 'children', + ); + + $database->createDocument($parent, new Document([ + '$id' => 'existingParent', + 'name' => 'ExistingParent', + '$permissions' => $permissions, + 'children' => [ + new Document([ + '$id' => 'existingChild', + 'name' => 'ExistingChild', + '$permissions' => $permissions, + ]), + ], + ])); + + $batch = [ + new Document([ + '$id' => 'existingParent', + 'name' => 'ShouldNotOverwrite', + '$permissions' => $permissions, + 'children' => [ + new Document([ + '$id' => 'existingChild', + 'name' => 'ExistingChild', + '$permissions' => $permissions, + ]), + new Document([ + '$id' => 'retryChild', + 'name' => 'RetryChild', + '$permissions' => $permissions, + ]), + ], + ]), + new Document([ + '$id' => 'newParent', + 'name' => 'NewParent', + '$permissions' => $permissions, + 'children' => [ + new Document([ + '$id' => 'newChild', + 'name' => 'NewChild', + '$permissions' => $permissions, + ]), + ], + ]), + ]; + + $database->skipDuplicates(fn () => $database->createDocuments($parent, $batch)); + + $existing = $database->getDocument($parent, 'existingParent'); + $this->assertFalse($existing->isEmpty()); + $this->assertSame('ExistingParent', $existing->getAttribute('name')); + + $existingChildren = $existing->getAttribute('children', []); + $childIds = \array_map(fn (Document $d) => $d->getId(), $existingChildren); + \sort($childIds); + $this->assertSame(['existingChild', 'retryChild'], $childIds); + + $new = $database->getDocument($parent, 'newParent'); + $this->assertFalse($new->isEmpty()); + $this->assertSame('NewParent', $new->getAttribute('name')); + $newChildren = $new->getAttribute('children', []); + $this->assertCount(1, $newChildren); + $this->assertSame('newChild', $newChildren[0]->getId()); + + $allChildren = $database->find($child); + $allChildIds = \array_map(fn (Document $d) => $d->getId(), $allChildren); + \sort($allChildIds); + $this->assertSame(['existingChild', 'newChild', 'retryChild'], $allChildIds); + } }