Skip to content

Implement metrics for external queue #4292

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 66 commits into
base: series/3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
6da0c9c
Adding metrics
Atharva-Kanherkar Mar 5, 2025
08806ce
Added demo test and some other getter methods
Atharva-Kanherkar Mar 5, 2025
084adf6
Removed the unnecessary commit
Atharva-Kanherkar Mar 5, 2025
af7b422
removed unnecessary files
Atharva-Kanherkar Mar 5, 2025
77b35ce
Remove .scala-build/ files from repository
Atharva-Kanherkar Mar 5, 2025
cdc302e
Add metrics for external queue tracking in WorkStealingThreadPool
Atharva-Kanherkar Mar 5, 2025
96bd55b
Final commit
Atharva-Kanherkar Mar 5, 2025
bb178dd
Some changes
Atharva-Kanherkar Mar 5, 2025
297c1cc
Added the requested changes
Atharva-Kanherkar Mar 7, 2025
ee97bc3
Added the changes requested
Atharva-Kanherkar Mar 7, 2025
8ab78e2
Fixed compile errors
Atharva-Kanherkar Mar 8, 2025
9386f2d
Fixed warnings and formatting issues
Atharva-Kanherkar Mar 8, 2025
b7cb918
Fixed CI errors
Atharva-Kanherkar Mar 8, 2025
e3eb52f
Fixed CI errors
Atharva-Kanherkar Mar 8, 2025
d3304a4
fixed more CI errrors
Atharva-Kanherkar Mar 8, 2025
6d548c0
Fixed more CI errors
Atharva-Kanherkar Mar 8, 2025
2ce80fa
Move external queue metrics tracking to ScalQueue
Atharva-Kanherkar Mar 9, 2025
6f7212b
fixed ci errors
Atharva-Kanherkar Mar 9, 2025
75f76ef
Fixed different CI errors again
Atharva-Kanherkar Mar 9, 2025
16f241f
Fixed CI errors
Atharva-Kanherkar Mar 9, 2025
02851e7
fixed other ci errors
Atharva-Kanherkar Mar 9, 2025
9e4156a
Fixed CI errors
Atharva-Kanherkar Mar 9, 2025
1f084bf
Added requested changes
Atharva-Kanherkar Mar 10, 2025
405d68b
Added the requested changes, without the test
Atharva-Kanherkar Mar 11, 2025
69ea0dd
Added headers
Atharva-Kanherkar Mar 11, 2025
c2f62a4
Removed warnings
Atharva-Kanherkar Mar 11, 2025
8e940ba
Added the requested changes
Atharva-Kanherkar Mar 12, 2025
448c999
Trying to fix the CI
Atharva-Kanherkar Mar 12, 2025
0496df1
Formatted
Atharva-Kanherkar Mar 12, 2025
4e9115b
Fixed tests
Atharva-Kanherkar Mar 12, 2025
f790bc2
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 12, 2025
41932bf
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 12, 2025
a21af3e
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 12, 2025
5c0aeee
Requested changes
Atharva-Kanherkar Mar 13, 2025
fa29e87
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 14, 2025
24a1d02
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 14, 2025
693dc11
Added requested changes
Atharva-Kanherkar Mar 14, 2025
3d9973e
Implement striped metrics counters in ScalQueue
Atharva-Kanherkar Mar 14, 2025
60bedf1
Fixed warnings
Atharva-Kanherkar Mar 14, 2025
0dd74d6
Update core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealin…
Atharva-Kanherkar Mar 18, 2025
69f2d3a
Added requested changes
Atharva-Kanherkar Mar 18, 2025
e0ff288
Fixed warnings
Atharva-Kanherkar Mar 18, 2025
1d03483
FIXED WARNINGS
Atharva-Kanherkar Mar 18, 2025
e2e1733
Removed un necessary comments
Atharva-Kanherkar Mar 19, 2025
8a92974
Adding requested changes
Atharva-Kanherkar Mar 20, 2025
9cd6a61
Added test
Atharva-Kanherkar Mar 20, 2025
24721c7
Removed not important file
Atharva-Kanherkar Mar 20, 2025
f24fb0d
Reformatting
Atharva-Kanherkar Mar 20, 2025
52be56c
Some changes
Atharva-Kanherkar Mar 20, 2025
c13a4c5
Fixed warnings
Atharva-Kanherkar Mar 20, 2025
c7c6f5d
Added tests
Atharva-Kanherkar Mar 21, 2025
ae89bcb
Fixed errors
Atharva-Kanherkar Mar 21, 2025
062c66e
ensured code quality
Atharva-Kanherkar Mar 21, 2025
bac1d6a
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 26, 2025
0661804
Changes
Atharva-Kanherkar Mar 26, 2025
268a6ea
Added requested changes
Atharva-Kanherkar Mar 26, 2025
4a6ce8e
Changed names
Atharva-Kanherkar Apr 3, 2025
d6baedc
Fixed build.sbt
Atharva-Kanherkar Apr 3, 2025
11c705a
Merge remote-tracking branch 'origin/series/3.x' into external-queue-…
Atharva-Kanherkar Apr 8, 2025
3759bff
Update tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala
Atharva-Kanherkar Apr 12, 2025
d623124
Changes
Atharva-Kanherkar Apr 12, 2025
ae146cb
Added requested changes
Atharva-Kanherkar Apr 12, 2025
33b9101
Formatting
Atharva-Kanherkar Apr 12, 2025
9286d86
Added changes
Atharva-Kanherkar Apr 12, 2025
1d4b6cd
Added shorthands
Atharva-Kanherkar Apr 12, 2025
ba72a2a
Formatting fixes
armanbilge Apr 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ class ScalQueueBenchmark {
@Param(Array("4")) // keep this a power of 2
var threads: Int = _

val thing = new AnyRef
val thing = new Runnable { def run(): Unit = {} }

@Benchmark
def scalConcurrentEnqueueDequeue(): Unit = {
val q = new ScalQueue[AnyRef](threads)
val q = new ScalQueue(threads)
val latch = new CountDownLatch(threads)

// every thread will send and receive this number of events
Expand Down
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -725,11 +725,16 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
// internal API change, makes CpuStarvationMetrics available on all platforms
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.JvmCpuStarvationMetrics$NoOpCpuStarvationMetrics"),
// introduced by #4292, external queue metrics
// WorkStealingThreadPoolMetrics is a sealed trait, so we control all of its implementations.
ProblemFilters.exclude[ReversedMissingMethodProblem](
"cats.effect.unsafe.metrics.WorkStealingThreadPoolMetrics.externalQueue"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMetrics"),
// package-private classes moved to the `cats.effect.unsafe.metrics` package
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation$"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMBean")
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMBean"),
ProblemFilters.exclude[Problem]("cats.effect.unsafe.ScalQueue*")
) ++ {
if (tlIsScala3.value) {
// Scala 3 specific exclusions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private final class LocalQueue extends LocalQueuePadding {
* a reference to an uncontended source of randomness, to be passed along to the striped
* concurrent queues when executing their enqueue operations
*/
def enqueue(fiber: Runnable, external: ScalQueue[AnyRef], random: ThreadLocalRandom): Unit = {
def enqueue(fiber: Runnable, external: ScalQueue, random: ThreadLocalRandom): Unit = {
// A plain, unsynchronized load of the tail of the local queue.
val tl = tail

Expand Down Expand Up @@ -281,7 +281,7 @@ private final class LocalQueue extends LocalQueuePadding {

// Enqueue all of the batches of fibers on the batched queue with a bulk
// add operation.
external.offerAll(batches, random)
external.offerAllBatches(batches, random)
// Loop again for a chance to insert the original fiber to be enqueued
// on the local queue.
}
Expand Down Expand Up @@ -657,7 +657,7 @@ private final class LocalQueue extends LocalQueuePadding {
* a reference to an uncontended source of randomness, to be passed along to the striped
* concurrent queues when executing their enqueue operations
*/
def drainBatch(external: ScalQueue[AnyRef], random: ThreadLocalRandom): Unit = {
def drainBatch(external: ScalQueue, random: ThreadLocalRandom): Unit = {
// A plain, unsynchronized load of the tail of the local queue.
val tl = tail

Expand Down Expand Up @@ -702,8 +702,7 @@ private final class LocalQueue extends LocalQueuePadding {
totalSpilloverCount += SpilloverBatchSize
Tail.updater.lazySet(this, tl)
}

external.offer(batch, random)
external.offerBatch(batch, random)
return
}
}
Expand Down
Loading
Loading