-
Notifications
You must be signed in to change notification settings - Fork 552
Implement metrics for external queue #4292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: series/3.x
Are you sure you want to change the base?
Implement metrics for external queue #4292
Conversation
Hey @armanbilge! Please ignore some of the previous commits as i got myself into a git command hell :p , Anyways, the PR is now ready to review. Please only check the recent commit. It will clearly show what the code I have changed. Thanks :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your work on this!
May I suggest an alternative approach? I think we should make ScalQueue
no longer generic. Internally, it can be backed by queues of AnyRef
. Then, we can adjust its API to be:
def offer(a: Runnable, random: ThreadLocalRandom): Unit
def offer(a: Array[Runnable], random: ThreadLocalRandom): Unit
Then, we can keep the counters inside of ScalQueue
, and increment the appropriate counters in the specific offer
methods.
poll
will have to return AnyRef
and use isInstanceOf
checks to decrement the appropriate counter.
For sure. Give me a day, i will look into it! |
Hey @armanbilge :) I have added the requested changes. Please review! If there is anything to fix again, please let me know! |
Thanks for making those changes! It looks like the code doesn't compile currently. |
Yes! I am sorry, I actually slept after pushing the changes and forgot fixing the issues. I will be doing it right away! ~Atharva |
@armanbilge Actually there were so many references and inter dependencies of how scalqueue was generic, that is why this happened. It seems like there is a lot of code which is written kept in mind that it is generic. However, Ill fix it soon:) |
tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala
Outdated
Show resolved
Hide resolved
Hello @armanbilge ! I have added the changes! Please review :) |
Hey @armanbilge! |
core/jvm-native/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Outdated
Show resolved
Hide resolved
core/jvm-native/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Outdated
Show resolved
Hide resolved
tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala
Outdated
Show resolved
Hide resolved
tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala
Outdated
Show resolved
Hide resolved
tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala
Outdated
Show resolved
Hide resolved
Co-authored-by: Arman Bilge <[email protected]>
tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala
Outdated
Show resolved
Hide resolved
hey @armanbilge ! do you need anything else from my side! :) happy to help.. |
@djspiewak do you have any performance concerns here, should we run some benchmarks? |
I would definitely like to run some benchmarks on this one. I'll review the diff more closely though. |
Hi @armanbilge @djspiewak! Hope you're doing well. Just wanted to follow up on this—any updates by chance? |
WorkStealingThreadPool Metrics Implementation
This PR implements metrics for tracking singleton and batch task submissions to the external queue in
WorkStealingThreadPool
, as in issue #4269 .Changes Made:
Added Atomic Counters to WorkStealingThreadPool:
singletonsSubmittedCount
- tracks total singleton task submissionssingletonsPresentCount
- tracks current singleton tasks in the queuebatchesSubmittedCount
- tracks total batch task submissionsbatchesPresentCount
- tracks current batch tasks in the queueUpdated Submission Logic:
submitToExternalQueue()
to increment counters when tasks are addedpollTask()
to decrement present counters when tasks are polledAdded Accessor Methods:
getSingletonsSubmittedCount()
- returns total singleton submissionsgetSingletonsPresentCount()
- returns current singletons in queuegetBatchesSubmittedCount()
- returns total batch submissionsgetBatchesPresentCount()
- returns current batches in queuelogQueueMetrics()
- Helper for logging all metricsAdded Verification Test:

I made a test (which i am not sure about, but It did work -->)
This might confirm that the implementation works as fine as it should be. The code too compiles without any errors, at least in my terminal :)
The test is included in the PR for reference, showing how to access these metrics from client code.