Skip to content

Commit 32ccf15

Browse files
authored
Refactor ServerRunner to use Resource, fix unsafe .allocated cases (#4267)
1 parent ba21ead commit 32ccf15

File tree

20 files changed

+152
-179
lines changed

20 files changed

+152
-179
lines changed

client/testserver/src/main/scala/sttp/tapir/client/tests/HttpServer.scala

+7-22
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,22 @@ import org.http4s._
1616
import org.slf4j.LoggerFactory
1717
import org.typelevel.ci.CIString
1818
import scodec.bits.ByteVector
19-
import sttp.tapir.client.tests.HttpServer._
2019

2120
import scala.concurrent.ExecutionContext
2221

23-
object HttpServer {
22+
object HttpServer extends ResourceApp.Forever {
2423
type Port = Int
2524

26-
def main(args: Array[String]): Unit = {
25+
def run(args: List[String]): Resource[IO, Unit] = {
2726
val port = args.headOption.map(_.toInt).getOrElse(51823)
28-
new HttpServer(port).start()
27+
new HttpServer(port).build.void
2928
}
3029
}
3130

32-
class HttpServer(port: Port) {
31+
class HttpServer(port: HttpServer.Port) {
3332

3433
private val logger = LoggerFactory.getLogger(getClass)
3534

36-
private var stopServer: IO[Unit] = _
37-
3835
//
3936

4037
private object numParam extends QueryParamDecoderMatcher[Int]("num")
@@ -212,23 +209,11 @@ class HttpServer(port: Port) {
212209

213210
//
214211

215-
def start(): Unit = {
216-
val (_, _stopServer) = BlazeServerBuilder[IO]
212+
def build: Resource[IO, server.Server] = BlazeServerBuilder[IO]
217213
.withExecutionContext(ExecutionContext.global)
218214
.bindHttp(port)
219215
.withHttpWebSocketApp(app)
220216
.resource
221-
.map(_.address.getPort)
222-
.allocated
223-
.unsafeRunSync()
224-
225-
stopServer = _stopServer
226-
227-
logger.info(s"Server on port $port started")
228-
}
229-
230-
def close(): Unit = {
231-
stopServer.unsafeRunSync()
232-
logger.info(s"Server on port $port stopped")
233-
}
217+
.evalTap(_ => IO(logger.info(s"Server on port $port started")))
218+
.onFinalize(IO(logger.info(s"Server on port $port stopped")))
234219
}

doc/tutorials/07_cats_effect.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ object HelloWorldTapir extends IOApp:
230230
.bindHttp(8080, "localhost")
231231
.withHttpApp(Router("/" -> allRoutes).orNotFound)
232232
.resource
233-
.use(_ => IO.never)
234-
.as(ExitCode.Success)
233+
.useForever
235234
```
236235

237236
Hence, we first generate endpoint descriptions, which correspond to exposing the Swagger UI (containing the generated

examples/src/main/scala/sttp/tapir/examples/streaming/ProxyHttp4sFs2Server.scala

+1-3
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,4 @@ object ProxyHttp4sFs2Server extends IOApp:
6262
.bindHttp(8080, "localhost")
6363
.withHttpApp(Router("/" -> routes).orNotFound)
6464
.resource
65-
} yield ())
66-
.use { _ => IO.never }
67-
.as(ExitCode.Success)
65+
} yield ()).useForever

examples/src/main/scala/sttp/tapir/examples/streaming/StreamingHttp4sFs2ServerOrError.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,4 @@ object StreamingHttp4sFs2ServerOrError extends IOApp:
5656
.bindHttp(8080, "localhost")
5757
.withHttpApp(Router("/" -> userDataRoutes).orNotFound)
5858
.resource
59-
.use { _ => IO.never }
60-
.as(ExitCode.Success)
59+
.useForever

generated-doc/out/tutorials/07_cats_effect.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ object HelloWorldTapir extends IOApp:
230230
.bindHttp(8080, "localhost")
231231
.withHttpApp(Router("/" -> allRoutes).orNotFound)
232232
.resource
233-
.use(_ => IO.never)
234-
.as(ExitCode.Success)
233+
.useForever
235234
```
236235

237236
Hence, we first generate endpoint descriptions, which correspond to exposing the Swagger UI (containing the generated
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,42 @@
11
package sttp.tapir.perf.apis
22

3-
import cats.effect.{ExitCode, IO, IOApp}
3+
import cats.effect.{IO, Resource, ResourceApp}
44

55
import scala.reflect.runtime.universe
66

77
trait ServerRunner {
8-
def start: IO[ServerRunner.KillSwitch]
8+
def runServer: Resource[IO, Unit]
99
}
1010

1111
/** Can be used as a Main object to run a single server using its short name. Running perfTests/runMain
1212
* [[sttp.tapir.perf.apis.ServerRunner]] will load special javaOptions configured in build.sbt, enabling recording JFR metrics. This is
1313
* useful when you want to guarantee that the server runs in a different JVM than test runner, so that memory and CPU metrics are recorded
1414
* only in the scope of the server JVM.
1515
*/
16-
object ServerRunner extends IOApp {
17-
type KillSwitch = IO[Unit]
18-
val NoopKillSwitch = IO.pure(IO.unit)
16+
object ServerRunner extends ResourceApp.Forever {
17+
1918
private val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
19+
private val requireArg: Resource[IO, Unit] = Resource.raiseError(
20+
new IllegalArgumentException(s"Unspecified server name. Use one of: ${TypeScanner.allServers}"): Throwable
21+
)
22+
private def notInstantiated(name: ServerName)(e: Throwable): IO[ServerRunner] = IO.raiseError(
23+
new IllegalArgumentException(
24+
s"ERROR! Could not find object ${name.fullName} or it doesn't extend ServerRunner", e
25+
)
26+
)
2027

21-
def run(args: List[String]): IO[ExitCode] = {
22-
val shortServerName = args.headOption.getOrElse {
23-
throw new IllegalArgumentException(s"Unspecified server name. Use one of: ${TypeScanner.allServers}")
24-
}
25-
for {
26-
killSwitch <- startServerByTypeName(ServerName.fromShort(shortServerName))
27-
_ <- IO.never.guarantee(killSwitch)
28-
} yield ExitCode.Success
29-
}
28+
def run(args: List[String]): Resource[IO, Unit] =
29+
args.headOption.map(ServerName.fromShort).map(startServerByTypeName).getOrElse(requireArg)
3030

31-
def startServerByTypeName(serverName: ServerName): IO[ServerRunner.KillSwitch] = {
31+
def startServerByTypeName(serverName: ServerName): Resource[IO, Unit] =
3232
serverName match {
33-
case ExternalServerName => NoopKillSwitch
34-
case _ =>
35-
try {
33+
case ExternalServerName => Resource.unit
34+
case _ => Resource.eval(
35+
IO({
3636
val moduleSymbol = runtimeMirror.staticModule(serverName.fullName)
3737
val moduleMirror = runtimeMirror.reflectModule(moduleSymbol)
38-
val instance: ServerRunner = moduleMirror.instance.asInstanceOf[ServerRunner]
39-
instance.start
40-
} catch {
41-
case e: Throwable =>
42-
IO.raiseError(
43-
new IllegalArgumentException(s"ERROR! Could not find object ${serverName.fullName} or it doesn't extend ServerRunner", e)
44-
)
45-
}
38+
moduleMirror.instance.asInstanceOf[ServerRunner]
39+
}).handleErrorWith(notInstantiated(serverName))
40+
).flatMap(_.runServer)
4641
}
47-
}
4842
}

perf-tests/src/main/scala/sttp/tapir/perf/apis/TypeScanner.scala

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import io.github.classgraph.ClassGraph
44

55
import scala.jdk.CollectionConverters._
66
import scala.reflect.ClassTag
7-
import scala.util.{Failure, Success, Try}
87

98
import sttp.tapir.perf.Common._
109

perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala

+8-13
Original file line numberDiff line numberDiff line change
@@ -106,26 +106,21 @@ object Tapir extends Endpoints {
106106
object server {
107107
val maxConnections = 65536
108108
val connectorPoolSize: Int = Math.max(2, Runtime.getRuntime.availableProcessors() / 4)
109-
def runServer(
110-
router: WebSocketBuilder2[IO] => HttpRoutes[IO]
111-
): IO[ServerRunner.KillSwitch] =
109+
def runServer(router: WebSocketBuilder2[IO] => HttpRoutes[IO]): Resource[IO, Unit] =
112110
BlazeServerBuilder[IO]
113111
.bindHttp(Port, "localhost")
114112
.withHttpWebSocketApp(wsb => router(wsb).orNotFound)
115113
.withMaxConnections(maxConnections)
116114
.withConnectorPoolSize(connectorPoolSize)
117115
.resource
118-
.allocated
119-
.map(_._2)
120-
.map(_.flatTap { _ =>
121-
IO.println("Http4s server closed.")
122-
})
116+
.map(_ => ())
117+
.onFinalize(IO.println("Http4s server closed."))
123118
}
124119

125-
object TapirServer extends ServerRunner { override def start = server.runServer(Tapir.router(1)) }
126-
object TapirMultiServer extends ServerRunner { override def start = server.runServer(Tapir.router(128)) }
120+
object TapirServer extends ServerRunner { override def runServer = server.runServer(Tapir.router(1)) }
121+
object TapirMultiServer extends ServerRunner { override def runServer = server.runServer(Tapir.router(128)) }
127122
object TapirInterceptorMultiServer extends ServerRunner {
128-
override def start = server.runServer(Tapir.router(128, withServerLog = true))
123+
override def runServer = server.runServer(Tapir.router(128, withServerLog = true))
129124
}
130-
object VanillaServer extends ServerRunner { override def start = server.runServer(Vanilla.router(1)) }
131-
object VanillaMultiServer extends ServerRunner { override def start = server.runServer(Vanilla.router(128)) }
125+
object VanillaServer extends ServerRunner { override def runServer = server.runServer(Vanilla.router(1)) }
126+
object VanillaMultiServer extends ServerRunner { override def runServer = server.runServer(Vanilla.router(128)) }

perf-tests/src/main/scala/sttp/tapir/perf/netty/cats/NettyCats.scala

+14-16
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import sttp.tapir.perf.apis._
1010
import sttp.tapir.server.ServerEndpoint
1111
import sttp.tapir.server.netty.cats.NettyCatsServer
1212
import sttp.tapir.server.netty.cats.NettyCatsServerOptions
13-
import sttp.ws.WebSocketFrame
1413
import sttp.capabilities.fs2.Fs2Streams
1514

1615
import scala.concurrent.duration._
@@ -33,27 +32,26 @@ object NettyCats {
3332
Tapir.wsResponseStream.evalMap(_ => IO.realTime.map(_.toMillis)).concurrently(in.as(()))
3433
}
3534
)
36-
def runServer(endpoints: List[ServerEndpoint[Any, IO]], withServerLog: Boolean = false): IO[ServerRunner.KillSwitch] = {
35+
def runServer(endpoints: List[ServerEndpoint[Any, IO]], withServerLog: Boolean = false): Resource[IO, Unit] = {
3736
val declaredPort = Port
3837
val declaredHost = "0.0.0.0"
39-
(for {
38+
for {
4039
dispatcher <- Dispatcher.parallel[IO]
4140
serverOptions = buildOptions(NettyCatsServerOptions.customiseInterceptors(dispatcher), withServerLog)
42-
server <- NettyCatsServer.io()
43-
_ <-
44-
Resource.make(
45-
server
46-
.port(declaredPort)
47-
.host(declaredHost)
48-
.addEndpoints(wsServerEndpoint :: endpoints)
49-
.start()
50-
)(binding => binding.stop())
51-
} yield ()).allocated.map(_._2)
41+
server <- NettyCatsServer.io().map(_.options(serverOptions))
42+
_ <- Resource.make(
43+
server
44+
.port(declaredPort)
45+
.host(declaredHost)
46+
.addEndpoints(wsServerEndpoint :: endpoints)
47+
.start()
48+
)(_.stop())
49+
} yield ()
5250
}
5351
}
5452

55-
object TapirServer extends ServerRunner { override def start = NettyCats.runServer(Tapir.genEndpointsIO(1)) }
56-
object TapirMultiServer extends ServerRunner { override def start = NettyCats.runServer(Tapir.genEndpointsIO(128)) }
53+
object TapirServer extends ServerRunner { override def runServer = NettyCats.runServer(Tapir.genEndpointsIO(1)) }
54+
object TapirMultiServer extends ServerRunner { override def runServer = NettyCats.runServer(Tapir.genEndpointsIO(128)) }
5755
object TapirInterceptorMultiServer extends ServerRunner {
58-
override def start = NettyCats.runServer(Tapir.genEndpointsIO(128), withServerLog = true)
56+
override def runServer = NettyCats.runServer(Tapir.genEndpointsIO(128), withServerLog = true)
5957
}

perf-tests/src/main/scala/sttp/tapir/perf/netty/future/NettyFuture.scala

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package sttp.tapir.perf.netty.future
22

3-
import cats.effect.IO
3+
import cats.effect.{IO, Resource}
44
import sttp.tapir.perf.apis._
55
import sttp.tapir.perf.Common._
66
import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerBinding, NettyFutureServerOptions}
@@ -14,7 +14,7 @@ object Tapir extends Endpoints
1414

1515
object NettyFuture {
1616

17-
def runServer(endpoints: List[ServerEndpoint[Any, Future]], withServerLog: Boolean = false): IO[ServerRunner.KillSwitch] = {
17+
def runServer(endpoints: List[ServerEndpoint[Any, Future]], withServerLog: Boolean = false): Resource[IO, Unit] = {
1818
val declaredPort = Port
1919
val declaredHost = "0.0.0.0"
2020
val serverOptions = buildOptions(NettyFutureServerOptions.customiseInterceptors, withServerLog)
@@ -29,13 +29,12 @@ object NettyFuture {
2929
.start()
3030
)
3131
)
32-
33-
serverBinding.map(b => IO.fromFuture(IO(b.stop())))
32+
Resource.make(serverBinding)(b => IO.fromFuture(IO(b.stop()))).map(_ => ())
3433
}
3534
}
3635

37-
object TapirServer extends ServerRunner { override def start = NettyFuture.runServer(Tapir.genEndpointsFuture(1)) }
38-
object TapirMultiServer extends ServerRunner { override def start = NettyFuture.runServer(Tapir.genEndpointsFuture(128)) }
36+
object TapirServer extends ServerRunner { override def runServer = NettyFuture.runServer(Tapir.genEndpointsFuture(1)) }
37+
object TapirMultiServer extends ServerRunner { override def runServer = NettyFuture.runServer(Tapir.genEndpointsFuture(128)) }
3938
object TapirInterceptorMultiServer extends ServerRunner {
40-
override def start = NettyFuture.runServer(Tapir.genEndpointsFuture(128), withServerLog = true)
39+
override def runServer = NettyFuture.runServer(Tapir.genEndpointsFuture(128), withServerLog = true)
4140
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package sttp.tapir.perf.nima
22

3-
import cats.effect.IO
3+
import cats.effect.{IO, Resource}
44
import io.helidon.webserver.WebServer
55
import sttp.shared.Identity
66
import sttp.tapir.perf.apis._
@@ -14,27 +14,29 @@ object Tapir extends Endpoints {
1414

1515
object Nima {
1616

17-
def runServer(endpoints: List[ServerEndpoint[Any, Identity]], withServerLog: Boolean = false): IO[ServerRunner.KillSwitch] = {
17+
def runServer(endpoints: List[ServerEndpoint[Any, Identity]], withServerLog: Boolean = false): Resource[IO, Unit] = {
1818
val declaredPort = Port
1919
val serverOptions = buildOptions(NimaServerOptions.customiseInterceptors, withServerLog)
2020
// Starting Nima server
2121

2222
val handler = NimaServerInterpreter(serverOptions).toHandler(endpoints)
23-
val server = WebServer
24-
.builder()
25-
.routing { builder =>
26-
builder.any(handler)
27-
()
28-
}
29-
.port(declaredPort)
30-
.build()
31-
.start()
32-
IO(IO { val _ = server.stop() })
23+
val startServer = IO {
24+
WebServer
25+
.builder()
26+
.routing { builder =>
27+
builder.any(handler)
28+
()
29+
}
30+
.port(declaredPort)
31+
.build()
32+
.start()
33+
}
34+
Resource.make(startServer)(server => IO(server.stop()).void).map(_ => ())
3335
}
3436
}
3537

36-
object TapirServer extends ServerRunner { override def start = Nima.runServer(Tapir.genEndpointsNId(1)) }
37-
object TapirMultiServer extends ServerRunner { override def start = Nima.runServer(Tapir.genEndpointsNId(128)) }
38+
object TapirServer extends ServerRunner { override def runServer = Nima.runServer(Tapir.genEndpointsNId(1)) }
39+
object TapirMultiServer extends ServerRunner { override def runServer = Nima.runServer(Tapir.genEndpointsNId(128)) }
3840
object TapirInterceptorMultiServer extends ServerRunner {
39-
override def start = Nima.runServer(Tapir.genEndpointsNId(128), withServerLog = true)
41+
override def runServer = Nima.runServer(Tapir.genEndpointsNId(128), withServerLog = true)
4042
}

0 commit comments

Comments
 (0)