Skip to content

Commit ca4865d

Browse files
authored
balancer: automatically stop producers on subchannel state changes (#7663)
1 parent 941102b commit ca4865d

File tree

9 files changed: 133 additions (+133) and 320 deletions (-320).

balancer/balancer.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,11 @@ type SubConn interface {
142142
Connect()
143143
// GetOrBuildProducer returns a reference to the existing Producer for this
144144
// ProducerBuilder in this SubConn, or, if one does not currently exist,
145-
// creates a new one and returns it. Returns a close function which must
146-
// be called when the Producer is no longer needed.
145+
// creates a new one and returns it. Returns a close function which may be
146+
// called when the Producer is no longer needed. Otherwise the producer
147+
// will automatically be closed upon connection loss or subchannel close.
148+
// Should only be called on a SubConn in state Ready. Otherwise the
149+
// producer will be unable to create streams.
147150
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
148151
// Shutdown shuts down the SubConn gracefully. Any started RPCs will be
149152
// allowed to complete. No future calls should be made on the SubConn.
@@ -452,8 +455,10 @@ type ProducerBuilder interface {
452455
// Build creates a Producer. The first parameter is always a
453456
// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the
454457
// associated SubConn), but is declared as `any` to avoid a dependency
455-
// cycle. Should also return a close function that will be called when all
456-
// references to the Producer have been given up.
458+
// cycle. Build also returns a close function that will be called when all
459+
// references to the Producer have been given up for a SubConn, or when a
460+
// connectivity state change occurs on the SubConn. The close function
461+
// should always block until all asynchronous cleanup work is completed.
457462
Build(grpcClientConnInterface any) (p Producer, close func())
458463
}
459464

balancer/weightedroundrobin/balancer.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -526,17 +526,21 @@ func (w *weightedSubConn) updateConfig(cfg *lbConfig) {
526526
w.cfg = cfg
527527
w.mu.Unlock()
528528

529-
newPeriod := cfg.OOBReportingPeriod
530529
if cfg.EnableOOBLoadReport == oldCfg.EnableOOBLoadReport &&
531-
newPeriod == oldCfg.OOBReportingPeriod {
530+
cfg.OOBReportingPeriod == oldCfg.OOBReportingPeriod {
532531
// Load reporting wasn't enabled before or after, or load reporting was
533532
// enabled before and after, and had the same period. (Note that with
534533
// load reporting disabled, OOBReportingPeriod is always 0.)
535534
return
536535
}
537-
// (Optionally stop and) start the listener to use the new config's
538-
// settings for OOB reporting.
536+
if w.connectivityState == connectivity.Ready {
537+
// (Re)start the listener to use the new config's settings for OOB
538+
// reporting.
539+
w.updateORCAListener(cfg)
540+
}
541+
}
539542

543+
func (w *weightedSubConn) updateORCAListener(cfg *lbConfig) {
540544
if w.stopORCAListener != nil {
541545
w.stopORCAListener()
542546
}
@@ -545,9 +549,9 @@ func (w *weightedSubConn) updateConfig(cfg *lbConfig) {
545549
return
546550
}
547551
if w.logger.V(2) {
548-
w.logger.Infof("Registering ORCA listener for %v with interval %v", w.SubConn, newPeriod)
552+
w.logger.Infof("Registering ORCA listener for %v with interval %v", w.SubConn, cfg.OOBReportingPeriod)
549553
}
550-
opts := orca.OOBListenerOptions{ReportInterval: time.Duration(newPeriod)}
554+
opts := orca.OOBListenerOptions{ReportInterval: time.Duration(cfg.OOBReportingPeriod)}
551555
w.stopORCAListener = orca.RegisterOOBListener(w.SubConn, w, opts)
552556
}
553557

@@ -569,11 +573,9 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
569573
w.mu.Lock()
570574
w.nonEmptySince = time.Time{}
571575
w.lastUpdated = time.Time{}
576+
cfg := w.cfg
572577
w.mu.Unlock()
573-
case connectivity.Shutdown:
574-
if w.stopORCAListener != nil {
575-
w.stopORCAListener()
576-
}
578+
w.updateORCAListener(cfg)
577579
}
578580

579581
oldCS := w.connectivityState

balancer_wrapper.go

+30-19
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ import (
2424
"sync"
2525

2626
"google.golang.org/grpc/balancer"
27+
"google.golang.org/grpc/codes"
2728
"google.golang.org/grpc/connectivity"
2829
"google.golang.org/grpc/internal"
2930
"google.golang.org/grpc/internal/balancer/gracefulswitch"
3031
"google.golang.org/grpc/internal/channelz"
3132
"google.golang.org/grpc/internal/grpcsync"
3233
"google.golang.org/grpc/resolver"
34+
"google.golang.org/grpc/status"
3335
)
3436

3537
var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
@@ -256,17 +258,20 @@ type acBalancerWrapper struct {
256258
ccb *ccBalancerWrapper // read-only
257259
stateListener func(balancer.SubConnState)
258260

259-
mu sync.Mutex
260-
producers map[balancer.ProducerBuilder]*refCountedProducer
261+
producersMu sync.Mutex
262+
producers map[balancer.ProducerBuilder]*refCountedProducer
261263
}
262264

263265
// updateState is invoked by grpc to push a subConn state update to the
264266
// underlying balancer.
265-
func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error, readyChan chan struct{}) {
267+
func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) {
266268
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
267269
if ctx.Err() != nil || acbw.ccb.balancer == nil {
268270
return
269271
}
272+
// Invalidate all producers on any state change.
273+
acbw.closeProducers()
274+
270275
// Even though it is optional for balancers, gracefulswitch ensures
271276
// opts.StateListener is set, so this cannot ever be nil.
272277
// TODO: delete this comment when UpdateSubConnState is removed.
@@ -275,15 +280,6 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
275280
setConnectedAddress(&scs, curAddr)
276281
}
277282
acbw.stateListener(scs)
278-
acbw.ac.mu.Lock()
279-
defer acbw.ac.mu.Unlock()
280-
if s == connectivity.Ready {
281-
// When changing states to READY, close stateReadyChan. Wait until
282-
// after we notify the LB policy's listener(s) in order to prevent
283-
// ac.getTransport() from unblocking before the LB policy starts
284-
// tracking the subchannel as READY.
285-
close(readyChan)
286-
}
287283
})
288284
}
289285

@@ -300,16 +296,18 @@ func (acbw *acBalancerWrapper) Connect() {
300296
}
301297

302298
func (acbw *acBalancerWrapper) Shutdown() {
299+
acbw.closeProducers()
303300
acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
304301
}
305302

306303
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
307304
// ready, blocks until it is or ctx expires. Returns an error when the context
308305
// expires or the addrConn is shut down.
309306
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
310-
transport, err := acbw.ac.getTransport(ctx)
311-
if err != nil {
312-
return nil, err
307+
transport := acbw.ac.getReadyTransport()
308+
if transport == nil {
309+
return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready")
310+
313311
}
314312
return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
315313
}
@@ -334,8 +332,8 @@ type refCountedProducer struct {
334332
}
335333

336334
func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
337-
acbw.mu.Lock()
338-
defer acbw.mu.Unlock()
335+
acbw.producersMu.Lock()
336+
defer acbw.producersMu.Unlock()
339337

340338
// Look up existing producer from this builder.
341339
pData := acbw.producers[pb]
@@ -352,13 +350,26 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (
352350
// and delete the refCountedProducer from the map if the total reference
353351
// count goes to zero.
354352
unref := func() {
355-
acbw.mu.Lock()
353+
acbw.producersMu.Lock()
354+
// If closeProducers has already closed this producer instance, refs is
355+
// set to 0, so the check after decrementing will never pass, and the
356+
// producer will not be double-closed.
356357
pData.refs--
357358
if pData.refs == 0 {
358359
defer pData.close() // Run outside the acbw mutex
359360
delete(acbw.producers, pb)
360361
}
361-
acbw.mu.Unlock()
362+
acbw.producersMu.Unlock()
362363
}
363364
return pData.producer, grpcsync.OnceFunc(unref)
364365
}
366+
367+
func (acbw *acBalancerWrapper) closeProducers() {
368+
acbw.producersMu.Lock()
369+
defer acbw.producersMu.Unlock()
370+
for pb, pData := range acbw.producers {
371+
pData.refs = 0
372+
pData.close()
373+
delete(acbw.producers, pb)
374+
}
375+
}

clientconn.go

+9-44
Original file line numberDiff line numberDiff line change
@@ -825,14 +825,13 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
825825
}
826826

827827
ac := &addrConn{
828-
state: connectivity.Idle,
829-
cc: cc,
830-
addrs: copyAddresses(addrs),
831-
scopts: opts,
832-
dopts: cc.dopts,
833-
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
834-
resetBackoff: make(chan struct{}),
835-
stateReadyChan: make(chan struct{}),
828+
state: connectivity.Idle,
829+
cc: cc,
830+
addrs: copyAddresses(addrs),
831+
scopts: opts,
832+
dopts: cc.dopts,
833+
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
834+
resetBackoff: make(chan struct{}),
836835
}
837836
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
838837
// Start with our address set to the first address; this may be updated if
@@ -1179,8 +1178,7 @@ type addrConn struct {
11791178
addrs []resolver.Address // All addresses that the resolver resolved to.
11801179

11811180
// Use updateConnectivityState for updating addrConn's connectivity state.
1182-
state connectivity.State
1183-
stateReadyChan chan struct{} // closed and recreated on every READY state change.
1181+
state connectivity.State
11841182

11851183
backoffIdx int // Needs to be stateful for resetConnectBackoff.
11861184
resetBackoff chan struct{}
@@ -1193,22 +1191,14 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
11931191
if ac.state == s {
11941192
return
11951193
}
1196-
if ac.state == connectivity.Ready {
1197-
// When leaving ready, re-create the ready channel.
1198-
ac.stateReadyChan = make(chan struct{})
1199-
}
1200-
if s == connectivity.Shutdown {
1201-
// Wake any producer waiting to create a stream on the transport.
1202-
close(ac.stateReadyChan)
1203-
}
12041194
ac.state = s
12051195
ac.channelz.ChannelMetrics.State.Store(&s)
12061196
if lastErr == nil {
12071197
channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v", s)
12081198
} else {
12091199
channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
12101200
}
1211-
ac.acbw.updateState(s, ac.curAddr, lastErr, ac.stateReadyChan)
1201+
ac.acbw.updateState(s, ac.curAddr, lastErr)
12121202
}
12131203

12141204
// adjustParams updates parameters used to create transports upon
@@ -1512,31 +1502,6 @@ func (ac *addrConn) getReadyTransport() transport.ClientTransport {
15121502
return nil
15131503
}
15141504

1515-
// getTransport waits until the addrconn is ready and returns the transport.
1516-
// If the context expires first, returns an appropriate status. If the
1517-
// addrConn is stopped first, returns an Unavailable status error.
1518-
func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) {
1519-
for ctx.Err() == nil {
1520-
ac.mu.Lock()
1521-
t, state, readyChan := ac.transport, ac.state, ac.stateReadyChan
1522-
ac.mu.Unlock()
1523-
if state == connectivity.Shutdown {
1524-
// Return an error immediately in only this case since a connection
1525-
// will never occur.
1526-
return nil, status.Errorf(codes.Unavailable, "SubConn shutting down")
1527-
}
1528-
1529-
select {
1530-
case <-ctx.Done():
1531-
case <-readyChan:
1532-
if state == connectivity.Ready {
1533-
return t, nil
1534-
}
1535-
}
1536-
}
1537-
return nil, status.FromContextError(ctx.Err()).Err()
1538-
}
1539-
15401505
// tearDown starts to tear down the addrConn.
15411506
//
15421507
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct

interop/orcalb.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ func (orcabb) Name() string {
4646
}
4747

4848
type orcab struct {
49-
cc balancer.ClientConn
50-
sc balancer.SubConn
51-
cancelWatch func()
49+
cc balancer.ClientConn
50+
sc balancer.SubConn
5251

5352
reportMu sync.Mutex
5453
report *v3orcapb.OrcaLoadReport
@@ -70,7 +69,6 @@ func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
7069
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
7170
return nil
7271
}
73-
o.cancelWatch = orca.RegisterOOBListener(o.sc, o, orca.OOBListenerOptions{ReportInterval: time.Second})
7472
o.sc.Connect()
7573
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
7674
return nil
@@ -89,6 +87,7 @@ func (o *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnSt
8987
func (o *orcab) updateSubConnState(state balancer.SubConnState) {
9088
switch state.ConnectivityState {
9189
case connectivity.Ready:
90+
orca.RegisterOOBListener(o.sc, o, orca.OOBListenerOptions{ReportInterval: time.Second})
9291
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &orcaPicker{o: o}})
9392
case connectivity.TransientFailure:
9493
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", state.ConnectionError))})
@@ -102,9 +101,7 @@ func (o *orcab) updateSubConnState(state balancer.SubConnState) {
102101
}
103102
}
104103

105-
func (o *orcab) Close() {
106-
o.cancelWatch()
107-
}
104+
func (o *orcab) Close() {}
108105

109106
func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
110107
o.reportMu.Lock()

orca/producer.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ func (*producerBuilder) Build(cci any) (balancer.Producer, func()) {
4646
backoff: internal.DefaultBackoffFunc,
4747
}
4848
return p, func() {
49+
p.mu.Lock()
50+
if p.stop != nil {
51+
p.stop()
52+
p.stop = nil
53+
}
54+
p.mu.Unlock()
4955
<-p.stopped
5056
}
5157
}
@@ -67,19 +73,16 @@ type OOBListenerOptions struct {
6773
ReportInterval time.Duration
6874
}
6975

70-
// RegisterOOBListener registers an out-of-band load report listener on sc.
71-
// Any OOBListener may only be registered once per subchannel at a time. The
72-
// returned stop function must be called when no longer needed. Do not
76+
// RegisterOOBListener registers an out-of-band load report listener on a Ready
77+
// sc. Any OOBListener may only be registered once per subchannel at a time.
78+
// The returned stop function must be called when no longer needed. Do not
7379
// register a single OOBListener more than once per SubConn.
7480
func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) {
7581
pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton)
7682
p := pr.(*producer)
7783

7884
p.registerListener(l, opts.ReportInterval)
7985

80-
// TODO: When we can register for SubConn state updates, automatically call
81-
// stop() on SHUTDOWN.
82-
8386
// If stop is called multiple times, prevent it from having any effect on
8487
// subsequent calls.
8588
return grpcsync.OnceFunc(func() {
@@ -96,13 +99,13 @@ type producer struct {
9699
// is incremented when stream errors occur and is reset when the stream
97100
// reports a result.
98101
backoff func(int) time.Duration
102+
stopped chan struct{} // closed when the run goroutine exits
99103

100104
mu sync.Mutex
101105
intervals map[time.Duration]int // map from interval time to count of listeners requesting that time
102106
listeners map[OOBListener]struct{} // set of registered listeners
103107
minInterval time.Duration
104-
stop func() // stops the current run goroutine
105-
stopped chan struct{} // closed when the run goroutine exits
108+
stop func() // stops the current run goroutine
106109
}
107110

108111
// registerListener adds the listener and its requested report interval to the

0 commit comments

Comments
 (0)