Skip to content

Commit 73637ae

Browse files
author
gas
committed
manager --> worker pool
1 parent 2e0f02b commit 73637ae

File tree

3 files changed

+76
-95
lines changed

3 files changed

+76
-95
lines changed

README.md

+15-20
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,18 @@ type BalancingStrategy interface {
1414
API:
1515
```go
1616
// worker pool struct private api
17-
(wp *workerPool) getCurrentJobsNumber() int64
18-
(wp *workerPool) getWorkerStats(workerId int) (Stats, error)
19-
(wp *workerPool) terminateWorker(workerId int) error
20-
(wp *workerPool) reloadWorker(workerId int) error
17+
(wp *WorkerPool) getCurrentJobsNumber() int64
18+
(wp *WorkerPool) getWorkerStats(workerId int) (Stats, error)
19+
(wp *WorkerPool) terminateWorker(workerId int) error
20+
(wp *WorkerPool) reloadWorker(workerId int) error
2121

22-
// key public structs and methods
23-
// Config holds worker pool params, like
24-
// it's size and max jobs per worker (without blocking)
25-
type Config struct {
26-
NWorkers int
27-
MaxJobs int64
28-
}
29-
// New creates new instance of workers pool manager
30-
New(config Config, balancer BalancingStrategy) *Manager
22+
// worker pool public methods
23+
// New creates new instance of workers pool
24+
// params defining it's size and max jobs per worker (without blocking)
25+
NewWorkerPool(nWorkers, maxJobs uint, balancer BalancingStrategy) *WorkerPool
3126
// ScheduleJob puts new job in some worker queue
32-
(m *Manager) ScheduleJob(f JobFunc) (chan Result, error)
27+
(wp *WorkerPool) ScheduleJob(f JobFunc) (chan Result, error)
28+
3329
// NewRoundRobin creates new instance of the "load balancer"
3430
NewRoundRobin() *RoundRobin
3531
// NextWorkerId returns worker id selected by some
@@ -63,13 +59,12 @@ func exampleJob(inp int) wp.JobFunc {
6359
}
6460

6561
func main() {
66-
config := wp.Config{
67-
NWorkers: 3, // NOTE: Number of workers to spawn
68-
MaxJobs: 10, // NOTE: Max length of the buffered
69-
// channel with submitted jobs (per worker)
70-
}
71-
balancer := wp.NewRoundRobin()
7262
pool := wp.New(config, balancer)
63+
pool := NewWorkerPool(
64+
3,
65+
10,
66+
NewRoundRobin(),
67+
)
7368

7469
// NOTE: if number of jobs per woker will be > MaxJobs,
7570
// ScheduleJob func will block

workerpool.go

+41-55
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,13 @@ type Job struct {
2727

2828
// Stats holds basic stats for the worker
2929
type Stats struct {
30-
ProcessedJobs int64
30+
ProcessedJobs uint
3131
TotalJobsExecutionTime int64 // NOTE: nanoseconds
3232
}
3333

34-
// Config holds needed data to start worker pool
35-
type Config struct {
36-
NWorkers int
37-
MaxJobs int64
34+
// Strategy interface that holds implementation of balancing strategy
35+
type BalancingStrategy interface {
36+
NextWorkerId(workersStats []Stats) int
3837
}
3938

4039
type workerStats struct {
@@ -43,12 +42,36 @@ type workerStats struct {
4342
}
4443

4544
// workerPool holds data needed for pool operation
46-
type workerPool struct {
45+
type WorkerPool struct {
4746
mx sync.RWMutex
48-
config Config
47+
nWorkers uint
4948
workersChan []chan Job
5049
workersStats []*workerStats
5150
terminateChan []chan bool
51+
balancer BalancingStrategy
52+
}
53+
54+
// NewWorkerPool creates new worker pool
55+
func NewWorkerPool(nWorkers, maxJobs uint, balancer BalancingStrategy) *WorkerPool {
56+
pool := &WorkerPool{
57+
nWorkers: nWorkers,
58+
workersChan: make([]chan Job, nWorkers),
59+
workersStats: make([]*workerStats, nWorkers),
60+
terminateChan: make([]chan bool, nWorkers),
61+
balancer: balancer,
62+
}
63+
var w uint
64+
for w = 0; w < nWorkers; w++ {
65+
pool.workersChan[w] = make(chan Job, maxJobs)
66+
pool.terminateChan[w] = make(chan bool)
67+
pool.workersStats[w] = &workerStats{}
68+
go worker(
69+
pool.workersChan[w],
70+
pool.terminateChan[w],
71+
pool.workersStats[w],
72+
)
73+
}
74+
return pool
5275
}
5376

5477
func worker(jobs chan Job, terminate chan bool, s *workerStats) {
@@ -74,7 +97,7 @@ func worker(jobs chan Job, terminate chan bool, s *workerStats) {
7497
}
7598

7699
// getWorkerStats by specified worker id
77-
func (wp *workerPool) getWorkerStats(workerId int) (Stats, error) {
100+
func (wp *WorkerPool) getWorkerStats(workerId int) (Stats, error) {
78101
resStats := Stats{}
79102
if workerId > len(wp.workersChan) || workerId < 0 {
80103
return resStats, workerIndexIsOutOfBoundsErr
@@ -91,7 +114,7 @@ func (wp *workerPool) getWorkerStats(workerId int) (Stats, error) {
91114
}
92115

93116
// terminateWorker sends termination signal to the specified worker
94-
func (wp *workerPool) terminateWorker(workerId int) error {
117+
func (wp *WorkerPool) terminateWorker(workerId int) error {
95118
if workerId > len(wp.terminateChan) || workerId < 0 {
96119
return workerIndexIsOutOfBoundsErr
97120
}
@@ -102,7 +125,7 @@ func (wp *workerPool) terminateWorker(workerId int) error {
102125
}
103126

104127
// reloadWorker terminates worker by id, and spawns new one
105-
func (wp *workerPool) reloadWorker(workerId int) error {
128+
func (wp *WorkerPool) reloadWorker(workerId int) error {
106129
if workerId > len(wp.workersChan) || workerId < 0 {
107130
return workerIndexIsOutOfBoundsErr
108131
}
@@ -121,57 +144,20 @@ func (wp *workerPool) reloadWorker(workerId int) error {
121144
return nil
122145
}
123146

124-
// Strategy interface that holds implementation of balancing strategy
125-
type BalancingStrategy interface {
126-
NextWorkerId(workersStats []Stats) int
127-
}
128-
129-
// Manager spawns workers and schedule jobs
130-
type Manager struct {
131-
wp *workerPool
132-
balancer BalancingStrategy
133-
}
134-
135-
// New creates new worker pool
136-
func New(config Config, balancer BalancingStrategy) *Manager {
137-
pool := &workerPool{
138-
config: config,
139-
workersChan: make([]chan Job, config.NWorkers),
140-
workersStats: make([]*workerStats, config.NWorkers),
141-
terminateChan: make([]chan bool, config.NWorkers),
142-
}
143-
for w := 0; w < config.NWorkers; w++ {
144-
pool.workersChan[w] = make(chan Job, config.MaxJobs)
145-
pool.terminateChan[w] = make(chan bool)
146-
pool.workersStats[w] = &workerStats{}
147-
go worker(
148-
pool.workersChan[w],
149-
pool.terminateChan[w],
150-
pool.workersStats[w],
151-
)
152-
}
153-
manager := &Manager{
154-
wp: pool,
155-
balancer: balancer,
156-
}
157-
// TODO: add routine that reloads workers
158-
return manager
159-
}
160-
161147
// ScheduleJob puts job in a queue
162-
func (m *Manager) ScheduleJob(f JobFunc) chan Result {
163-
m.wp.mx.RLock()
164-
config := m.wp.config
165-
workersStats := make([]Stats, config.NWorkers)
166-
for i, s := range m.wp.workersStats {
148+
func (wp *WorkerPool) ScheduleJob(f JobFunc) chan Result {
149+
wp.mx.RLock()
150+
nWorkers := wp.nWorkers
151+
workersStats := make([]Stats, nWorkers)
152+
for i, s := range wp.workersStats {
167153
s.mx.RLock()
168154
workersStats[i] = s.Stats
169155
s.mx.RUnlock()
170156
}
171-
m.wp.mx.RUnlock()
157+
wp.mx.RUnlock()
172158
ch := make(chan Result, 1)
173-
nextWorkerId := m.balancer.NextWorkerId(workersStats)
174-
m.wp.workersChan[nextWorkerId] <- Job{f, ch}
159+
nextWorkerId := wp.balancer.NextWorkerId(workersStats)
160+
wp.workersChan[nextWorkerId] <- Job{f, ch}
175161
return ch
176162
}
177163

workerpool_test.go

+20-20
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,21 @@ func exampleJob(inp int) JobFunc {
2424
}
2525

2626
func TestPool(t *testing.T) {
27-
config := Config{
28-
NWorkers: 3,
29-
MaxJobs: 10,
30-
}
31-
balancer := NewRoundRobin()
32-
roundRobinPool := New(config, balancer)
33-
wp := roundRobinPool.wp
27+
nWorkers := 3
28+
maxJobs := 10
29+
wp := NewWorkerPool(
30+
uint(nWorkers),
31+
uint(maxJobs),
32+
NewRoundRobin(),
33+
)
3434

3535
var actualProcessedJobs float64 = 0.0
3636
nJobs := 50
3737

3838
t.Run("RunJobsSync", func(t *testing.T) {
3939
results := make([]chan Result, 0)
4040
for i := 0; i < nJobs; i++ {
41-
ch := roundRobinPool.ScheduleJob(exampleJob(i))
41+
ch := wp.ScheduleJob(exampleJob(i))
4242
results = append(results, ch)
4343
}
4444

@@ -57,7 +57,7 @@ func TestPool(t *testing.T) {
5757

5858
t.Run("CheckStats", func(t *testing.T) {
5959
var averageProcessedJobs float64 = 0.0
60-
for w := 0; w < config.NWorkers; w++ {
60+
for w := 0; w < nWorkers; w++ {
6161
s, err := wp.getWorkerStats(w)
6262
if err != nil {
6363
t.Fatal(err)
@@ -68,8 +68,8 @@ func TestPool(t *testing.T) {
6868
averageProcessedJobs += float64(s.ProcessedJobs)
6969
t.Logf("Worker %v stats: %v\n", w, s) // Worker 0 stats: {4 1335} ...
7070
}
71-
averageProcessedJobs /= float64(config.NWorkers)
72-
idealAverageProcessedJobs := actualProcessedJobs / float64(config.NWorkers)
71+
averageProcessedJobs /= float64(nWorkers)
72+
idealAverageProcessedJobs := actualProcessedJobs / float64(nWorkers)
7373
t.Log("Average processed jobs per worker:", averageProcessedJobs, idealAverageProcessedJobs)
7474
if math.Abs(averageProcessedJobs-idealAverageProcessedJobs) > 1 {
7575
t.Error(jobsDistributionNotEvenErr)
@@ -95,7 +95,7 @@ func TestPool(t *testing.T) {
9595
})
9696

9797
t.Run("RunJobsConcurrent", func(t *testing.T) {
98-
for w := 0; w < config.NWorkers; w++ {
98+
for w := 0; w < nWorkers; w++ {
9999
err := wp.reloadWorker(w)
100100
if err != nil {
101101
t.Fatal(err)
@@ -108,7 +108,7 @@ func TestPool(t *testing.T) {
108108
wg.Add(1)
109109
go func(idx int, wg *sync.WaitGroup) {
110110
defer wg.Done()
111-
ch := roundRobinPool.ScheduleJob(exampleJob(idx))
111+
ch := wp.ScheduleJob(exampleJob(idx))
112112
jobs <- ch
113113
}(i, &wg)
114114
}
@@ -122,7 +122,7 @@ func TestPool(t *testing.T) {
122122
}
123123
}
124124

125-
for w := 0; w < config.NWorkers; w++ {
125+
for w := 0; w < nWorkers; w++ {
126126
s, err := wp.getWorkerStats(w)
127127
if err != nil {
128128
t.Fatal(err)
@@ -135,9 +135,9 @@ func TestPool(t *testing.T) {
135135

136136
t.Run("CheckStatsConcurrent", func(t *testing.T) {
137137
wg := sync.WaitGroup{}
138-
stats := make(chan Stats, config.NWorkers)
139-
errs := make(chan error, config.NWorkers)
140-
for w := 0; w < config.NWorkers; w++ {
138+
stats := make(chan Stats, nWorkers)
139+
errs := make(chan error, nWorkers)
140+
for w := 0; w < nWorkers; w++ {
141141
wg.Add(1)
142142
go func(w int, wg *sync.WaitGroup) {
143143
defer wg.Done()
@@ -167,8 +167,8 @@ func TestPool(t *testing.T) {
167167

168168
t.Run("ReloadWorkerConcurrent", func(t *testing.T) {
169169
wg := sync.WaitGroup{}
170-
errs := make(chan error, config.NWorkers)
171-
for w := 0; w < config.NWorkers; w++ {
170+
errs := make(chan error, nWorkers)
171+
for w := 0; w < nWorkers; w++ {
172172
wg.Add(1)
173173
go func(w int, wg *sync.WaitGroup) {
174174
defer wg.Done()
@@ -185,7 +185,7 @@ func TestPool(t *testing.T) {
185185
}
186186
}
187187

188-
for w := 0; w < config.NWorkers; w++ {
188+
for w := 0; w < nWorkers; w++ {
189189
s, err := wp.getWorkerStats(w)
190190
if err != nil {
191191
t.Fatal(err)

0 commit comments

Comments
 (0)