Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,34 @@ class DoobieEngine[F[_]: Async](val transactor: Transactor[F]) extends DBEngine[
override def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[FailedOrRow[R]]] = {
executeQueryWithStatus(query)(query.readStatusWithData)
}

/**
* Executes an arbitrary `ConnectionIO` program in a single transaction.
* This enables composing multiple database operations (including multiple fa-db function calls)
* into a single atomic transaction.
*
* @param cio the `ConnectionIO` program to execute
* @tparam R the result type of the program
* @return the result wrapped in the effect type `F`
*/
def runConnectionIO[R](cio: ConnectionIO[R]): F[R] =
cio.transact(transactor)
}

object DoobieEngine {

/**
* Executes an arbitrary `ConnectionIO` program in a single transaction,
* using an implicit `DoobieEngine` instance.
* Useful in repository classes where the engine is not directly available
* but is in implicit scope.
*
* @param cio the `ConnectionIO` program to execute
* @param engine the `DoobieEngine` instance (typically implicit)
* @tparam R the result type of the program
* @tparam F the effect type
* @return the result wrapped in the effect type `F`
*/
def runConnectionIO[R, F[_]: Async](cio: ConnectionIO[R])(implicit engine: DoobieEngine[F]): F[R] =
engine.runConnectionIO(cio)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package za.co.absa.db.fadb.doobie

import cats.MonadError
import doobie.implicits.toSqlInterpolator
import doobie.{ConnectionIO, FC}
import doobie.implicits._
import doobie.util.Read
import doobie.util.fragment.Fragment
import za.co.absa.db.fadb.DBFunction._
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.exceptions.StatusException
import za.co.absa.db.fadb.status.FunctionStatus
import za.co.absa.db.fadb.status.{FailedOrRow, FunctionStatus}

import scala.language.higherKinds

Expand Down Expand Up @@ -119,6 +120,21 @@ trait DoobieFunction[I, R, F[_]] extends DoobieFunctionBase[R] {
meSql(values, selectEntry, functionName, alias)(read, me)
}

/**
* Returns the database function call as a `ConnectionIO[Seq[R]]` without executing a transaction.
* This enables composing multiple function calls into a single transaction using for-comprehensions
* over `ConnectionIO`, and then executing them atomically via `DoobieEngine.runConnectionIO`.
*
* @param values the input values for the function
* @return the `ConnectionIO[Seq[R]]` representing the database function call
*/
def toConnectionIO(values: I): ConnectionIO[Seq[R]] = {
val fragments = FC.delay(toFragmentsSeq(values))
fragments
.flatMap(frs => FC.delay(composeFragments(frs)))
.flatMap(fr => fr.query[R].to[Seq])
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

}

trait DoobieFunctionWithStatus[I, R, F[_]] extends DoobieFunctionBase[R] {
Expand Down Expand Up @@ -190,6 +206,26 @@ trait DoobieFunctionWithStatus[I, R, F[_]] extends DoobieFunctionBase[R] {

// This is to be mixed in by an implementation of StatusHandling
def checkStatus(functionStatus: FunctionStatus): Option[StatusException]

/**
* Returns the database function call as a `ConnectionIO[Seq[FailedOrRow[R]]]` without executing a transaction.
* This enables composing multiple function calls into a single transaction using for-comprehensions
* over `ConnectionIO`, and then executing them atomically via `DoobieEngine.runConnectionIO`.
*
* @param values the input values for the function
* @return the `ConnectionIO[Seq[FailedOrRow[R]]]` representing the database function call
*/
def toConnectionIO(values: I): ConnectionIO[Seq[FailedOrRow[R]]] = {
val fragments = FC.delay(toFragmentsSeq(values))
fragments
.flatMap(frs => FC.delay(composeFragments(frs)))
.flatMap { fr =>
FC.delay(new DoobieQueryWithStatus[R](fr, checkStatus))
}
.flatMap { queryWithStatus =>
queryWithStatus.fragment.query[StatusWithData[R]].to[Seq].map(_.map(queryWithStatus.getResultOrException))
}
}
}

/**
Expand All @@ -216,7 +252,20 @@ object DoobieFunction {
val dbEngine: DoobieEngine[F],
val readR: Read[R]
) extends DBSingleResultFunction[I, R, DoobieEngine[F], F](functionNameOverride)
with DoobieFunction[I, R, F]
with DoobieFunction[I, R, F] {

/**
* Returns the database function call as a `ConnectionIO[R]` expecting exactly one result row.
* @param values the input values for the function
* @return the `ConnectionIO[R]` representing the database function call
*/
def toConnectionIOSingle(values: I): ConnectionIO[R] = {
val fragments = FC.delay(toFragmentsSeq(values))
fragments
.flatMap(frs => FC.delay(composeFragments(frs)))
.flatMap(fr => fr.query[R].unique)
}
}

/**
* `DoobieSingleResultFunctionWithStatus` represents a db function that returns a single result with status.
Expand All @@ -238,7 +287,17 @@ object DoobieFunction {
val readR: Read[R],
val readStatusWithData: Read[StatusWithData[R]]
) extends DBSingleResultFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride)
with DoobieFunctionWithStatus[I, R, F]
with DoobieFunctionWithStatus[I, R, F] {

/**
* Returns the database function call as a `ConnectionIO[FailedOrRow[R]]` expecting exactly one result row.
* @param values the input values for the function
* @return the `ConnectionIO[FailedOrRow[R]]` representing the database function call
*/
def toConnectionIOSingle(values: I): ConnectionIO[FailedOrRow[R]] = {
toConnectionIO(values).map(_.head)
}
}

/**
* `DoobieMultipleResultFunction` represents a db function that returns multiple results.
Expand Down Expand Up @@ -296,7 +355,20 @@ object DoobieFunction {
val dbEngine: DoobieEngine[F],
val readR: Read[R]
) extends DBOptionalResultFunction[I, R, DoobieEngine[F], F](functionNameOverride)
with DoobieFunction[I, R, F]
with DoobieFunction[I, R, F] {

/**
* Returns the database function call as a `ConnectionIO[Option[R]]` expecting zero or one result rows.
* @param values the input values for the function
* @return the `ConnectionIO[Option[R]]` representing the database function call
*/
def toConnectionIOOptional(values: I): ConnectionIO[Option[R]] = {
val fragments = FC.delay(toFragmentsSeq(values))
fragments
.flatMap(frs => FC.delay(composeFragments(frs)))
.flatMap(fr => fr.query[R].option)
}
}

/**
* `DoobieOptionalResultFunctionWithStatus` represents a db function that returns an optional result.
Expand All @@ -310,5 +382,15 @@ object DoobieFunction {
val readR: Read[R],
val readStatusWithData: Read[StatusWithData[R]]
) extends DBOptionalResultFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride)
with DoobieFunctionWithStatus[I, R, F]
with DoobieFunctionWithStatus[I, R, F] {

/**
* Returns the database function call as a `ConnectionIO[Option[FailedOrRow[R]]]` expecting zero or one result rows.
* @param values the input values for the function
* @return the `ConnectionIO[Option[FailedOrRow[R]]]` representing the database function call
*/
def toConnectionIOOptional(values: I): ConnectionIO[Option[FailedOrRow[R]]] = {
toConnectionIO(values).map(_.headOption)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.db.fadb.doobie

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.implicits._
import doobie.ConnectionIO
import doobie.implicits._
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieFunction.{DoobieMultipleResultFunction, DoobieOptionalResultFunction, DoobieSingleResultFunctionWithStatus}
import za.co.absa.db.fadb.status.FailedOrRow
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
import za.co.absa.db.fadb.testing.classes.DoobieTest

class DoobieTransactionCompositionIntegrationTests extends AnyFunSuite with DoobieTest {

private val engine = new DoobieEngine(transactor)

class CreateActor(implicit schema: DBSchema, dbEngine: DoobieEngine[IO])
extends DoobieSingleResultFunctionWithStatus[CreateActorRequestBody, Int, IO](
values => Seq(fr"${values.firstName}", fr"${values.lastName}")
)
with StandardStatusHandling {
override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("o_actor_id")
}

class GetActorById(implicit schema: DBSchema, dbEngine: DoobieEngine[IO])
extends DoobieOptionalResultFunction[Int, Actor, IO](id => Seq(fr"$id"))

class GetActors(implicit schema: DBSchema, dbEngine: DoobieEngine[IO])
extends DoobieMultipleResultFunction[GetActorsQueryParameters, Actor, IO](
values => Seq(fr"${values.firstName}", fr"${values.lastName}")
)

private val createActor = new CreateActor()(Integration, engine)
private val getActorById = new GetActorById()(Integration, engine)
private val getActors = new GetActors()(Integration, engine)

/** Lifts a FailedOrRow into ConnectionIO — raises StatusException on Left, unwraps data on Right */
private def liftFailedOrRow[R](result: FailedOrRow[R]): ConnectionIO[R] = result match {
case Right(row) => row.data.pure[ConnectionIO]
case Left(ex) => doobie.FC.raiseError(ex)
}

test("Compose two function calls into a single transaction using toConnectionIO") {
val uniqueFirst = s"TxTest_${System.currentTimeMillis()}"
val uniqueLast = "CompositionTest"

val program: ConnectionIO[(Int, Option[Actor])] = for {
created <- createActor.toConnectionIO(CreateActorRequestBody(uniqueFirst, uniqueLast))
actorId <- liftFailedOrRow(created.head)
found <- getActorById.toConnectionIOOptional(actorId)
} yield (actorId, found)

val (actorId, foundActor) = engine.runConnectionIO(program).unsafeRunSync()

assert(actorId > 0)
assert(foundActor.isDefined)
assert(foundActor.get.firstName == uniqueFirst)
assert(foundActor.get.lastName == uniqueLast)
}

test("Compose using toConnectionIOSingle convenience method") {
val uniqueFirst = s"TxSingle_${System.currentTimeMillis()}"
val uniqueLast = "SingleTest"

val program: ConnectionIO[Actor] = for {
_ <- createActor.toConnectionIOSingle(CreateActorRequestBody(uniqueFirst, uniqueLast))
actors <- getActors.toConnectionIO(GetActorsQueryParameters(Some(uniqueFirst), Some(uniqueLast)))
} yield actors.head

val actor = engine.runConnectionIO(program).unsafeRunSync()

assert(actor.firstName == uniqueFirst)
assert(actor.lastName == uniqueLast)
}

test("Compose multiple creates in a single transaction") {
val timestamp = System.currentTimeMillis()
val lastName = s"BatchTest_$timestamp"

val program: ConnectionIO[Unit] = for {
_ <- createActor.toConnectionIOSingle(CreateActorRequestBody("BatchActor1", lastName))
_ <- createActor.toConnectionIOSingle(CreateActorRequestBody("BatchActor2", lastName))
_ <- createActor.toConnectionIOSingle(CreateActorRequestBody("BatchActor3", lastName))
} yield ()

engine.runConnectionIO(program).unsafeRunSync()

// Verify all actors exist — they were created in a single transaction
val actors = getActors(GetActorsQueryParameters(None, Some(lastName))).unsafeRunSync()
assert(actors.size >= 3)
}

test("Transaction rolls back all operations on failure") {
val uniqueFirst = s"Rollback_${System.currentTimeMillis()}"
val uniqueLast = "RollbackTest"

val program: ConnectionIO[Unit] = for {
_ <- createActor.toConnectionIOSingle(CreateActorRequestBody(uniqueFirst, uniqueLast))
_ <- doobie.FC.raiseError[Unit](new RuntimeException("Simulated failure after create"))
} yield ()

val result = engine.runConnectionIO(program).attempt.unsafeRunSync()
assert(result.isLeft, "Transaction should have failed")

// The actor should NOT exist because the transaction was rolled back
val actors = getActors(GetActorsQueryParameters(Some(uniqueFirst), Some(uniqueLast))).unsafeRunSync()
assert(actors.isEmpty, "Actor should not exist after transaction rollback")
}

test("Individual apply() calls still work independently (backward compatibility)") {
val uniqueFirst = s"Compat_${System.currentTimeMillis()}"
val uniqueLast = "CompatTest"

val result = createActor(CreateActorRequestBody(uniqueFirst, uniqueLast)).unsafeRunSync()
assert(result.isRight)

val actorId = result.toOption.get.data
val found = getActorById(actorId).unsafeRunSync()
assert(found.isDefined)
assert(found.get.firstName == uniqueFirst)
}
}
Loading