Skip to content

Add NonEmptyHotswap #4267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: series/3.6.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions docs/std/hotswap.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,33 @@ Constructing a new [`Resource`](./resource.md) inside the body of a
until after the inner resource is released. Consider for example writing a
logger that will rotate log files every `n` bytes.

## Hotswap
## Hotswap2

`Hotswap` addresses this by exposing a linear sequence of resources as a single
`Hotswap2` addresses this by exposing a linear sequence of resources as a single
`Resource`. We can run the finalizers for the current resource and advance to
the next one in the sequence using `Hotswap#swap`. An error may be raised if
the next one in the sequence using `Hotswap2#swap`. An error may be raised if
the previous resource in the sequence is referenced after `swap` is invoked
(as the resource will have been finalized).

```scala
sealed trait Hotswap[F[_], R] {

def swap(next: Resource[F, R]): F[R]
sealed trait Hotswap2[F[_], R] {
def swap(next: Resource[F, R]): F[Unit]
def get: Resource[F, R]
}
```

A rotating logger would then look something like this:

```scala
def rotating(n: Int): Resource[IO, Logger[IO]] =
Hotswap.create[IO, File].flatMap { hs =>
def file(name: String): Resource[IO, File] = ???
def write(file: File, msg: String): IO[Unit] = ???
def rotating(n: Int): Resource[IO, Logger[IO]] = {
def file(name: String): Resource[IO, File] = ???
def write(file: File, msg: String): IO[Unit] = ???

Hotswap2[IO, File](file("0.log").flatMap { hs =>
Resource.eval {
for {
index <- Ref[IO].of(0)
count <- Ref[IO].of(0)
// Open the initial log file
_ <- hs.swap(file("0.log"))
} yield new Logger[IO] {
def log(msg: String): IO[Unit] =
count.get.flatMap { currentCount =>
Expand All @@ -61,4 +59,9 @@ def rotating(n: Int): Resource[IO, Logger[IO]] =
}
}
}
}
```

## Hotswap

In cases where the managed `Resource` may be missing, `Hotswap` provides some optimizations for this use case.
78 changes: 14 additions & 64 deletions std/shared/src/main/scala/cats/effect/std/Hotswap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package cats.effect.std

import cats.effect.kernel.{Concurrent, Ref, Resource}
import cats.effect.kernel.syntax.all._
import cats.effect.kernel.{Concurrent, Resource}
import cats.effect.kernel.Resource.ExitCase.Succeeded
import cats.syntax.all._

/**
Expand Down Expand Up @@ -102,70 +102,20 @@ object Hotswap {
* swapped during the lifetime of this [[Hotswap]].
*/
def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap[F, R]] =
Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore =>
sealed abstract class State
case object Cleared extends State
case class Acquired(r: R, fin: F[Unit]) extends State
case object Finalized extends State

def initialize: F[Ref[F, State]] =
F.ref(Cleared)

def finalize(state: Ref[F, State]): F[Unit] =
state.getAndSet(Finalized).flatMap {
case Acquired(_, finalizer) => exclusive.surround(finalizer)
case Cleared => F.unit
case Finalized => raise("Hotswap already finalized")
}

def raise(message: String): F[Unit] =
F.raiseError[Unit](new RuntimeException(message))

def exclusive: Resource[F, Unit] =
Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ =>
semaphore.releaseN(Long.MaxValue))

Resource.make(initialize)(finalize).map { state =>
new Hotswap[F, R] {

override def swap(next: Resource[F, R]): F[R] =
F.uncancelable { poll =>
poll(next.allocated).flatMap {
case (r, fin) =>
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
swapFinalizer(Acquired(r, fin)).as(r)
}
}
Hotswap2.create[F, R].map { nes =>
new Hotswap[F, R] {
override def swap(next: Resource[F, R]): F[R] =
nes.swap(next.map(_.some)) *> get.use(_.get.pure[F])

override def get: Resource[F, Option[R]] =
Resource.applyFull[F, Option[R]] { poll =>
poll(nes.get.allocatedCase).flatMap {
case (None, fin) => fin(Succeeded) *> F.pure((None, _ => F.unit))
case (r, fin) => F.pure((r, fin))
}
}

override def get: Resource[F, Option[R]] =
Resource.makeFull[F, Option[R]] { poll =>
poll(semaphore.acquire) *> // acquire shared lock
state.get.flatMap {
case Acquired(r, _) => F.pure(Some(r))
case _ => semaphore.release.as(None)
}
} { r => if (r.isDefined) semaphore.release else F.unit }

override def clear: F[Unit] =
exclusive.surround(swapFinalizer(Cleared).uncancelable)

private def swapFinalizer(next: State): F[Unit] =
state.modify {
case Acquired(_, fin) =>
next -> fin
case Cleared =>
next -> F.unit
case Finalized =>
val fin = next match {
case Acquired(_, fin) => fin
case _ => F.unit
}
Finalized -> (fin *> raise("Cannot swap after finalization"))
}.flatten

}
override def clear: F[Unit] = nes.clear
}
}

}
166 changes: 166 additions & 0 deletions std/shared/src/main/scala/cats/effect/std/Hotswap2.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 2020-2025 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

import cats.effect.kernel.{Concurrent, Ref, Resource}
import cats.syntax.all._

/**
* A concurrent data structure that exposes a linear sequence of `R` resources as a single
* [[cats.effect.kernel.Resource]] in `F` without accumulation.
*
* A [[Hotswap2]] is allocated within a [[cats.effect.kernel.Resource]] that dictates the scope
* of its lifetime. After creation, a `Resource[F, R]` can be swapped in by calling [[swap]].
* The newly acquired resource is returned and is released either when the [[Hotswap2]] is
* finalized or upon the next call to [[swap]], whichever occurs first.
*
* The following diagram illustrates the linear allocation and release of three resources `r1`,
* `r2`, and `r3` cycled through [[Hotswap2]]:
*
* {{{
* create(r1) ----- swap(r2) ---- swap(r3) ---- X
* | | | |
* r1 acquired | | |
* r2 acquired | |
* r1 released r3 acquired |
* r2 released |
* r3 released
* }}}
*
* [[Hotswap2]] is particularly useful when working with effects that cycle through resources,
* like writing bytes to files or rotating files every N bytes or M seconds. Without
* [[Hotswap2]], such effects leak resources: on each file rotation, a file handle or some
* internal resource handle accumulates. With [[Hotswap2]], the only registered resource is the
* [[Hotswap2]] itself, and each file is swapped in only after swapping the previous one out.
*
* Born from [[Hotswap]].
*/
sealed trait Hotswap2[F[_], R] {

/**
* Allocates a new resource, closes the previous one, and returns the newly allocated `R`.
*
* When the lifetime of the [[Hotswap2]] is completed, the resource allocated by the most
* recent [[swap]] will be finalized.
*
* [[swap]] finalizes the previous resource immediately, so users must ensure that the old `R`
* is not used thereafter. Failure to do so may result in an error on the _consumer_ side. In
* any case, no resources will be leaked.
*
* For safer access to the current resource see [[get]], which guarantees that it will not be
* released while it is being used.
*
* If [[swap]] is called after the lifetime of the [[Hotswap2]] is over, it will raise an
* error, but will ensure that all resources are finalized before returning.
*/
def swap(next: Resource[F, R]): F[Unit]

/**
* Gets the current resource. The returned resource is guaranteed to be available for the
* duration of the returned resource.
*/
def get: Resource[F, R]
}

object Hotswap2 {

/**
* Creates a new [[Hotswap2]] initialized with the specified resource, which represents a
* [[cats.effect.kernel.Resource]] that can be swapped during the lifetime of this
* [[Hotswap2]].
*/
def apply[F[_], R](initial: Resource[F, R])(
implicit F: Concurrent[F]): Resource[F, Hotswap2[F, R]] =
Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore =>
sealed abstract class State
case class Acquired(r: R, fin: F[Unit]) extends State
case object Finalized extends State

def initialize: F[Ref[F, State]] =
F.uncancelable { poll =>
poll(initial.allocated).flatMap {
case (r, fin) =>
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
F.ref(Acquired(r, fin))
}
}
}

def finalize(state: Ref[F, State]): F[Unit] =
state.getAndSet(Finalized).flatMap {
case Acquired(_, finalizer) => exclusive.surround(finalizer)
case Finalized => raise("NonEmptyHotswap already finalized")
}

def raise[A](message: String): F[A] =
F.raiseError[A](new IllegalStateException(message))

def exclusive: Resource[F, Unit] =
Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ =>
semaphore.releaseN(Long.MaxValue))

Resource.make(initialize)(finalize).map { state =>
new Hotswap2[F, R] {

override def swap(next: Resource[F, R]): F[Unit] =
F.uncancelable { poll =>
poll(next.allocated).flatMap {
case (r, fin) =>
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
swapFinalizer(Acquired(r, fin))
}
}
}

override def get: Resource[F, R] =
Resource.makeFull[F, R] { poll =>
poll(semaphore.acquire) *> // acquire shared lock
state.get.flatMap {
case Acquired(r, _) => F.pure(r)
case _ => raise("NonEmptyHotswap already finalized")
}
}(_ => semaphore.release)

private def swapFinalizer(next: State): F[Unit] =
state.flatModify {
case Acquired(_, fin) =>
next -> fin
case Finalized =>
val fin = next match {
case Acquired(_, fin) => fin
case _ => F.unit
}
Finalized -> (fin *> raise[Unit]("Cannot swap after finalization"))
}
}
}
}

/**
* Creates a `NonEmptyHotswap` that is largely functionally equivalent to `Hotswap`
*
* The primary difference is that [[Hotswap2.swap]] does not leak the resource.
*/
def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap2[F, Option[R]]] =
apply[F, Option[R]](Resource.pure(none))

implicit final class Hotswap2OptionalResourcesOpt[F[_], R](
private val hs: Hotswap2[F, Option[R]])
extends AnyVal {
def clear: F[Unit] = hs.swap(Resource.pure(none))
}
}
Loading
Loading