From 0011c6d05aec8facfb4cd123fbbd5b3ed6b1ffd7 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 28 May 2026 13:03:19 +0200 Subject: [PATCH 1/2] multiple function calls within single transaction --- .../co/absa/db/fadb/doobie/DoobieEngine.scala | 12 ++ .../absa/db/fadb/doobie/DoobieFunction.scala | 83 ++++++++++- ...ansactionCompositionIntegrationTests.scala | 140 ++++++++++++++++++ 3 files changed, 229 insertions(+), 6 deletions(-) create mode 100644 doobie/src/test/scala/za/co/absa/db/fadb/doobie/DoobieTransactionCompositionIntegrationTests.scala diff --git a/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieEngine.scala b/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieEngine.scala index 90fc6d43..c49a5de0 100644 --- a/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieEngine.scala +++ b/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieEngine.scala @@ -86,4 +86,16 @@ class DoobieEngine[F[_]: Async](val transactor: Transactor[F]) extends DBEngine[ override def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[FailedOrRow[R]]] = { executeQueryWithStatus(query)(query.readStatusWithData) } + + /** + * Executes an arbitrary `ConnectionIO` program in a single transaction. + * This enables composing multiple database operations (including multiple fa-db function calls) + * into a single atomic transaction. + * + * @param cio the `ConnectionIO` program to execute + * @tparam R the result type of the program + * @return the result wrapped in the effect type `F` + */ + def runConnectionIO[R](cio: ConnectionIO[R]): F[R] = + cio.transact(transactor) } diff --git a/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieFunction.scala b/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieFunction.scala index fdf8d3bb..6be2e7a8 100644 --- a/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieFunction.scala +++ b/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieFunction.scala @@ -17,13 +17,14 @@ package za.co.absa.db.fadb.doobie import cats.MonadError -import doobie.implicits.toSqlInterpolator +import doobie.ConnectionIO +import doobie.implicits._ import doobie.util.Read import doobie.util.fragment.Fragment import za.co.absa.db.fadb.DBFunction._ import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.exceptions.StatusException -import za.co.absa.db.fadb.status.FunctionStatus +import za.co.absa.db.fadb.status.{FailedOrRow, FunctionStatus} import scala.language.higherKinds @@ -119,6 +120,19 @@ trait DoobieFunction[I, R, F[_]] extends DoobieFunctionBase[R] { meSql(values, selectEntry, functionName, alias)(read, me) } + /** + * Returns the database function call as a `ConnectionIO[Seq[R]]` without executing a transaction. + * This enables composing multiple function calls into a single transaction using for-comprehensions + * over `ConnectionIO`, and then executing them atomically via `DoobieEngine.runConnectionIO`. + * + * @param values the input values for the function + * @return the `ConnectionIO[Seq[R]]` representing the database function call + */ + def toConnectionIO(values: I): ConnectionIO[Seq[R]] = { + val fragments = toFragmentsSeq(values) + composeFragments(fragments).query[R].to[Seq] + } + } trait DoobieFunctionWithStatus[I, R, F[_]] extends DoobieFunctionBase[R] { @@ -190,6 +204,21 @@ trait DoobieFunctionWithStatus[I, R, F[_]] extends DoobieFunctionBase[R] { // This is to be mixed in by an implementation of StatusHandling def checkStatus(functionStatus: FunctionStatus): Option[StatusException] + + /** + * Returns the database function call as a `ConnectionIO[Seq[FailedOrRow[R]]]` without executing a transaction. + * This enables composing multiple function calls into a single transaction using for-comprehensions + * over `ConnectionIO`, and then executing them atomically via `DoobieEngine.runConnectionIO`. + * + * @param values the input values for the function + * @return the `ConnectionIO[Seq[FailedOrRow[R]]]` representing the database function call + */ + def toConnectionIO(values: I): ConnectionIO[Seq[FailedOrRow[R]]] = { + val fragments = toFragmentsSeq(values) + val fragment = composeFragments(fragments) + val queryWithStatus = new DoobieQueryWithStatus[R](fragment, checkStatus) + fragment.query[StatusWithData[R]].to[Seq].map(_.map(queryWithStatus.getResultOrException)) + } } /** @@ -216,7 +245,18 @@ object DoobieFunction { val dbEngine: DoobieEngine[F], val readR: Read[R] ) extends DBSingleResultFunction[I, R, DoobieEngine[F], F](functionNameOverride) - with DoobieFunction[I, R, F] + with DoobieFunction[I, R, F] { + + /** + * Returns the database function call as a `ConnectionIO[R]` expecting exactly one result row. + * @param values the input values for the function + * @return the `ConnectionIO[R]` representing the database function call + */ + def toConnectionIOSingle(values: I): ConnectionIO[R] = { + val fragments = toFragmentsSeq(values) + composeFragments(fragments).query[R].unique + } + } /** * `DoobieSingleResultFunctionWithStatus` represents a db function that returns a single result with status. @@ -238,7 +278,17 @@ object DoobieFunction { val readR: Read[R], val readStatusWithData: Read[StatusWithData[R]] ) extends DBSingleResultFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride) - with DoobieFunctionWithStatus[I, R, F] + with DoobieFunctionWithStatus[I, R, F] { + + /** + * Returns the database function call as a `ConnectionIO[FailedOrRow[R]]` expecting exactly one result row. + * @param values the input values for the function + * @return the `ConnectionIO[FailedOrRow[R]]` representing the database function call + */ + def toConnectionIOSingle(values: I): ConnectionIO[FailedOrRow[R]] = { + toConnectionIO(values).map(_.head) + } + } /** * `DoobieMultipleResultFunction` represents a db function that returns multiple results. @@ -296,7 +346,18 @@ object DoobieFunction { val dbEngine: DoobieEngine[F], val readR: Read[R] ) extends DBOptionalResultFunction[I, R, DoobieEngine[F], F](functionNameOverride) - with DoobieFunction[I, R, F] + with DoobieFunction[I, R, F] { + + /** + * Returns the database function call as a `ConnectionIO[Option[R]]` expecting zero or one result rows. + * @param values the input values for the function + * @return the `ConnectionIO[Option[R]]` representing the database function call + */ + def toConnectionIOOptional(values: I): ConnectionIO[Option[R]] = { + val fragments = toFragmentsSeq(values) + composeFragments(fragments).query[R].option + } + } /** * `DoobieOptionalResultFunctionWithStatus` represents a db function that returns an optional result. @@ -310,5 +371,15 @@ object DoobieFunction { val readR: Read[R], val readStatusWithData: Read[StatusWithData[R]] ) extends DBOptionalResultFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride) - with DoobieFunctionWithStatus[I, R, F] + with DoobieFunctionWithStatus[I, R, F] { + + /** + * Returns the database function call as a `ConnectionIO[Option[FailedOrRow[R]]]` expecting zero or one result rows. + * @param values the input values for the function + * @return the `ConnectionIO[Option[FailedOrRow[R]]]` representing the database function call + */ + def toConnectionIOOptional(values: I): ConnectionIO[Option[FailedOrRow[R]]] = { + toConnectionIO(values).map(_.headOption) + } + } } diff --git a/doobie/src/test/scala/za/co/absa/db/fadb/doobie/DoobieTransactionCompositionIntegrationTests.scala b/doobie/src/test/scala/za/co/absa/db/fadb/doobie/DoobieTransactionCompositionIntegrationTests.scala new file mode 100644 index 00000000..f087f35b --- /dev/null +++ b/doobie/src/test/scala/za/co/absa/db/fadb/doobie/DoobieTransactionCompositionIntegrationTests.scala @@ -0,0 +1,140 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.db.fadb.doobie + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import cats.implicits._ +import doobie.ConnectionIO +import doobie.implicits._ +import org.scalatest.funsuite.AnyFunSuite +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieFunction.{DoobieMultipleResultFunction, DoobieOptionalResultFunction, DoobieSingleResultFunctionWithStatus} +import za.co.absa.db.fadb.status.FailedOrRow +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.db.fadb.testing.classes.DoobieTest + +class DoobieTransactionCompositionIntegrationTests extends AnyFunSuite with DoobieTest { + + private val engine = new DoobieEngine(transactor) + + class CreateActor(implicit schema: DBSchema, dbEngine: DoobieEngine[IO]) + extends DoobieSingleResultFunctionWithStatus[CreateActorRequestBody, Int, IO]( + values => Seq(fr"${values.firstName}", fr"${values.lastName}") + ) + with StandardStatusHandling { + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("o_actor_id") + } + + class GetActorById(implicit schema: DBSchema, dbEngine: DoobieEngine[IO]) + extends DoobieOptionalResultFunction[Int, Actor, IO](id => Seq(fr"$id")) + + class GetActors(implicit schema: DBSchema, dbEngine: DoobieEngine[IO]) + extends DoobieMultipleResultFunction[GetActorsQueryParameters, Actor, IO]( + values => Seq(fr"${values.firstName}", fr"${values.lastName}") + ) + + private val createActor = new CreateActor()(Integration, engine) + private val getActorById = new GetActorById()(Integration, engine) + private val getActors = new GetActors()(Integration, engine) + + /** Lifts a FailedOrRow into ConnectionIO — raises StatusException on Left, unwraps data on Right */ + private def liftFailedOrRow[R](result: FailedOrRow[R]): ConnectionIO[R] = result match { + case Right(row) => row.data.pure[ConnectionIO] + case Left(ex) => doobie.FC.raiseError(ex) + } + + test("Compose two function calls into a single transaction using toConnectionIO") { + val uniqueFirst = s"TxTest_${System.currentTimeMillis()}" + val uniqueLast = "CompositionTest" + + val program: ConnectionIO[(Int, Option[Actor])] = for { + created <- createActor.toConnectionIO(CreateActorRequestBody(uniqueFirst, uniqueLast)) + actorId <- liftFailedOrRow(created.head) + found <- getActorById.toConnectionIOOptional(actorId) + } yield (actorId, found) + + val (actorId, foundActor) = engine.runConnectionIO(program).unsafeRunSync() + + assert(actorId > 0) + assert(foundActor.isDefined) + assert(foundActor.get.firstName == uniqueFirst) + assert(foundActor.get.lastName == uniqueLast) + } + + test("Compose using toConnectionIOSingle convenience method") { + val uniqueFirst = s"TxSingle_${System.currentTimeMillis()}" + val uniqueLast = "SingleTest" + + val program: ConnectionIO[Actor] = for { + _ <- createActor.toConnectionIOSingle(CreateActorRequestBody(uniqueFirst, uniqueLast)) + actors <- getActors.toConnectionIO(GetActorsQueryParameters(Some(uniqueFirst), Some(uniqueLast))) + } yield actors.head + + val actor = engine.runConnectionIO(program).unsafeRunSync() + + assert(actor.firstName == uniqueFirst) + assert(actor.lastName == uniqueLast) + } + + test("Compose multiple creates in a single transaction") { + val timestamp = System.currentTimeMillis() + val lastName = s"BatchTest_$timestamp" + + val program: ConnectionIO[Unit] = for { + _ <- createActor.toConnectionIOSingle(CreateActorRequestBody("BatchActor1", lastName)) + _ <- createActor.toConnectionIOSingle(CreateActorRequestBody("BatchActor2", lastName)) + _ <- createActor.toConnectionIOSingle(CreateActorRequestBody("BatchActor3", lastName)) + } yield () + + engine.runConnectionIO(program).unsafeRunSync() + + // Verify all actors exist — they were created in a single transaction + val actors = getActors(GetActorsQueryParameters(None, Some(lastName))).unsafeRunSync() + assert(actors.size >= 3) + } + + test("Transaction rolls back all operations on failure") { + val uniqueFirst = s"Rollback_${System.currentTimeMillis()}" + val uniqueLast = "RollbackTest" + + val program: ConnectionIO[Unit] = for { + _ <- createActor.toConnectionIOSingle(CreateActorRequestBody(uniqueFirst, uniqueLast)) + _ <- doobie.FC.raiseError[Unit](new RuntimeException("Simulated failure after create")) + } yield () + + val result = engine.runConnectionIO(program).attempt.unsafeRunSync() + assert(result.isLeft, "Transaction should have failed") + + // The actor should NOT exist because the transaction was rolled back + val actors = getActors(GetActorsQueryParameters(Some(uniqueFirst), Some(uniqueLast))).unsafeRunSync() + assert(actors.isEmpty, "Actor should not exist after transaction rollback") + } + + test("Individual apply() calls still work independently (backward compatibility)") { + val uniqueFirst = s"Compat_${System.currentTimeMillis()}" + val uniqueLast = "CompatTest" + + val result = createActor(CreateActorRequestBody(uniqueFirst, uniqueLast)).unsafeRunSync() + assert(result.isRight) + + val actorId = result.toOption.get.data + val found = getActorById(actorId).unsafeRunSync() + assert(found.isDefined) + assert(found.get.firstName == uniqueFirst) + } +} From c68774fc68499c0e3646b650fb49fc5b57725d07 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 12 Jun 2026 11:50:47 +0200 Subject: [PATCH 2/2] pr comments addressed --- .../co/absa/db/fadb/doobie/DoobieEngine.scala | 18 ++++++++++ .../absa/db/fadb/doobie/DoobieFunction.scala | 33 ++++++++++++------- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieEngine.scala b/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieEngine.scala index c49a5de0..93e1bec6 100644 --- a/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieEngine.scala +++ b/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieEngine.scala @@ -99,3 +99,21 @@ class DoobieEngine[F[_]: Async](val transactor: Transactor[F]) extends DBEngine[ def runConnectionIO[R](cio: ConnectionIO[R]): F[R] = cio.transact(transactor) } + +object DoobieEngine { + + /** + * Executes an arbitrary `ConnectionIO` program in a single transaction, + * using an implicit `DoobieEngine` instance. + * Useful in repository classes where the engine is not directly available + * but is in implicit scope. + * + * @param cio the `ConnectionIO` program to execute + * @param engine the `DoobieEngine` instance (typically implicit) + * @tparam R the result type of the program + * @tparam F the effect type + * @return the result wrapped in the effect type `F` + */ + def runConnectionIO[R, F[_]: Async](cio: ConnectionIO[R])(implicit engine: DoobieEngine[F]): F[R] = + engine.runConnectionIO(cio) +} diff --git a/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieFunction.scala b/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieFunction.scala index 6be2e7a8..3ae975c6 100644 --- a/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieFunction.scala +++ b/doobie/src/main/scala/za/co/absa/db/fadb/doobie/DoobieFunction.scala @@ -17,7 +17,7 @@ package za.co.absa.db.fadb.doobie import cats.MonadError -import doobie.ConnectionIO +import doobie.{ConnectionIO, FC} import doobie.implicits._ import doobie.util.Read import doobie.util.fragment.Fragment @@ -129,8 +129,10 @@ trait DoobieFunction[I, R, F[_]] extends DoobieFunctionBase[R] { * @return the `ConnectionIO[Seq[R]]` representing the database function call */ def toConnectionIO(values: I): ConnectionIO[Seq[R]] = { - val fragments = toFragmentsSeq(values) - composeFragments(fragments).query[R].to[Seq] + val fragments = FC.delay(toFragmentsSeq(values)) + fragments + .flatMap(frs => FC.delay(composeFragments(frs))) + .flatMap(fr => fr.query[R].to[Seq]) } } @@ -214,10 +216,15 @@ trait DoobieFunctionWithStatus[I, R, F[_]] extends DoobieFunctionBase[R] { * @return the `ConnectionIO[Seq[FailedOrRow[R]]]` representing the database function call */ def toConnectionIO(values: I): ConnectionIO[Seq[FailedOrRow[R]]] = { - val fragments = toFragmentsSeq(values) - val fragment = composeFragments(fragments) - val queryWithStatus = new DoobieQueryWithStatus[R](fragment, checkStatus) - fragment.query[StatusWithData[R]].to[Seq].map(_.map(queryWithStatus.getResultOrException)) + val fragments = FC.delay(toFragmentsSeq(values)) + fragments + .flatMap(frs => FC.delay(composeFragments(frs))) + .flatMap { fr => + FC.delay(new DoobieQueryWithStatus[R](fr, checkStatus)) + } + .flatMap { queryWithStatus => + queryWithStatus.fragment.query[StatusWithData[R]].to[Seq].map(_.map(queryWithStatus.getResultOrException)) + } } } @@ -253,8 +260,10 @@ object DoobieFunction { * @return the `ConnectionIO[R]` representing the database function call */ def toConnectionIOSingle(values: I): ConnectionIO[R] = { - val fragments = toFragmentsSeq(values) - composeFragments(fragments).query[R].unique + val fragments = FC.delay(toFragmentsSeq(values)) + fragments + .flatMap(frs => FC.delay(composeFragments(frs))) + .flatMap(fr => fr.query[R].unique) } } @@ -354,8 +363,10 @@ object DoobieFunction { * @return the `ConnectionIO[Option[R]]` representing the database function call */ def toConnectionIOOptional(values: I): ConnectionIO[Option[R]] = { - val fragments = toFragmentsSeq(values) - composeFragments(fragments).query[R].option + val fragments = FC.delay(toFragmentsSeq(values)) + fragments + .flatMap(frs => FC.delay(composeFragments(frs))) + .flatMap(fr => fr.query[R].option) } }