From dfe1234cf68023f6ac4765042b5e57a96e1131a5 Mon Sep 17 00:00:00 2001 From: Scott Murphy Heiberg Date: Fri, 5 Jun 2026 01:08:45 -0700 Subject: [PATCH] Add opt-in MongoDB multi-document transactions to GORM for MongoDB GORM for MongoDB previously treated a transaction as a client-side flush boundary: pending writes were batched and flushed on commit, but each write auto-committed individually and nothing rolled back when a later operation failed. This adds real server-side transactions backed by a com.mongodb.client.ClientSession. When grails.mongodb.transactional is enabled (default false), a GORM transaction starts a ClientSession and MongoDB transaction and every read and write for the session runs within it, committing or aborting atomically. A new MongoTransaction drives the commit (retrying on an UnknownTransactionCommitResult) and the abort, and closes the session afterwards. The feature is opt-in and degrades gracefully: a standalone topology is detected at runtime and falls back to the legacy flush-only behavior with a one-time warning. Identifier generation for native Long ids is intentionally left non-transactional, mirroring the semantics of database sequences. --- .../gorm/mongo/api/MongoStaticApi.groovy | 21 +-- .../mapping/mongo/AbstractMongoSession.java | 127 ++++++++++++++ .../mapping/mongo/MongoCodecSession.groovy | 14 +- .../mapping/mongo/MongoDatastore.java | 53 ++++++ .../datastore/mapping/mongo/MongoSession.java | 21 +-- .../mapping/mongo/MongoTransaction.java | 156 +++++++++++++++++ ...stractMongoConnectionSourceSettings.groovy | 10 ++ .../engine/MongoCodecEntityPersister.groovy | 5 +- .../mongo/engine/MongoEntityPersister.java | 4 +- .../mapping/mongo/query/MongoQuery.java | 9 +- .../MongoTransactionDisabledSpec.groovy | 76 ++++++++ .../transactions/MongoTransactionSpec.groovy | 163 ++++++++++++++++++ .../gettingStarted/advancedConfig.adoc | 29 ++++ 13 files changed, 640 insertions(+), 48 deletions(-) create mode 100644 grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoTransaction.java create mode 100644 grails-data-mongodb/core/src/test/groovy/org/grails/datastore/gorm/mongo/transactions/MongoTransactionDisabledSpec.groovy create mode 100644 grails-data-mongodb/core/src/test/groovy/org/grails/datastore/gorm/mongo/transactions/MongoTransactionSpec.groovy diff --git a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/gorm/mongo/api/MongoStaticApi.groovy b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/gorm/mongo/api/MongoStaticApi.groovy index f460cfb2c2c..8d549d1d975 100644 --- a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/gorm/mongo/api/MongoStaticApi.groovy +++ b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/gorm/mongo/api/MongoStaticApi.groovy @@ -71,9 +71,9 @@ class MongoStaticApi extends GormStaticApi implements MongoAllOperations def entity = session.mappingContext.getPersistentEntity(persistentClass.name) filter = wrapFilterWithMultiTenancy(filter) - return session.getCollection(entity) + MongoCollection collection = session.getCollection(entity) .withDocumentClass(persistentClass) - .find(filter) + return session.find(collection, filter) } } @@ -84,10 +84,8 @@ class MongoStaticApi extends GormStaticApi implements MongoAllOperations mongoCollection = session.getCollection(entity) .withDocumentClass(persistentClass) - D result = options ? mongoCollection - .findOneAndDelete(filter, options) : - mongoCollection - .findOneAndDelete(filter) + D result = options ? session.findOneAndDelete(mongoCollection, filter, options) : + session.findOneAndDelete(mongoCollection, filter) return result } @@ -97,8 +95,7 @@ class MongoStaticApi extends GormStaticApi implements MongoAllOperations def entity = session.mappingContext.getPersistentEntity(persistentClass.name) filter = wrapFilterWithMultiTenancy(filter) - return session.getCollection(entity) - .countDocuments(filter) + return session.countDocuments(session.getCollection(entity), filter) } } @@ -201,7 +198,7 @@ class MongoStaticApi extends GormStaticApi implements MongoAllOperations newPipeline = preparePipeline(pipeline) - AggregateIterable aggregateIterable = mongoCollection.aggregate(newPipeline) + AggregateIterable aggregateIterable = session.aggregate(mongoCollection, newPipeline) if (doWithAggregate != null) { aggregateIterable = doWithAggregate.apply(aggregateIterable) } @@ -216,7 +213,7 @@ class MongoStaticApi extends GormStaticApi implements MongoAllOperations newPipeline = preparePipeline(pipeline) def mongoCollection = session.getCollection(persistentEntity) .withReadPreference(readPreference) - def aggregateIterable = mongoCollection.aggregate(newPipeline) + def aggregateIterable = session.aggregate(mongoCollection, newPipeline) if (doWithAggregate != null) { aggregateIterable = doWithAggregate.apply(aggregateIterable) } @@ -243,7 +240,7 @@ class MongoStaticApi extends GormStaticApi implements MongoAllOperations extends GormStaticApi implements MongoAllOperations protected MongoDatastore mongoDatastore; protected WriteConcern writeConcern = null; protected boolean errorOccured = false; + protected ClientSession clientSession; protected Map mongoCollections = new ConcurrentHashMap<>(); protected Map mongoDatabases = new ConcurrentHashMap<>(); @@ -200,6 +213,120 @@ public MongoMappingContext getMappingContext() { return (MongoMappingContext) super.getMappingContext(); } + /** + * @return the active {@link ClientSession} for the current MongoDB transaction, or {@code null} + * if no server-side transaction is in progress + */ + public ClientSession getClientSession() { + return clientSession; + } + + /** + * @return {@code true} if a server-side MongoDB transaction is currently active on this session + */ + public boolean hasActiveTransaction() { + return clientSession != null && clientSession.hasActiveTransaction(); + } + + /** + * Detaches the {@link ClientSession} from this session once its transaction has completed. + * Called by {@link MongoTransaction} after commit or rollback closes the session. + */ + void clearClientSession() { + this.clientSession = null; + } + + /** + * Closes and detaches the {@link ClientSession} if one is still attached. Used defensively when a + * transaction did not complete through {@link MongoTransaction}, so a session is never leaked. + */ + protected void closeClientSessionQuietly() { + if (clientSession != null) { + try { + clientSession.close(); + } + catch (RuntimeException ignored) { + // best effort + } + finally { + clientSession = null; + } + } + } + + @Override + public void disconnect() { + try { + closeClientSessionQuietly(); + } + finally { + super.disconnect(); + } + } + + @Override + protected Transaction beginTransactionInternal() { + if (getDatastore().isTransactionsEnabled()) { + // Defensive: if a previous transaction did not complete cleanly, close its orphaned + // session before starting a new one so it cannot leak. + closeClientSessionQuietly(); + ClientSession session = getNativeInterface().startSession(); + try { + session.startTransaction(); + } + catch (RuntimeException e) { + session.close(); + throw e; + } + this.clientSession = session; + return new MongoTransaction(this, session); + } + return new SessionOnlyTransaction<>(getNativeInterface(), this); + } + + // The driver exposes a session-less and a ClientSession overload for every operation, and the + // session argument cannot be null. These helpers branch once so call sites stay readable and + // behave identically (session-less) when no transaction is active. + + @SuppressWarnings({"rawtypes", "unchecked"}) + public BulkWriteResult bulkWrite(com.mongodb.client.MongoCollection collection, List writes) { + return clientSession != null ? collection.bulkWrite(clientSession, writes) : collection.bulkWrite(writes); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public DeleteResult deleteMany(com.mongodb.client.MongoCollection collection, Bson filter) { + return clientSession != null ? collection.deleteMany(clientSession, filter) : collection.deleteMany(filter); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public UpdateResult updateMany(com.mongodb.client.MongoCollection collection, Bson filter, Bson update, UpdateOptions options) { + return clientSession != null ? collection.updateMany(clientSession, filter, update, options) : collection.updateMany(filter, update, options); + } + + public FindIterable find(com.mongodb.client.MongoCollection collection, Bson filter) { + return clientSession != null ? collection.find(clientSession, filter) : collection.find(filter); + } + + public FindIterable find(com.mongodb.client.MongoCollection collection, Bson filter, Class resultClass) { + return clientSession != null ? collection.find(clientSession, filter, resultClass) : collection.find(filter, resultClass); + } + + public AggregateIterable aggregate(com.mongodb.client.MongoCollection collection, List pipeline) { + return clientSession != null ? collection.aggregate(clientSession, pipeline) : collection.aggregate(pipeline); + } + + public T findOneAndDelete(com.mongodb.client.MongoCollection collection, Bson filter) { + return clientSession != null ? collection.findOneAndDelete(clientSession, filter) : collection.findOneAndDelete(filter); + } + + public T findOneAndDelete(com.mongodb.client.MongoCollection collection, Bson filter, FindOneAndDeleteOptions options) { + return clientSession != null ? collection.findOneAndDelete(clientSession, filter, options) : collection.findOneAndDelete(filter, options); + } + + public long countDocuments(com.mongodb.client.MongoCollection collection, Bson filter) { + return clientSession != null ? collection.countDocuments(clientSession, filter) : collection.countDocuments(filter); + } + /** * Decodes the given entity type from the given native object type * diff --git a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoCodecSession.groovy b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoCodecSession.groovy index 29ae2d06635..b721929640a 100644 --- a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoCodecSession.groovy +++ b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoCodecSession.groovy @@ -67,8 +67,6 @@ import org.grails.datastore.mapping.mongo.engine.codecs.PersistentEntityCodec import org.grails.datastore.mapping.mongo.query.MongoQuery import org.grails.datastore.mapping.query.Query import org.grails.datastore.mapping.query.api.QueryableCriteria -import org.grails.datastore.mapping.transactions.SessionOnlyTransaction -import org.grails.datastore.mapping.transactions.Transaction /** * A MongoDB session for codec mapping style @@ -236,8 +234,7 @@ class MongoCodecSession extends AbstractMongoSession { final List> writes = writeModels[persistentEntity] if (writes) { - final BulkWriteResult bulkWriteResult = collection - .bulkWrite(writes) + final BulkWriteResult bulkWriteResult = bulkWrite(collection, writes) final boolean isAcknowledged = wc.isAcknowledged() if (!bulkWriteResult.wasAcknowledged() && isAcknowledged) { @@ -306,11 +303,6 @@ class MongoCodecSession extends AbstractMongoSession { MongoIdCoercion.coerceIdToStoredType(nativeKey, entity) } - @Override - protected Transaction beginTransactionInternal() { - return new SessionOnlyTransaction(getNativeInterface(), this) - } - @Override protected MongoCodecEntityPersister createPersister(Class cls, MappingContext mappingContext) { return mongoCodecEntityPersisterMap[cls] @@ -322,7 +314,7 @@ class MongoCodecSession extends AbstractMongoSession { final Document nativeQuery = buildNativeDocumentQueryFromCriteria(criteria, entity) final MongoCollection collection = getCollection(entity) - final DeleteResult deleteResult = collection.deleteMany((Bson) nativeQuery) + final DeleteResult deleteResult = deleteMany(collection, (Bson) nativeQuery) if (deleteResult.wasAcknowledged()) { return deleteResult.deletedCount } @@ -347,7 +339,7 @@ class MongoCodecSession extends AbstractMongoSession { } } } - final UpdateResult updateResult = collection.updateMany(nativeQuery, new Document(MONGO_SET_OPERATOR, properties), updateOptions) + final UpdateResult updateResult = updateMany(collection, nativeQuery, new Document(MONGO_SET_OPERATOR, properties), updateOptions) if (updateResult.wasAcknowledged()) { try { return updateResult.modifiedCount diff --git a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoDatastore.java b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoDatastore.java index a5a1116c0e9..2139c361053 100644 --- a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoDatastore.java +++ b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoDatastore.java @@ -35,6 +35,7 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.MongoIterable; import com.mongodb.client.model.IndexOptions; +import com.mongodb.connection.ClusterType; import org.bson.Document; import org.bson.codecs.Codec; import org.bson.codecs.configuration.CodecProvider; @@ -133,6 +134,9 @@ public class MongoDatastore extends AbstractDatastore implements MappingContext. protected final Map mongoDatabases = new ConcurrentHashMap<>(); protected final boolean stateless; protected final boolean codecEngine; + protected final boolean transactionsEnabled; + private volatile Boolean transactionsSupported; + private volatile boolean warnedTransactionsUnsupported = false; protected CodecRegistry codecRegistry; protected final ConfigurableApplicationEventPublisher eventPublisher; protected final PlatformTransactionManager transactionManager; @@ -173,6 +177,7 @@ public MongoDatastore(final ConnectionSources collection = getCollection(persistentEntity); final WriteConcern wc = getWriteConcern(); if (wc != null) { collection = collection.withWriteConcern(wc); @@ -207,8 +204,7 @@ public void flush(WriteConcern writeConcern) { final List> writes = writeModels.get(persistentEntity); if (!writes.isEmpty()) { - final com.mongodb.bulk.BulkWriteResult bulkWriteResult = collection - .bulkWrite(writes); + final com.mongodb.bulk.BulkWriteResult bulkWriteResult = bulkWrite(collection, writes); if (!bulkWriteResult.wasAcknowledged()) { errorOccured = true; @@ -286,11 +282,6 @@ protected Persister createPersister(@SuppressWarnings("rawtypes") Class cls, Map return entity == null ? null : new MongoEntityPersister(mappingContext, entity, this, publisher); } - @Override - protected Transaction beginTransactionInternal() { - return new SessionOnlyTransaction<>(getNativeInterface(), this); - } - @Override public void delete(Iterable objects) { final Map toDelete = getDeleteMap(objects); @@ -342,8 +333,8 @@ public long deleteAll(QueryableCriteria criteria) { final PersistentEntity entity = criteria.getPersistentEntity(); final Document nativeQuery = buildNativeDocumentQueryFromCriteria(criteria, entity); - final com.mongodb.client.MongoCollection collection = getCollection(entity); - final DeleteResult deleteResult = collection.deleteMany(nativeQuery); + final com.mongodb.client.MongoCollection collection = getCollection(entity); + final DeleteResult deleteResult = deleteMany(collection, nativeQuery); if (deleteResult.wasAcknowledged()) { return deleteResult.getDeletedCount(); } @@ -356,10 +347,10 @@ public long deleteAll(QueryableCriteria criteria) { public long updateAll(QueryableCriteria criteria, Map properties) { final PersistentEntity entity = criteria.getPersistentEntity(); final Document nativeQuery = buildNativeDocumentQueryFromCriteria(criteria, entity); - final com.mongodb.client.MongoCollection collection = getCollection(entity); + final com.mongodb.client.MongoCollection collection = getCollection(entity); final UpdateOptions updateOptions = new UpdateOptions(); updateOptions.upsert(false); - final UpdateResult updateResult = collection.updateMany(nativeQuery, new Document("$set", properties), updateOptions); + final UpdateResult updateResult = updateMany(collection, nativeQuery, new Document("$set", properties), updateOptions); if (updateResult.wasAcknowledged()) { try { return updateResult.getModifiedCount(); diff --git a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoTransaction.java b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoTransaction.java new file mode 100644 index 00000000000..267271bb363 --- /dev/null +++ b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoTransaction.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.grails.datastore.mapping.mongo; + +import com.mongodb.MongoException; +import com.mongodb.client.ClientSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.grails.datastore.mapping.transactions.Transaction; + +/** + * A {@link Transaction} backed by a real MongoDB multi-document transaction on a + * {@link ClientSession}. Unlike the legacy + * {@link org.grails.datastore.mapping.transactions.SessionOnlyTransaction} (which only flushes the + * GORM session), this commits or aborts a server-side transaction so multiple writes are atomic. + * + *

The {@link ClientSession} is started with an active transaction before this object is + * constructed; {@link #commit()} flushes the GORM session (a no-op when the + * {@link org.grails.datastore.mapping.transactions.DatastoreTransactionManager} already flushed) + * and commits the server transaction, while {@link #rollback()} aborts it. Both close the + * {@link ClientSession} and detach it from the owning session.

+ * + * @since 8.0 + */ +public class MongoTransaction implements Transaction { + + /** + * Maximum number of times {@link ClientSession#commitTransaction()} is retried when the server + * reports an {@code UnknownTransactionCommitResult} (i.e. the commit outcome is unknown and the + * operation is safe to retry). + */ + private static final int MAX_COMMIT_RETRIES = 3; + + private static final Logger LOG = LoggerFactory.getLogger(MongoTransaction.class); + + private static volatile boolean warnedTimeoutIgnored = false; + + private final AbstractMongoSession session; + private final ClientSession clientSession; + private boolean active = true; + + public MongoTransaction(AbstractMongoSession session, ClientSession clientSession) { + this.session = session; + this.clientSession = clientSession; + } + + @Override + public void commit() { + if (!active) { + return; + } + boolean committed = false; + try { + // Flush pending GORM operations into the active transaction. When driven by the + // DatastoreTransactionManager the session was already flushed, so this clears nothing + // and is a no-op; it covers callers that commit the transaction directly. + session.flush(); + commitWithRetry(); + committed = true; + } finally { + if (!committed) { + // The commit failed and the server transaction is aborted/unknown. Discard the GORM + // session's pending operations and first-level cache so a reused session cannot return + // entities that were never committed. + try { + session.clear(); + } + catch (RuntimeException e) { + LOG.debug("Error clearing session after failed transaction commit: {}", e.getMessage(), e); + } + } + close(); + } + } + + @Override + public void rollback() { + if (!active) { + return; + } + try { + if (clientSession.hasActiveTransaction()) { + clientSession.abortTransaction(); + } + } finally { + close(); + } + } + + @Override + public ClientSession getNativeTransaction() { + return clientSession; + } + + @Override + public boolean isActive() { + return active; + } + + @Override + public void setTimeout(int timeout) { + // The transaction is started before the manager applies a timeout, so a per-transaction + // timeout cannot be applied to the server-side transaction; the server enforces its own + // transactionLifetimeLimitSeconds instead. Warn once so a configured timeout is not silently + // ignored. + if (!warnedTimeoutIgnored) { + warnedTimeoutIgnored = true; + LOG.warn("A per-transaction timeout was requested but GORM for MongoDB does not apply it to the " + + "server-side MongoDB transaction; the server's transactionLifetimeLimitSeconds applies instead."); + } + } + + private void commitWithRetry() { + int attempts = 0; + while (true) { + try { + clientSession.commitTransaction(); + return; + } + catch (MongoException e) { + if (attempts++ < MAX_COMMIT_RETRIES && + e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) { + continue; + } + throw e; + } + } + } + + private void close() { + active = false; + try { + clientSession.close(); + } + finally { + session.clearClientSession(); + } + } +} diff --git a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/connections/AbstractMongoConnectionSourceSettings.groovy b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/connections/AbstractMongoConnectionSourceSettings.groovy index 025f4be9250..09f6045427e 100644 --- a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/connections/AbstractMongoConnectionSourceSettings.groovy +++ b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/connections/AbstractMongoConnectionSourceSettings.groovy @@ -80,6 +80,16 @@ abstract class AbstractMongoConnectionSourceSettings extends ConnectionSourceSet */ boolean stateless = false + /** + * Whether GORM should use real MongoDB multi-document transactions (a server-side + * {@code ClientSession}) for transactional operations. Requires a replica set or sharded + * cluster. When {@code false} (the default) a GORM transaction remains a client-side flush + * boundary, preserving the historical behavior. Bound from {@code grails.mongodb.transactional}. + * + * @since 8.0 + */ + boolean transactional = false + /** * Whether to use the decimal128 type for BigDecimal values * diff --git a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/engine/MongoCodecEntityPersister.groovy b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/engine/MongoCodecEntityPersister.groovy index 7178b6b0029..c876adf9473 100644 --- a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/engine/MongoCodecEntityPersister.groovy +++ b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/engine/MongoCodecEntityPersister.groovy @@ -156,11 +156,10 @@ class MongoCodecEntityPersister extends ThirdPartyCacheEntityPersister { return null } else { MongoCollection mongoCollection = getMongoCollection(pe) - Document idQuery = createIdQuery(coerceIdToStoredType(key, pe)) - o = mongoCollection .withDocumentClass(persistentEntity.javaClass) .withCodecRegistry(mongoDatastore.codecRegistry) - .find(idQuery, pe.javaClass) + Document idQuery = createIdQuery(coerceIdToStoredType(key, pe)) + o = mongoSession.find(mongoCollection, idQuery, pe.javaClass) .limit(1) .first() diff --git a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/engine/MongoEntityPersister.java b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/engine/MongoEntityPersister.java index 1b025f8f2c0..8faae47a255 100644 --- a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/engine/MongoEntityPersister.java +++ b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/engine/MongoEntityPersister.java @@ -300,7 +300,7 @@ protected Document retrieveEntry(final PersistentEntity persistentEntity, .getNativeInterface() .getDatabase(mongoSession.getDatabase(persistentEntity)) .getCollection(mongoSession.getCollectionName(persistentEntity)); - return collection.find(createDBObjectWithKey(key)).limit(1).first(); + return mongoSession.find(collection, createDBObjectWithKey(key)).limit(1).first(); } private Document removeNullEntries(Document nativeEntry) { @@ -403,7 +403,7 @@ protected void deleteEntries(String family, final List keys) { MongoQuery query = (MongoQuery) mongoSession.createQuery(getPersistentEntity().getJavaClass()); query.in(getPersistentEntity().getIdentity().getName(), keys); - dbCollection.deleteMany(query.getMongoQuery()); + mongoSession.deleteMany(dbCollection, query.getMongoQuery()); } diff --git a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/query/MongoQuery.java b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/query/MongoQuery.java index 75522c20284..59b23545748 100644 --- a/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/query/MongoQuery.java +++ b/grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/query/MongoQuery.java @@ -489,13 +489,12 @@ protected List executeQuery(final PersistentEntity entity, final Junction criter } final Object dbObject; if (criteria.isEmpty()) { - FindIterable cursor = collection - .find(createQueryObject(entity)); + FindIterable cursor = mongoSession.find(collection, createQueryObject(entity)); dbObject = ((FindIterable) setHint(cursor)).limit(1) .first(); } else { - FindIterable cursor = collection.find(getMongoQuery()); + FindIterable cursor = mongoSession.find(collection, getMongoQuery()); dbObject = ((FindIterable) setHint(cursor)).limit(1) .first(); @@ -536,7 +535,7 @@ protected List executeQuery(final PersistentEntity entity, final Junction criter List projectedKeys = aggregatePipeline.getProjectedKeys(); List projectedResults = new ArrayList(); - AggregateIterable aggregatedResults = collection.aggregate(aggregationPipeline); + AggregateIterable aggregatedResults = mongoSession.aggregate(collection, aggregationPipeline); aggregatedResults = (AggregateIterable) setHint(aggregatedResults); final MongoCursor aggregateCursor = aggregatedResults.iterator(); @@ -619,7 +618,7 @@ protected FindIterable executeQueryAndApplyPagination(com.mongodb.clie ); } - final FindIterable iterable = collection.find(query); + final FindIterable iterable = mongoSession.find(collection, query); if (offset > 0) { iterable.skip(offset); } diff --git a/grails-data-mongodb/core/src/test/groovy/org/grails/datastore/gorm/mongo/transactions/MongoTransactionDisabledSpec.groovy b/grails-data-mongodb/core/src/test/groovy/org/grails/datastore/gorm/mongo/transactions/MongoTransactionDisabledSpec.groovy new file mode 100644 index 00000000000..029f7c64239 --- /dev/null +++ b/grails-data-mongodb/core/src/test/groovy/org/grails/datastore/gorm/mongo/transactions/MongoTransactionDisabledSpec.groovy @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.grails.datastore.gorm.mongo.transactions + +import grails.gorm.annotation.Entity + +import org.apache.grails.testing.mongo.AutoStartedMongoSpec +import org.grails.datastore.mapping.mongo.MongoDatastore +import spock.lang.AutoCleanup +import spock.lang.Shared + +/** + * Verifies that with {@code grails.mongodb.transactional} left at its default (disabled), GORM keeps + * the legacy client-side flush behavior: server-side transactions are not used, so writes already + * flushed within a transaction are not rolled back. This is the non-breaking fallback contract. + */ +class MongoTransactionDisabledSpec extends AutoStartedMongoSpec { + + @Shared + @AutoCleanup + MongoDatastore datastore + + @Override + boolean shouldInitializeDatastore() { + false + } + + void setupSpec() { + // No grails.mongodb.transactional => default false + datastore = new MongoDatastore(['grails.mongodb.url': dbContainer.getReplicaSetUrl('myDb')] as Map, LegacyThing) + } + + void setup() { + LegacyThing.withNewSession { LegacyThing.DB.drop() } + } + + void "test transactions are disabled by default"() { + expect: + !datastore.isTransactionsEnabled() + } + + void "test flushed writes are not rolled back when transactions are disabled (legacy behavior)"() { + when: "a document is flushed inside a transaction that then fails" + LegacyThing.withTransaction { + new LegacyThing(name: "flushed").save(flush: true) + throw new RuntimeException("boom") + } + + then: + thrown(RuntimeException) + + and: "the already-flushed write remains, because there was no server-side transaction to abort" + LegacyThing.withNewSession { LegacyThing.count() } == 1 + } +} + +@Entity +class LegacyThing { + String name +} diff --git a/grails-data-mongodb/core/src/test/groovy/org/grails/datastore/gorm/mongo/transactions/MongoTransactionSpec.groovy b/grails-data-mongodb/core/src/test/groovy/org/grails/datastore/gorm/mongo/transactions/MongoTransactionSpec.groovy new file mode 100644 index 00000000000..1485ed5f0b7 --- /dev/null +++ b/grails-data-mongodb/core/src/test/groovy/org/grails/datastore/gorm/mongo/transactions/MongoTransactionSpec.groovy @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.grails.datastore.gorm.mongo.transactions + +import grails.gorm.annotation.Entity + +import com.mongodb.client.model.Filters +import org.apache.grails.testing.mongo.AutoStartedMongoSpec +import org.grails.datastore.mapping.mongo.MongoDatastore +import org.springframework.transaction.TransactionDefinition +import spock.lang.AutoCleanup +import spock.lang.Shared + +/** + * Tests that GORM uses real MongoDB multi-document transactions (a server-side ClientSession) when + * {@code grails.mongodb.transactional} is enabled. + */ +class MongoTransactionSpec extends AutoStartedMongoSpec { + + @Shared + @AutoCleanup + MongoDatastore datastore + + @Override + boolean shouldInitializeDatastore() { + false + } + + void setupSpec() { + Map config = [ + 'grails.mongodb.url' : dbContainer.getReplicaSetUrl('myDb'), + 'grails.mongodb.transactional': true + ] + datastore = new MongoDatastore(config, TxPerson, TxPet) + } + + void setup() { + TxPerson.withNewSession { + TxPerson.DB.drop() + TxPet.DB.drop() + } + } + + void "test the datastore reports transactions enabled against a replica set"() { + expect: + datastore.isTransactionsEnabled() + } + + void "test a committed transaction persists all writes atomically"() { + when: "two documents are saved in one transaction" + TxPerson.withTransaction { + new TxPerson(name: "Fred").save() + new TxPerson(name: "Wilma").save() + } + + then: "both are persisted after commit" + TxPerson.withNewSession { TxPerson.count() } == 2 + } + + void "test a rolled back transaction discards all writes on the server"() { + when: "documents are flushed inside a transaction that then fails" + TxPerson.withTransaction { + new TxPerson(name: "Fred").save(flush: true) + new TxPerson(name: "Wilma").save(flush: true) + throw new RuntimeException("boom") + } + + then: "the exception propagates" + thrown(RuntimeException) + + and: "nothing was persisted - the server-side transaction was aborted, not merely the session cleared" + TxPerson.withNewSession { TxPerson.count() } == 0 + } + + void "test writes across multiple collections roll back together"() { + when: "a person and a pet are written before the transaction fails" + TxPerson.withTransaction { + new TxPerson(name: "Fred").save(flush: true) + new TxPet(name: "Dino").save(flush: true) + throw new RuntimeException("boom") + } + + then: + thrown(RuntimeException) + + and: "neither collection retains the write" + TxPerson.withNewSession { TxPerson.count() } == 0 + TxPet.withNewSession { TxPet.count() } == 0 + } + + void "test read-your-writes within an active transaction"() { + expect: "a query inside the transaction sees its own uncommitted write" + TxPerson.withTransaction { + new TxPerson(name: "Fred").save(flush: true) + TxPerson.count() == 1 + } + + and: "and it is visible to other sessions after commit" + TxPerson.withNewSession { TxPerson.count() } == 1 + } + + void "test a findOneAndDelete via the MongoEntity API participates in the transaction"() { + given: "an existing committed document" + TxPerson.withNewSession { + new TxPerson(name: "Fred").save(flush: true) + } + + when: "it is deleted inside a transaction that then fails" + TxPerson.withTransaction { + TxPerson.findOneAndDelete(Filters.eq("name", "Fred")) + throw new RuntimeException("boom") + } + + then: + thrown(RuntimeException) + + and: "the delete was rolled back rather than auto-committing outside the transaction" + TxPerson.withNewSession { TxPerson.count() } == 1 + } + + void "test a REQUIRES_NEW inner transaction commits independently of a rolled back outer transaction"() { + when: "an inner REQUIRES_NEW transaction commits while the outer transaction rolls back" + TxPerson.withTransaction { + new TxPerson(name: "outer").save(flush: true) + TxPerson.withTransaction([propagationBehavior: TransactionDefinition.PROPAGATION_REQUIRES_NEW]) { + new TxPerson(name: "inner").save(flush: true) + } + throw new RuntimeException("rollback outer") + } + + then: + thrown(RuntimeException) + + and: "only the inner transaction's write survived - suspend/resume kept the two sessions separate" + TxPerson.withNewSession { TxPerson.findAll()*.name } == ["inner"] + } +} + +@Entity +class TxPerson { + String name +} + +@Entity +class TxPet { + String name +} diff --git a/grails-data-mongodb/docs/src/docs/asciidoc/gettingStarted/advancedConfig.adoc b/grails-data-mongodb/docs/src/docs/asciidoc/gettingStarted/advancedConfig.adoc index c333acf4fdc..ef8ff1d8eb4 100644 --- a/grails-data-mongodb/docs/src/docs/asciidoc/gettingStarted/advancedConfig.adoc +++ b/grails-data-mongodb/docs/src/docs/asciidoc/gettingStarted/advancedConfig.adoc @@ -124,3 +124,32 @@ grails.mongodb.default.mapping = { The `*` method is used to indicate that the setting applies to all properties. You can also set a global default for the storage type of `String id` fields via `grails.mongodb.stringIds.defaultStoredAs` (values `string` or `objectid`). See <> for details. + +==== Multi-Document Transactions + + +By default a GORM transaction for MongoDB is a client-side unit of work: pending inserts, updates and deletes are batched and flushed to the server when the transaction commits, but each write is applied individually and is not rolled back if a later operation fails. + +Since MongoDB 4.0, the server supports multi-document ACID transactions. To have GORM run transactional operations inside a real server-side transaction — so that all writes within a `withTransaction` block (or a `@Transactional` service method) commit or roll back atomically — enable: + +[source,groovy] +---- +grails { + mongodb { + transactional = true + } +} +---- + +With this enabled, a GORM transaction starts a MongoDB session and transaction; on commit all buffered writes are committed together, and on rollback they are discarded on the server: + +[source,groovy] +---- +Person.withTransaction { + new Person(name: "Fred").save() + new Person(name: "Wilma").save() + // both inserts commit together, or neither is applied if an exception is thrown +} +---- + +NOTE: Multi-document transactions require a replica set or a sharded cluster; they are not supported on a standalone `mongod`. If `transactional` is enabled but a standalone topology is detected, GORM logs a warning once and falls back to the default client-side flush behavior. Identifier generation for native (`Long`) identity uses an independent counter and is intentionally not enrolled in the transaction, mirroring the non-transactional semantics of database sequences.