Skip to content

Commit 767739e

Browse files
authored
feat: embedded cluster manager websocket (#5015)
* feat: embedded cluster manager websocket
1 parent 2417b58 commit 767739e

File tree

12 files changed

+461
-0
lines changed

12 files changed

+461
-0
lines changed

pkg/apiserver/server.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,15 @@ func Start(params *APIServerParams) {
165165

166166
handlers.RegisterUnauthenticatedRoutes(handler, kotsStore, debugRouter, loggingRouter)
167167

168+
/**********************************************************************
169+
* Websocket routes (only for embedded cluster)
170+
**********************************************************************/
171+
172+
if util.IsEmbeddedCluster() {
173+
wsRouter := r.NewRoute().Subrouter()
174+
wsRouter.HandleFunc("/ec-ws", handler.ConnectToECWebsocket)
175+
}
176+
168177
/**********************************************************************
169178
* KOTS token auth routes
170179
**********************************************************************/

pkg/handlers/debug.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package handlers
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/replicatedhq/kots/pkg/websocket"
7+
websockettypes "github.com/replicatedhq/kots/pkg/websocket/types"
8+
)
9+
10+
type DebugInfoResponse struct {
11+
WSClients map[string]websockettypes.WSClient `json:"wsClients"`
12+
}
13+
14+
func (h *Handler) GetDebugInfo(w http.ResponseWriter, r *http.Request) {
15+
response := DebugInfoResponse{
16+
WSClients: websocket.GetClients(),
17+
}
18+
19+
JSON(w, http.StatusOK, response)
20+
}

pkg/handlers/handlers.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,10 @@ func RegisterSessionAuthRoutes(r *mux.Router, kotsStore store.Store, handler KOT
331331
r.Name("ChangePassword").Path("/api/v1/password/change").Methods("PUT").
332332
HandlerFunc(middleware.EnforceAccess(policy.PasswordChange, handler.ChangePassword))
333333

334+
// Debug info
335+
r.Name("GetDebugInfo").Path("/api/v1/debug").Methods("GET").
336+
HandlerFunc(middleware.EnforceAccess(policy.ClusterRead, handler.GetDebugInfo))
337+
334338
// Upgrade service
335339
r.Name("StartUpgradeService").Path("/api/v1/app/{appSlug}/start-upgrade-service").Methods("POST").
336340
HandlerFunc(middleware.EnforceAccess(policy.AppUpdate, handler.StartUpgradeService))

pkg/handlers/handlers_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,6 +1429,16 @@ var HandlerPolicyTests = map[string][]HandlerPolicyTest{
14291429
ExpectStatus: http.StatusOK,
14301430
},
14311431
},
1432+
"GetDebugInfo": {
1433+
{
1434+
Roles: []rbactypes.Role{rbac.ClusterAdminRole},
1435+
SessionRoles: []string{rbac.ClusterAdminRoleID},
1436+
Calls: func(storeRecorder *mock_store.MockStoreMockRecorder, handlerRecorder *mock_handlers.MockKOTSHandlerMockRecorder) {
1437+
handlerRecorder.GetDebugInfo(gomock.Any(), gomock.Any())
1438+
},
1439+
ExpectStatus: http.StatusOK,
1440+
},
1441+
},
14321442

14331443
// Upgrade Service
14341444
"StartUpgradeService": {

pkg/handlers/interface.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,13 @@ type KOTSHandler interface {
164164
// Password change
165165
ChangePassword(w http.ResponseWriter, r *http.Request)
166166

167+
// Debug info
168+
GetDebugInfo(w http.ResponseWriter, r *http.Request)
169+
167170
// Upgrade service
168171
StartUpgradeService(w http.ResponseWriter, r *http.Request)
169172
GetUpgradeServiceStatus(w http.ResponseWriter, r *http.Request)
173+
174+
// EC Websocket
175+
ConnectToECWebsocket(w http.ResponseWriter, r *http.Request)
170176
}

pkg/handlers/mock/mock.go

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/handlers/websocket.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package handlers
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/pkg/errors"
7+
"github.com/replicatedhq/kots/pkg/logger"
8+
"github.com/replicatedhq/kots/pkg/websocket"
9+
)
10+
11+
// ConnectToECWebsocketResponse is the JSON body ConnectToECWebsocket writes
// when it rejects a request.
type ConnectToECWebsocketResponse struct {
	// Error is a human-readable failure reason; it is omitted from the JSON
	// output when empty.
	Error string `json:"error,omitempty"`
}
14+
15+
func (h *Handler) ConnectToECWebsocket(w http.ResponseWriter, r *http.Request) {
16+
response := ConnectToECWebsocketResponse{}
17+
18+
nodeName := r.URL.Query().Get("nodeName")
19+
if nodeName == "" {
20+
response.Error = "missing node name"
21+
logger.Error(errors.New(response.Error))
22+
JSON(w, http.StatusBadRequest, response)
23+
return
24+
}
25+
26+
if err := websocket.Connect(w, r, nodeName); err != nil {
27+
response.Error = "failed to establish websocket connection"
28+
logger.Error(errors.Wrap(err, response.Error))
29+
JSON(w, http.StatusInternalServerError, response)
30+
return
31+
}
32+
}

pkg/websocket/types/types.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package types
2+
3+
import (
4+
"time"
5+
6+
"github.com/gorilla/websocket"
7+
)
8+
9+
type WSClient struct {
10+
Conn *websocket.Conn `json:"-"`
11+
ConnectedAt time.Time `json:"connectedAt"`
12+
LastPingSent PingPongInfo `json:"lastPingSent"`
13+
LastPongRecv PingPongInfo `json:"lastPongRecv"`
14+
LastPingRecv PingPongInfo `json:"lastPingRecv"`
15+
LastPongSent PingPongInfo `json:"lastPongSent"`
16+
}
17+
18+
// PingPongInfo records a single ping or pong frame.
type PingPongInfo struct {
	// Time is when the frame was sent or received.
	Time time.Time `json:"time"`
	// Message is the payload carried by the frame.
	Message string `json:"message"`
}

pkg/websocket/websocket.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package websocket
2+
3+
import (
4+
"fmt"
5+
"math/rand"
6+
"net"
7+
"net/http"
8+
"sync"
9+
"time"
10+
11+
"github.com/gorilla/websocket"
12+
"github.com/pkg/errors"
13+
"github.com/replicatedhq/kots/pkg/logger"
14+
"github.com/replicatedhq/kots/pkg/websocket/types"
15+
)
16+
17+
var wsUpgrader = websocket.Upgrader{}
18+
var wsClients = make(map[string]types.WSClient)
19+
var wsMutex = sync.Mutex{}
20+
21+
func Connect(w http.ResponseWriter, r *http.Request, nodeName string) error {
22+
conn, err := wsUpgrader.Upgrade(w, r, nil)
23+
if err != nil {
24+
return errors.Wrap(err, "failed to upgrade to websocket")
25+
}
26+
defer conn.Close()
27+
28+
conn.SetPingHandler(wsPingHandler(nodeName, conn))
29+
conn.SetPongHandler(wsPongHandler(nodeName))
30+
conn.SetCloseHandler(wsCloseHandler(nodeName, conn))
31+
32+
// register the client
33+
registerWSClient(nodeName, conn)
34+
35+
// ping client on a regular interval to make sure it's still connected
36+
go pingWSClient(nodeName, conn)
37+
38+
// listen to client messages
39+
listenToWSClient(nodeName, conn)
40+
return nil
41+
}
42+
43+
func pingWSClient(nodeName string, conn *websocket.Conn) {
44+
for {
45+
sleepDuration := time.Second * time.Duration(5+rand.Intn(16)) // 5-20 seconds
46+
time.Sleep(sleepDuration)
47+
48+
pingMsg := fmt.Sprintf("%x", rand.Int())
49+
50+
if err := conn.WriteControl(websocket.PingMessage, []byte(pingMsg), time.Now().Add(1*time.Second)); err != nil {
51+
if isWSConnClosed(nodeName, err) {
52+
removeWSClient(nodeName, err)
53+
return
54+
}
55+
logger.Debugf("Failed to send ping message to %s: %v", nodeName, err)
56+
continue
57+
}
58+
59+
wsMutex.Lock()
60+
client := wsClients[nodeName]
61+
wsMutex.Unlock()
62+
63+
client.LastPingSent = types.PingPongInfo{
64+
Time: time.Now(),
65+
Message: pingMsg,
66+
}
67+
wsClients[nodeName] = client
68+
}
69+
}
70+
71+
func listenToWSClient(nodeName string, conn *websocket.Conn) {
72+
for {
73+
_, _, err := conn.ReadMessage() // this is required to receive ping/pong messages
74+
if err != nil {
75+
if isWSConnClosed(nodeName, err) {
76+
removeWSClient(nodeName, err)
77+
return
78+
}
79+
logger.Debugf("Error reading websocket message from %s: %v", nodeName, err)
80+
}
81+
}
82+
}
83+
84+
func registerWSClient(nodeName string, conn *websocket.Conn) {
85+
wsMutex.Lock()
86+
defer wsMutex.Unlock()
87+
88+
if e, ok := wsClients[nodeName]; ok {
89+
e.Conn.Close()
90+
delete(wsClients, nodeName)
91+
}
92+
93+
wsClients[nodeName] = types.WSClient{
94+
Conn: conn,
95+
ConnectedAt: time.Now(),
96+
}
97+
98+
logger.Infof("Registered new websocket for %s", nodeName)
99+
}
100+
101+
func removeWSClient(nodeName string, err error) {
102+
wsMutex.Lock()
103+
defer wsMutex.Unlock()
104+
105+
if _, ok := wsClients[nodeName]; !ok {
106+
return
107+
}
108+
logger.Infof("Websocket connection closed for %s: %v", nodeName, err)
109+
delete(wsClients, nodeName)
110+
}
111+
112+
func wsPingHandler(nodeName string, conn *websocket.Conn) func(message string) error {
113+
return func(message string) error {
114+
wsMutex.Lock()
115+
defer wsMutex.Unlock()
116+
117+
client := wsClients[nodeName]
118+
client.LastPingRecv = types.PingPongInfo{
119+
Time: time.Now(),
120+
Message: message,
121+
}
122+
123+
if err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(1*time.Second)); err != nil {
124+
logger.Debugf("Failed to send pong message to %s: %v", nodeName, err)
125+
} else {
126+
client.LastPongSent = types.PingPongInfo{
127+
Time: time.Now(),
128+
Message: message,
129+
}
130+
}
131+
132+
wsClients[nodeName] = client
133+
return nil
134+
}
135+
}
136+
137+
func wsPongHandler(nodeName string) func(message string) error {
138+
return func(message string) error {
139+
wsMutex.Lock()
140+
defer wsMutex.Unlock()
141+
142+
client := wsClients[nodeName]
143+
client.LastPongRecv = types.PingPongInfo{
144+
Time: time.Now(),
145+
Message: message,
146+
}
147+
wsClients[nodeName] = client
148+
149+
return nil
150+
}
151+
}
152+
153+
func wsCloseHandler(nodeName string, conn *websocket.Conn) func(code int, text string) error {
154+
return func(code int, text string) error {
155+
logger.Infof("Websocket connection closed for %s: %d (exit code), message: %q", nodeName, code, text)
156+
157+
wsMutex.Lock()
158+
delete(wsClients, nodeName)
159+
wsMutex.Unlock()
160+
161+
message := websocket.FormatCloseMessage(code, text)
162+
conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
163+
return nil
164+
}
165+
}
166+
167+
func isWSConnClosed(nodeName string, err error) bool {
168+
wsMutex.Lock()
169+
defer wsMutex.Unlock()
170+
171+
if _, ok := wsClients[nodeName]; !ok {
172+
return true
173+
}
174+
if _, ok := err.(*websocket.CloseError); ok {
175+
return true
176+
}
177+
if e, ok := err.(*net.OpError); ok && !e.Temporary() {
178+
return true
179+
}
180+
return false
181+
}
182+
183+
func GetClients() map[string]types.WSClient {
184+
return wsClients
185+
}

web/src/Root.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import AppLicense from "@components/apps/AppLicense";
3131
import AppRegistrySettings from "@components/apps/AppRegistrySettings";
3232
import AppIdentityServiceSettings from "@components/apps/AppIdentityServiceSettings";
3333
import TroubleshootContainer from "@components/troubleshoot/TroubleshootContainer";
34+
import DebugInfo from "@components/DebugInfo";
3435

3536
import Footer from "./components/shared/Footer";
3637
import NavBar from "./components/shared/NavBar";
@@ -749,6 +750,7 @@ const Root = () => {
749750
/>
750751
<Route path="/crashz" element={<Crashz />} />{" "}
751752
<Route path="*" element={<NotFound />} />
753+
<Route path="/debug" element={<DebugInfo />} />
752754
<Route
753755
path="/secure-console"
754756
element={

0 commit comments

Comments
 (0)