Skip to content

Commit 941102b

Browse files
authored
xds/server: Fix xDS Server leak (#7664)
1 parent 7aee163 commit 941102b

File tree

3 files changed

+125
-15
lines changed

3 files changed

+125
-15
lines changed

xds/internal/server/conn_wrapper.go

+1
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func (c *connWrapper) Close() error {
161161
if c.rootProvider != nil {
162162
c.rootProvider.Close()
163163
}
164+
c.parent.removeConn(c)
164165
return c.Conn.Close()
165166
}
166167

xds/internal/server/listener_wrapper.go

+19-15
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func NewListenerWrapper(params ListenerWrapperParams) net.Listener {
8686
xdsC: params.XDSClient,
8787
modeCallback: params.ModeCallback,
8888
isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),
89+
conns: make(map[*connWrapper]bool),
8990

9091
mode: connectivity.ServingModeNotServing,
9192
closed: grpcsync.NewEvent(),
@@ -135,13 +136,13 @@ type listenerWrapper struct {
135136

136137
// mu guards access to the current serving mode and the active filter chain
137138
// manager, as well as the set of accepted connections (conns).
138-
mu sync.RWMutex
139+
mu sync.Mutex
139140
// Current serving mode.
140141
mode connectivity.ServingMode
141142
// Filter chain manager currently serving.
142143
activeFilterChainManager *xdsresource.FilterChainManager
143144
// conns accepted with configuration from activeFilterChainManager.
144-
conns []*connWrapper
145+
conns map[*connWrapper]bool
145146

146147
// These fields are read/written to in the context of xDS updates, which are
147148
// guaranteed to be emitted synchronously from the xDS Client. Thus, they do
@@ -202,17 +203,14 @@ func (l *listenerWrapper) maybeUpdateFilterChains() {
202203
// gracefully shut down with a grace period of 10 minutes for long-lived
203204
// RPC's, such that clients will reconnect and have the updated
204205
// configuration apply." - A36
205-
var connsToClose []*connWrapper
206-
if l.activeFilterChainManager != nil { // If there is a filter chain manager to clean up.
207-
connsToClose = l.conns
208-
l.conns = nil
209-
}
206+
connsToClose := l.conns
207+
l.conns = make(map[*connWrapper]bool)
210208
l.activeFilterChainManager = l.pendingFilterChainManager
211209
l.pendingFilterChainManager = nil
212210
l.instantiateFilterChainRoutingConfigurationsLocked()
213211
l.mu.Unlock()
214212
go func() {
215-
for _, conn := range connsToClose {
213+
for conn := range connsToClose {
216214
conn.Drain()
217215
}
218216
}()
@@ -304,15 +302,15 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
304302
return nil, fmt.Errorf("received connection with non-TCP address (local: %T, remote %T)", conn.LocalAddr(), conn.RemoteAddr())
305303
}
306304

307-
l.mu.RLock()
305+
l.mu.Lock()
308306
if l.mode == connectivity.ServingModeNotServing {
309307
// Close connections as soon as we accept them when we are in
310308
// "not-serving" mode. Since we accept a net.Listener from the user
311309
// in Serve(), we cannot close the listener when we move to
312310
// "not-serving". Closing the connection immediately upon accepting
313311
// is one of the other ways to implement the "not-serving" mode as
314312
// outlined in gRFC A36.
315-
l.mu.RUnlock()
313+
l.mu.Unlock()
316314
conn.Close()
317315
continue
318316
}
@@ -324,7 +322,7 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
324322
SourcePort: srcAddr.Port,
325323
})
326324
if err != nil {
327-
l.mu.RUnlock()
325+
l.mu.Unlock()
328326
// When a matching filter chain is not found, we close the
329327
// connection right away, but do not return an error back to
330328
// `grpc.Serve()` from where this Accept() was invoked. Returning an
@@ -341,12 +339,18 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
341339
continue
342340
}
343341
cw := &connWrapper{Conn: conn, filterChain: fc, parent: l, urc: fc.UsableRouteConfiguration}
344-
l.conns = append(l.conns, cw)
345-
l.mu.RUnlock()
342+
l.conns[cw] = true
343+
l.mu.Unlock()
346344
return cw, nil
347345
}
348346
}
349347

348+
func (l *listenerWrapper) removeConn(conn *connWrapper) {
349+
l.mu.Lock()
350+
defer l.mu.Unlock()
351+
delete(l.conns, conn)
352+
}
353+
350354
// Close closes the underlying listener. It also cancels the xDS watch
351355
// registered in Serve() and closes any certificate provider instances created
352356
// based on security configuration received in the LDS response.
@@ -376,9 +380,9 @@ func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err
376380
l.mode = newMode
377381
if l.mode == connectivity.ServingModeNotServing {
378382
connsToClose := l.conns
379-
l.conns = nil
383+
l.conns = make(map[*connWrapper]bool)
380384
go func() {
381-
for _, conn := range connsToClose {
385+
for conn := range connsToClose {
382386
conn.Drain()
383387
}
384388
}()

xds/internal/server/listener_wrapper_test.go

+105
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,18 @@ import (
2323
"fmt"
2424
"net"
2525
"strconv"
26+
"sync"
2627
"testing"
28+
"time"
2729

30+
"google.golang.org/grpc"
2831
"google.golang.org/grpc/connectivity"
32+
"google.golang.org/grpc/credentials/insecure"
2933
"google.golang.org/grpc/internal"
3034
"google.golang.org/grpc/internal/testutils"
3135
"google.golang.org/grpc/internal/testutils/xds/e2e"
36+
testgrpc "google.golang.org/grpc/interop/grpc_testing"
37+
testpb "google.golang.org/grpc/interop/grpc_testing"
3238
xdsinternal "google.golang.org/grpc/xds/internal"
3339
"google.golang.org/grpc/xds/internal/xdsclient"
3440
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
@@ -151,5 +157,104 @@ func (s) TestListenerWrapper(t *testing.T) {
151157
t.Fatalf("mode change received: %v, want: %v", mode, connectivity.ServingModeNotServing)
152158
}
153159
}
160+
}
161+
162+
type testService struct {
163+
testgrpc.TestServiceServer
164+
}
165+
166+
func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
167+
return &testpb.Empty{}, nil
168+
}
169+
170+
// TestConnsCleanup tests that the listener wrapper clears its connection
171+
// references when connections close. It sets up a listener wrapper and gRPC
172+
// Server, and connects to the server 100 times and makes an RPC each time, and
173+
// then closes the connection. After these 100 connections Close, the listener
174+
// wrapper should have no more references to any connections.
175+
func (s) TestConnsCleanup(t *testing.T) {
176+
mgmtServer, nodeID, _, _, xdsC := xdsSetupForTests(t)
177+
lis, err := testutils.LocalTCPListener()
178+
if err != nil {
179+
t.Fatalf("Failed to create a local TCP listener: %v", err)
180+
}
181+
182+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
183+
defer cancel()
184+
185+
modeCh := make(chan connectivity.ServingMode, 1)
186+
vm := verifyMode{
187+
modeCh: modeCh,
188+
}
189+
190+
host, port := hostPortFromListener(t, lis)
191+
lisResourceName := fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port))))
192+
params := ListenerWrapperParams{
193+
Listener: lis,
194+
ListenerResourceName: lisResourceName,
195+
XDSClient: xdsC,
196+
ModeCallback: vm.verifyModeCallback,
197+
}
198+
lw := NewListenerWrapper(params)
199+
if lw == nil {
200+
t.Fatalf("NewListenerWrapper(%+v) returned nil", params)
201+
}
202+
defer lw.Close()
203+
204+
resources := e2e.UpdateOptions{
205+
NodeID: nodeID,
206+
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, route1)},
207+
SkipValidation: true,
208+
}
209+
if err := mgmtServer.Update(ctx, resources); err != nil {
210+
t.Fatal(err)
211+
}
212+
213+
// Wait for Listener Mode to go serving.
214+
select {
215+
case <-ctx.Done():
216+
t.Fatalf("timeout waiting for mode change")
217+
case mode := <-modeCh:
218+
if mode != connectivity.ServingModeServing {
219+
t.Fatalf("mode change received: %v, want: %v", mode, connectivity.ServingModeServing)
220+
}
221+
}
222+
223+
server := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
224+
testgrpc.RegisterTestServiceServer(server, &testService{})
225+
wg := sync.WaitGroup{}
226+
go func() {
227+
if err := server.Serve(lw); err != nil {
228+
t.Errorf("failed to serve: %v", err)
229+
}
230+
}()
231+
232+
// Make 100 connections to the server, and make an RPC on each one.
233+
for i := 0; i < 100; i++ {
234+
cc, err := grpc.NewClient(lw.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
235+
if err != nil {
236+
t.Fatalf("grpc.NewClient failed with err: %v", err)
237+
}
238+
client := testgrpc.NewTestServiceClient(cc)
239+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
240+
t.Fatalf("client.EmptyCall() failed: %v", err)
241+
}
242+
cc.Close()
243+
}
244+
245+
lisWrapper := lw.(*listenerWrapper)
246+
// Eventually when the server processes the connection shutdowns, the
247+
// listener wrapper should clear its references to the wrapped connections.
248+
lenConns := 1
249+
for ; ctx.Err() == nil && lenConns > 0; <-time.After(time.Millisecond) {
250+
lisWrapper.mu.Lock()
251+
lenConns = len(lisWrapper.conns)
252+
lisWrapper.mu.Unlock()
253+
}
254+
if lenConns > 0 {
255+
t.Fatalf("timeout waiting for lis wrapper conns to clear, size: %v", lenConns)
256+
}
154257

258+
server.Stop()
259+
wg.Wait()
155260
}

0 commit comments

Comments
 (0)