Skip to content

Commit 195c079

Browse files
authored
Merge pull request #1 from gasparian/refactoring
worker pool manager structure refactored
2 parents e6fbb7c + 5ffbebb commit 195c079

File tree

3 files changed

+161
-121
lines changed

3 files changed

+161
-121
lines changed

README.md

+23-26
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,27 @@
44

55
Implementation of worker pool with extendable functionality.
66
By default, I've implemented "round-robin" balancing to evenly distribute jobs across workers.
7-
You need to implement the `Manager` interface in order to use other balancing strategies:
7+
You need to implement the `BalancingStrategy` interface in order to use other balancing strategies:
88
```go
9-
type Manager interface {
10-
ScheduleJob(f JobFunc) (chan Result, error)
9+
type BalancingStrategy interface {
10+
NextWorkerId(workersStats []Stats) int
1111
}
1212
```
1313

1414
API:
1515
```go
16-
New(config Config) *WorkerPool
17-
(wp WorkerPool) GetCurrentJobsNumber() int64
18-
(wp *WorkerPool) GetWorkerStats(workerId int) (Stats, error)
19-
(wp *WorkerPool) TerminateWorker(workerId int) error
20-
(wp *WorkerPool) ReloadWorker(workerId int) error
16+
// worker pool struct private api
17+
(wp *workerPool) getCurrentJobsNumber() int64
18+
(wp *workerPool) getWorkerStats(workerId int) (Stats, error)
19+
(wp *workerPool) terminateWorker(workerId int) error
20+
(wp *workerPool) reloadWorker(workerId int) error
2121

22-
NewRoundRobin(pool *WorkerPool) *RoundRobin
23-
(rr *RoundRobin) ScheduleJob(f JobFunc) (chan Result, error)
22+
// key public structs and methods
23+
New(config Config, balancer BalancingStrategy) *Manager
24+
(m *Manager) ScheduleJob(f JobFunc) (chan Result, error)
25+
26+
NewRoundRobin() *RoundRobin
27+
(rr *RoundRobin) NextWorkerId(workerStats []Stats) int
2428
```
2529

2630
Install:
@@ -37,6 +41,7 @@ import (
3741
wp "github.com/gasparian/worker-pool-go"
3842
)
3943

44+
// Use closure to create job function
4045
func exampleJob(inp int) wp.JobFunc {
4146
return func() wp.Result {
4247
res := inp * inp
@@ -50,33 +55,25 @@ func exampleJob(inp int) wp.JobFunc {
5055
func main() {
5156
config := wp.Config{
5257
NWorkers: 3, // Number of workers to spawn
53-
MaxJobs: 10, // Max jobs in a queue, new jobs will be rejected with error
58+
MaxJobs: 10, // Max jobs in a queue, new jobs will be rejected with an error
5459
}
55-
pool := wp.New(config)
56-
roundRobinPool := wp.NewRoundRobin(pool)
60+
balancer := wp.NewRoundRobin()
61+
pool := wp.New(config, balancer)
5762

5863
nJobs := 50
59-
jobs := make([]chan wp.Result, 0)
64+
results := make([]chan wp.Result, 0)
6065
for i := 0; i < nJobs; i++ {
61-
ch, err := roundRobinPool.ScheduleJob(exampleJob(i))
66+
ch, err := pool.ScheduleJob(exampleJob(i))
6267
if err != nil {
6368
break
6469
}
65-
jobs = append(jobs, ch)
70+
results = append(results, ch)
6671
}
6772

68-
for _, j := range jobs {
69-
res := <-j
73+
for _, r := range results {
74+
res := <-r
7075
fmt.Println(res)
7176
}
72-
73-
for w := 0; w < config.NWorkers; w++ {
74-
s, err := roundRobinPool.GetWorkerStats(w)
75-
if err != nil {
76-
panic(err)
77-
}
78-
fmt.Printf("Worker %v stats: %v\n", w, s)
79-
}
8077
}
8178
```
8279

workerpool.go

+103-71
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,8 @@ type Job struct {
2828

2929
// Stats holds basic stats for the worker
3030
type Stats struct {
31-
ProcessedJobs int64
32-
TotalElapsedTime int64 // NOTE: nanoseconds
33-
}
34-
35-
type workerStats struct {
36-
Stats
37-
mx sync.RWMutex
31+
ProcessedJobs int64
32+
TotalJobsExecutionTime int64 // NOTE: nanoseconds
3833
}
3934

4035
// Config holds needed data to start worker pool
@@ -43,8 +38,14 @@ type Config struct {
4338
MaxJobs int64
4439
}
4540

46-
// WorkerPool holds data needed for pool operation
47-
type WorkerPool struct {
41+
type workerStats struct {
42+
Stats
43+
mx sync.RWMutex
44+
}
45+
46+
// workerPool holds data needed for pool operation
47+
type workerPool struct {
48+
mx sync.RWMutex
4849
config Config
4950
workersChan []chan Job
5051
workersStats []*workerStats
@@ -60,7 +61,7 @@ func worker(jobs chan Job, terminate chan bool, s *workerStats) {
6061
elapsedTime := time.Since(start).Nanoseconds()
6162
s.mx.Lock()
6263
s.ProcessedJobs++
63-
s.TotalElapsedTime += elapsedTime
64+
s.TotalJobsExecutionTime += elapsedTime
6465
s.mx.Unlock()
6566
j.ResultChan <- res
6667
close(j.ResultChan)
@@ -72,69 +73,58 @@ func worker(jobs chan Job, terminate chan bool, s *workerStats) {
7273
}
7374
}
7475

75-
// New created new worker pool
76-
func New(config Config) *WorkerPool {
77-
pool := &WorkerPool{
78-
config: config,
79-
workersChan: make([]chan Job, config.NWorkers),
80-
workersStats: make([]*workerStats, config.NWorkers),
81-
terminateChan: make([]chan bool, config.NWorkers),
82-
}
83-
for w := 0; w < config.NWorkers; w++ {
84-
pool.workersChan[w] = make(chan Job, config.MaxJobs)
85-
pool.terminateChan[w] = make(chan bool)
86-
pool.workersStats[w] = &workerStats{}
87-
go worker(
88-
pool.workersChan[w],
89-
pool.terminateChan[w],
90-
pool.workersStats[w],
91-
)
92-
}
93-
return pool
94-
}
76+
// getCurrentJobsAmount calculates amount of jobs across all workers queues
77+
func (wp *workerPool) getCurrentJobsAmount() int64 {
78+
wp.mx.RLock()
79+
defer wp.mx.RUnlock()
9580

96-
// GetCurrentJobsNumber calculates amount of jobs across all workers queues
97-
func (wp WorkerPool) GetCurrentJobsNumber() int64 {
98-
var currentJobsN int64 = 0
81+
var currentJobsAmount int64 = 0
9982
for _, ch := range wp.workersChan {
100-
currentJobsN += int64(len(ch))
83+
currentJobsAmount += int64(len(ch))
10184
}
102-
return currentJobsN
85+
return currentJobsAmount
10386
}
10487

105-
// GetWorkerStats by specified worker id
106-
func (wp *WorkerPool) GetWorkerStats(workerId int) (Stats, error) {
88+
// getWorkerStats by specified worker id
89+
func (wp *workerPool) getWorkerStats(workerId int) (Stats, error) {
10790
resStats := Stats{}
10891
if workerId > len(wp.workersChan) || workerId < 0 {
10992
return resStats, workerIndexIsOutOfBoundsErr
11093
}
94+
wp.mx.RLock()
11195
s := wp.workersStats[workerId]
96+
wp.mx.RUnlock()
97+
11298
s.mx.RLock()
11399
defer s.mx.RUnlock()
114100
resStats.ProcessedJobs = s.ProcessedJobs
115-
resStats.TotalElapsedTime = s.TotalElapsedTime
101+
resStats.TotalJobsExecutionTime = s.TotalJobsExecutionTime
116102
return resStats, nil
117103
}
118104

119-
// TerminateWorker sends termination signal to the specified worker
120-
func (wp *WorkerPool) TerminateWorker(workerId int) error {
105+
// terminateWorker sends termination signal to the specified worker
106+
func (wp *workerPool) terminateWorker(workerId int) error {
121107
if workerId > len(wp.terminateChan) || workerId < 0 {
122108
return workerIndexIsOutOfBoundsErr
123109
}
110+
wp.mx.RLock()
124111
wp.terminateChan[workerId] <- true
112+
wp.mx.RUnlock()
125113
return nil
126114
}
127115

128-
// ReloadWorker terminates worker by id, and spawns new one
129-
func (wp *WorkerPool) ReloadWorker(workerId int) error {
116+
// reloadWorker terminates worker by id, and spawns new one
117+
func (wp *workerPool) reloadWorker(workerId int) error {
130118
if workerId > len(wp.workersChan) || workerId < 0 {
131119
return workerIndexIsOutOfBoundsErr
132120
}
133-
err := wp.TerminateWorker(workerId)
121+
err := wp.terminateWorker(workerId)
134122
if err != nil {
135123
return err
136124
}
125+
wp.mx.Lock()
137126
wp.workersStats[workerId] = &workerStats{}
127+
wp.mx.Unlock()
138128
go worker(
139129
wp.workersChan[workerId],
140130
wp.terminateChan[workerId],
@@ -143,45 +133,87 @@ func (wp *WorkerPool) ReloadWorker(workerId int) error {
143133
return nil
144134
}
145135

146-
// Manafer interface that holds implementation of balancing strategy
147-
type Manager interface {
148-
ScheduleJob(f JobFunc) (chan Result, error)
136+
// Strategy interface that holds implementation of balancing strategy
137+
type BalancingStrategy interface {
138+
NextWorkerId(workersStats []Stats) int
149139
}
150140

151-
/*------------------------------------------------------------------------*/
152-
// NOTE: default implmementation of the Manager
153-
154-
// RoundRobin evenly distribute jobs across workers
155-
type RoundRobin struct {
156-
*WorkerPool
157-
mx sync.RWMutex
158-
nextWorkerId int
141+
// Manager spawns workers and schedule jobs
142+
type Manager struct {
143+
wp *workerPool
144+
balancer BalancingStrategy
159145
}
160146

161-
// NewRoundRobin ...
162-
func NewRoundRobin(pool *WorkerPool) *RoundRobin {
163-
return &RoundRobin{
164-
WorkerPool: pool,
165-
nextWorkerId: 0,
147+
// New creates new worker pool
148+
func New(config Config, balancer BalancingStrategy) *Manager {
149+
pool := &workerPool{
150+
config: config,
151+
workersChan: make([]chan Job, config.NWorkers),
152+
workersStats: make([]*workerStats, config.NWorkers),
153+
terminateChan: make([]chan bool, config.NWorkers),
166154
}
155+
for w := 0; w < config.NWorkers; w++ {
156+
pool.workersChan[w] = make(chan Job, config.MaxJobs)
157+
pool.terminateChan[w] = make(chan bool)
158+
pool.workersStats[w] = &workerStats{}
159+
go worker(
160+
pool.workersChan[w],
161+
pool.terminateChan[w],
162+
pool.workersStats[w],
163+
)
164+
}
165+
manager := &Manager{
166+
wp: pool,
167+
balancer: balancer,
168+
}
169+
// TODO: add routine that reloads workers
170+
return manager
167171
}
168172

169173
// ScheduleJob puts job in a queue
170-
func (rr *RoundRobin) ScheduleJob(f JobFunc) (chan Result, error) {
171-
rr.mx.RLock()
172-
config := rr.config
173-
nextWorkerId := rr.nextWorkerId
174-
rr.mx.RUnlock()
175-
176-
currentJobsN := rr.GetCurrentJobsNumber()
177-
if currentJobsN >= config.MaxJobs {
174+
func (m *Manager) ScheduleJob(f JobFunc) (chan Result, error) {
175+
m.wp.mx.RLock()
176+
config := m.wp.config
177+
workersStats := make([]Stats, config.NWorkers)
178+
for i, s := range m.wp.workersStats {
179+
s.mx.RLock()
180+
workersStats[i] = s.Stats
181+
s.mx.RUnlock()
182+
}
183+
m.wp.mx.RUnlock()
184+
185+
currentJobsAmount := m.wp.getCurrentJobsAmount()
186+
if currentJobsAmount >= config.MaxJobs {
178187
return nil, maxJobsLimitReachedErr
179188
}
180189
ch := make(chan Result)
181-
rr.workersChan[nextWorkerId] <- Job{f, ch}
182190

183-
rr.mx.Lock()
184-
rr.nextWorkerId = (nextWorkerId + 1) % config.NWorkers
185-
rr.mx.Unlock()
191+
nextWorkerId := m.balancer.NextWorkerId(workersStats)
192+
m.wp.workersChan[nextWorkerId] <- Job{f, ch}
186193
return ch, nil
187194
}
195+
196+
/*------------------------------------------------------------------------*/
197+
198+
// NOTE: default balancing strategy
199+
200+
// RoundRobin evenly distributes jobs across workers
201+
type RoundRobin struct {
202+
mx sync.RWMutex
203+
nextWorkerId int
204+
}
205+
206+
// NewRoundRobin ...
207+
func NewRoundRobin() *RoundRobin {
208+
return &RoundRobin{nextWorkerId: 0}
209+
}
210+
211+
// NextWorkerId generates worker id to schedule the job
212+
func (rr *RoundRobin) NextWorkerId(workerStats []Stats) int {
213+
rr.mx.Lock()
214+
defer rr.mx.Unlock()
215+
216+
workerId := rr.nextWorkerId
217+
rr.nextWorkerId = (workerId + 1) % len(workerStats)
218+
return workerId
219+
}

0 commit comments

Comments
 (0)