diff --git a/docs/std/hotswap.md b/docs/std/hotswap.md index 57ef66923f..bc9a50f65c 100644 --- a/docs/std/hotswap.md +++ b/docs/std/hotswap.md @@ -10,35 +10,33 @@ Constructing a new [`Resource`](./resource.md) inside the body of a until after the inner resource is released. Consider for example writing a logger that will rotate log files every `n` bytes. -## Hotswap +## Hotswap2 -`Hotswap` addresses this by exposing a linear sequence of resources as a single +`Hotswap2` addresses this by exposing a linear sequence of resources as a single `Resource`. We can run the finalizers for the current resource and advance to -the next one in the sequence using `Hotswap#swap`. An error may be raised if +the next one in the sequence using `Hotswap2#swap`. An error may be raised if the previous resource in the sequence is referenced after `swap` is invoked (as the resource will have been finalized). ```scala -sealed trait Hotswap[F[_], R] { - - def swap(next: Resource[F, R]): F[R] +sealed trait Hotswap2[F[_], R] { + def swap(next: Resource[F, R]): F[Unit] + def get: Resource[F, R] } ``` A rotating logger would then look something like this: ```scala -def rotating(n: Int): Resource[IO, Logger[IO]] = - Hotswap.create[IO, File].flatMap { hs => - def file(name: String): Resource[IO, File] = ??? - def write(file: File, msg: String): IO[Unit] = ??? +def rotating(n: Int): Resource[IO, Logger[IO]] = { + def file(name: String): Resource[IO, File] = ??? + def write(file: File, msg: String): IO[Unit] = ??? + Hotswap2[IO, File](file("0.log").flatMap { hs => Resource.eval { for { index <- Ref[IO].of(0) count <- Ref[IO].of(0) - // Open the initial log file - _ <- hs.swap(file("0.log")) } yield new Logger[IO] { def log(msg: String): IO[Unit] = count.get.flatMap { currentCount => @@ -61,4 +59,9 @@ def rotating(n: Int): Resource[IO, Logger[IO]] = } } } +} ``` + +## Hotswap + +In cases where the managed `Resource` may be missing, `Hotswap` provides some optimizations for this use case. diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index 581495e0ff..0b453e73b9 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -16,8 +16,8 @@ package cats.effect.std -import cats.effect.kernel.{Concurrent, Ref, Resource} -import cats.effect.kernel.syntax.all._ +import cats.effect.kernel.{Concurrent, Resource} +import cats.effect.kernel.Resource.ExitCase.Succeeded import cats.syntax.all._ /** @@ -102,70 +102,20 @@ object Hotswap { * swapped during the lifetime of this [[Hotswap]]. */ def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap[F, R]] = - Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore => - sealed abstract class State - case object Cleared extends State - case class Acquired(r: R, fin: F[Unit]) extends State - case object Finalized extends State - - def initialize: F[Ref[F, State]] = - F.ref(Cleared) - - def finalize(state: Ref[F, State]): F[Unit] = - state.getAndSet(Finalized).flatMap { - case Acquired(_, finalizer) => exclusive.surround(finalizer) - case Cleared => F.unit - case Finalized => raise("Hotswap already finalized") - } - - def raise(message: String): F[Unit] = - F.raiseError[Unit](new RuntimeException(message)) - - def exclusive: Resource[F, Unit] = - Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ => - semaphore.releaseN(Long.MaxValue)) - - Resource.make(initialize)(finalize).map { state => - new Hotswap[F, R] { - - override def swap(next: Resource[F, R]): F[R] = - F.uncancelable { poll => - poll(next.allocated).flatMap { - case (r, fin) => - exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround { - swapFinalizer(Acquired(r, fin)).as(r) - } - } + Hotswap2.create[F, R].map { nes => + new Hotswap[F, R] { + override def swap(next: Resource[F, R]): F[R] = + nes.swap(next.map(_.some)) *> get.use(_.get.pure[F]) + + override def get: Resource[F, Option[R]] = + Resource.applyFull[F, Option[R]] { poll => + poll(nes.get.allocatedCase).flatMap { + case (None, fin) => fin(Succeeded) *> F.pure((None, _ => F.unit)) + case (r, fin) => F.pure((r, fin)) } + } - override def get: Resource[F, Option[R]] = - Resource.makeFull[F, Option[R]] { poll => - poll(semaphore.acquire) *> // acquire shared lock - state.get.flatMap { - case Acquired(r, _) => F.pure(Some(r)) - case _ => semaphore.release.as(None) - } - } { r => if (r.isDefined) semaphore.release else F.unit } - - override def clear: F[Unit] = - exclusive.surround(swapFinalizer(Cleared).uncancelable) - - private def swapFinalizer(next: State): F[Unit] = - state.modify { - case Acquired(_, fin) => - next -> fin - case Cleared => - next -> F.unit - case Finalized => - val fin = next match { - case Acquired(_, fin) => fin - case _ => F.unit - } - Finalized -> (fin *> raise("Cannot swap after finalization")) - }.flatten - - } + override def clear: F[Unit] = nes.clear } } - } diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap2.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap2.scala new file mode 100644 index 0000000000..15896a4cbc --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap2.scala @@ -0,0 +1,166 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.effect.kernel.{Concurrent, Ref, Resource} +import cats.syntax.all._ + +/** + * A concurrent data structure that exposes a linear sequence of `R` resources as a single + * [[cats.effect.kernel.Resource]] in `F` without accumulation. + * + * A [[Hotswap2]] is allocated within a [[cats.effect.kernel.Resource]] that dictates the scope + * of its lifetime. After creation, a `Resource[F, R]` can be swapped in by calling [[swap]]. + * The newly acquired resource is returned and is released either when the [[Hotswap2]] is + * finalized or upon the next call to [[swap]], whichever occurs first. + * + * The following diagram illustrates the linear allocation and release of three resources `r1`, + * `r2`, and `r3` cycled through [[Hotswap2]]: + * + * {{{ + * create(r1) ----- swap(r2) ---- swap(r3) ---- X + * | | | | + * r1 acquired | | | + * r2 acquired | | + * r1 released r3 acquired | + * r2 released | + * r3 released + * }}} + * + * [[Hotswap2]] is particularly useful when working with effects that cycle through resources, + * like writing bytes to files or rotating files every N bytes or M seconds. Without + * [[Hotswap2]], such effects leak resources: on each file rotation, a file handle or some + * internal resource handle accumulates. With [[Hotswap2]], the only registered resource is the + * [[Hotswap2]] itself, and each file is swapped in only after swapping the previous one out. + * + * Born from [[Hotswap]]. + */ +sealed trait Hotswap2[F[_], R] { + + /** + * Allocates a new resource, closes the previous one, and returns the newly allocated `R`. + * + * When the lifetime of the [[Hotswap2]] is completed, the resource allocated by the most + * recent [[swap]] will be finalized. + * + * [[swap]] finalizes the previous resource immediately, so users must ensure that the old `R` + * is not used thereafter. Failure to do so may result in an error on the _consumer_ side. In + * any case, no resources will be leaked. + * + * For safer access to the current resource see [[get]], which guarantees that it will not be + * released while it is being used. + * + * If [[swap]] is called after the lifetime of the [[Hotswap2]] is over, it will raise an + * error, but will ensure that all resources are finalized before returning. + */ + def swap(next: Resource[F, R]): F[Unit] + + /** + * Gets the current resource. The returned resource is guaranteed to be available for the + * duration of the returned resource. + */ + def get: Resource[F, R] +} + +object Hotswap2 { + + /** + * Creates a new [[Hotswap2]] initialized with the specified resource, which represents a + * [[cats.effect.kernel.Resource]] that can be swapped during the lifetime of this + * [[Hotswap2]]. + */ + def apply[F[_], R](initial: Resource[F, R])( + implicit F: Concurrent[F]): Resource[F, Hotswap2[F, R]] = + Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore => + sealed abstract class State + case class Acquired(r: R, fin: F[Unit]) extends State + case object Finalized extends State + + def initialize: F[Ref[F, State]] = + F.uncancelable { poll => + poll(initial.allocated).flatMap { + case (r, fin) => + exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround { + F.ref(Acquired(r, fin)) + } + } + } + + def finalize(state: Ref[F, State]): F[Unit] = + state.getAndSet(Finalized).flatMap { + case Acquired(_, finalizer) => exclusive.surround(finalizer) + case Finalized => raise("NonEmptyHotswap already finalized") + } + + def raise[A](message: String): F[A] = + F.raiseError[A](new IllegalStateException(message)) + + def exclusive: Resource[F, Unit] = + Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ => + semaphore.releaseN(Long.MaxValue)) + + Resource.make(initialize)(finalize).map { state => + new Hotswap2[F, R] { + + override def swap(next: Resource[F, R]): F[Unit] = + F.uncancelable { poll => + poll(next.allocated).flatMap { + case (r, fin) => + exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround { + swapFinalizer(Acquired(r, fin)) + } + } + } + + override def get: Resource[F, R] = + Resource.makeFull[F, R] { poll => + poll(semaphore.acquire) *> // acquire shared lock + state.get.flatMap { + case Acquired(r, _) => F.pure(r) + case _ => raise("NonEmptyHotswap already finalized") + } + }(_ => semaphore.release) + + private def swapFinalizer(next: State): F[Unit] = + state.flatModify { + case Acquired(_, fin) => + next -> fin + case Finalized => + val fin = next match { + case Acquired(_, fin) => fin + case _ => F.unit + } + Finalized -> (fin *> raise[Unit]("Cannot swap after finalization")) + } + } + } + } + + /** + * Creates a `NonEmptyHotswap` that is largely functionally equivalent to `Hotswap` + * + * The primary difference is that [[Hotswap2.swap]] does not leak the resource. + */ + def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap2[F, Option[R]]] = + apply[F, Option[R]](Resource.pure(none)) + + implicit final class Hotswap2OptionalResourcesOpt[F[_], R]( + private val hs: Hotswap2[F, Option[R]]) + extends AnyVal { + def clear: F[Unit] = hs.swap(Resource.pure(none)) + } +} diff --git a/tests/shared/src/test/scala/cats/effect/std/Hotswap2Spec.scala b/tests/shared/src/test/scala/cats/effect/std/Hotswap2Spec.scala new file mode 100644 index 0000000000..39477cf702 --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/Hotswap2Spec.scala @@ -0,0 +1,146 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats +package effect +package std + +import cats.effect.Resource +import cats.effect.kernel.Ref +import cats.effect.testkit.TestControl +import cats.effect.unsafe.IORuntimeConfig + +import scala.concurrent.duration._ + +class Hotswap2Spec extends BaseSpec { outer => + + sequential + + def logged(log: Ref[IO, List[String]], name: String): Resource[IO, Unit] = + Resource.make(log.update(_ :+ s"open $name"))(_ => log.update(_ :+ s"close $name")) + + "Hotswap2" should { + "run finalizer of target run when Hotswap2 is finalized" in real { + val op = for { + log <- Ref.of[IO, List[String]](List()) + _ <- Hotswap2[IO, Unit](logged(log, "a")).use(_ => IO.unit) + value <- log.get + } yield value + + op.flatMap { res => + IO { + res must beEqualTo(List("open a", "close a")) + } + } + } + + "acquire new resource and finalize old resource on swap" in real { + val op = for { + log <- Ref.of[IO, List[String]](List()) + _ <- + Hotswap2[IO, Unit](logged(log, "a")).use(_.swap(logged(log, "b"))) + value <- log.get + } yield value + + op.flatMap { res => + IO { + res must beEqualTo(List("open a", "open b", "close a", "close b")) + } + } + } + + "not release current resource while it is in use" in ticked { implicit ticker => + val r0 = Resource.make(IO.ref(1))(_.set(10)) + val r1 = Resource.make(IO.ref(2))(_.set(20)) + val r2 = Resource.make(IO.ref(3))(_.set(30)) + val go = Hotswap2[IO, Ref[IO, Int]](r0).use { hs => + hs.swap(r1) *> (IO.sleep(1.second) *> hs.swap(r2)).background.surround { + hs.get.use { ref => + val notReleased = ref.get.flatMap(b => IO(b must beEqualTo(2))) + notReleased *> IO.sleep(2.seconds) *> notReleased.void + } + } + } + + go must completeAs(()) + } + + "not finalize Hotswap2 while resource is in use" in ticked { implicit ticker => + val r0 = Resource.make(IO.ref(1))(_.set(10)) + val r1 = Resource.make(IO.ref(2))(_.set(20)) + val go = Hotswap2[IO, Ref[IO, Int]](r0).allocated.flatMap { + case (hs, fin) => + hs.swap(r1) *> (IO.sleep(1.second) *> fin).background.surround { + hs.get.use { ref => + val notReleased = ref.get.flatMap(b => IO(b must beEqualTo(2))) + notReleased *> IO.sleep(2.seconds) *> notReleased.void + } + } + } + + go must completeAs(()) + } + + "resource can be accessed concurrently" in ticked { implicit ticker => + val go = Hotswap2[IO, Unit](Resource.unit).use { hs => + hs.get.useForever.background.surround { + IO.sleep(1.second) *> hs.get.use_ + } + } + + go must completeAs(()) + } + + "not block current resource while swap is instantiating new one" in ticked { + implicit ticker => + val go = Hotswap2[IO, Unit](Resource.unit).use { hs => + hs.swap(IO.sleep(1.minute).toResource).start *> + IO.sleep(5.seconds) *> + hs.get.use_.timeout(1.second).void + } + go must completeAs(()) + } + + "successfully cancel during swap and run finalizer if cancelation is requested while waiting for get to release" in ticked { + implicit ticker => + val go = Ref.of[IO, List[String]](List()).flatMap { log => + Hotswap2[IO, Unit](logged(log, "a")).use { hs => + for { + _ <- hs.get.evalMap(_ => IO.sleep(1.minute)).use_.start + _ <- IO.sleep(2.seconds) + _ <- hs.swap(logged(log, "b")).timeoutTo(1.second, IO.unit) + value <- log.get + } yield value + } + } + + go must completeAs(List("open a", "open b", "close b")) + } + + "swap is safe to concurrent cancelation" in ticked { implicit ticker => + val go = IO.ref(false).flatMap { open => + Hotswap2[IO, Unit](Resource.unit) + .use { hs => + hs.swap(Resource.make(open.set(true))(_ => + open.getAndSet(false).map(_ should beTrue).void)) + } + .race(IO.unit) *> open.get.map(_ must beFalse) + } + + TestControl.executeEmbed(go, IORuntimeConfig(1, 2)).replicateA_(1000) must completeAs(()) + } + } +}