Skip to content

Commit e9dd266

Browse files
committed
Reporting for multi-node EC installations
1 parent 1a79714 commit e9dd266

File tree

12 files changed

+156
-123
lines changed

12 files changed

+156
-123
lines changed

pkg/api/reporting/types/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type ReportingInfo struct {
1515
KURLInstallID string `json:"kurl_install_id" yaml:"kurl_install_id"`
1616
EmbeddedClusterID string `json:"embedded_cluster_id" yaml:"embedded_cluster_id"`
1717
EmbeddedClusterVersion string `json:"embedded_cluster_version" yaml:"embedded_cluster_version"`
18+
EmbeddedClusterNodes string `json:"embedded_cluster_nodes" yaml:"embedded_cluster_nodes"`
1819
IsGitOpsEnabled bool `json:"is_gitops_enabled" yaml:"is_gitops_enabled"`
1920
GitOpsProvider string `json:"gitops_provider" yaml:"gitops_provider"`
2021
SnapshotProvider string `json:"snapshot_provider" yaml:"snapshot_provider"`

pkg/apiserver/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/replicatedhq/kots/pkg/binaries"
1616
"github.com/replicatedhq/kots/pkg/handlers"
1717
identitymigrate "github.com/replicatedhq/kots/pkg/identity/migrate"
18-
"github.com/replicatedhq/kots/pkg/informers"
1918
"github.com/replicatedhq/kots/pkg/k8sutil"
2019
"github.com/replicatedhq/kots/pkg/operator"
2120
operatorclient "github.com/replicatedhq/kots/pkg/operator/client"
@@ -31,6 +30,7 @@ import (
3130
"github.com/replicatedhq/kots/pkg/updatechecker"
3231
"github.com/replicatedhq/kots/pkg/upgradeservice"
3332
"github.com/replicatedhq/kots/pkg/util"
33+
"github.com/replicatedhq/kots/pkg/watchers"
3434
"golang.org/x/crypto/bcrypt"
3535
)
3636

@@ -121,8 +121,8 @@ func Start(params *APIServerParams) {
121121

122122
supportbundle.StartServer()
123123

124-
if err := informers.Start(); err != nil {
125-
log.Println("Failed to start informers:", err)
124+
if err := watchers.Start(op.GetClusterID()); err != nil {
125+
log.Println("Failed to start watchers:", err)
126126
}
127127

128128
if err := updatechecker.Start(); err != nil {
File renamed without changes.
File renamed without changes.

pkg/informers/informers.go

Lines changed: 0 additions & 116 deletions
This file was deleted.

pkg/operator/operator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ func startLoop(fn func(), intervalInSeconds time.Duration) {
126126
}()
127127
}
128128

129+
func (o *Operator) GetClusterID() string {
130+
return o.clusterID
131+
}
132+
129133
func (o *Operator) resumeDeployments() {
130134
apps, err := o.store.ListAppsForDownstream(o.clusterID)
131135
if err != nil {

pkg/reporting/app.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
downstreamtypes "github.com/replicatedhq/kots/pkg/api/downstream/types"
1212
"github.com/replicatedhq/kots/pkg/api/reporting/types"
1313
"github.com/replicatedhq/kots/pkg/buildversion"
14+
"github.com/replicatedhq/kots/pkg/embeddedcluster"
1415
"github.com/replicatedhq/kots/pkg/gitops"
1516
"github.com/replicatedhq/kots/pkg/k8sutil"
1617
"github.com/replicatedhq/kots/pkg/kotsadm"
@@ -157,6 +158,7 @@ func GetReportingInfo(appID string) *types.ReportingInfo {
157158
r.AppStatus = string(appStatus.State)
158159
}
159160

161+
// kurl
160162
r.IsKurl, err = kurl.IsKurl(clientset)
161163
if err != nil {
162164
logger.Debugf(errors.Wrap(err, "failed to check if cluster is kurl").Error())
@@ -167,11 +169,28 @@ func GetReportingInfo(appID string) *types.ReportingInfo {
167169
if err != nil {
168170
logger.Debugf(errors.Wrap(err, "failed to get kurl nodes").Error())
169171
}
172+
if kurlNodes != nil {
173+
for _, kurlNode := range kurlNodes.Nodes {
174+
r.KurlNodeCountTotal++
175+
if kurlNode.IsConnected && kurlNode.IsReady {
176+
r.KurlNodeCountReady++
177+
}
178+
}
179+
}
180+
}
170181

171-
for _, kurlNode := range kurlNodes.Nodes {
172-
r.KurlNodeCountTotal++
173-
if kurlNode.IsConnected && kurlNode.IsReady {
174-
r.KurlNodeCountReady++
182+
// embedded cluster
183+
if util.IsEmbeddedCluster() && clientset != nil {
184+
ecNodes, err := embeddedcluster.GetNodes(context.TODO(), clientset)
185+
if err != nil {
186+
logger.Debugf("failed to get embedded cluster nodes: %v", err.Error())
187+
}
188+
if ecNodes != nil {
189+
marshalled, err := json.Marshal(ecNodes.Nodes)
190+
if err != nil {
191+
logger.Debugf("failed to marshal embedded cluster node: %v", err.Error())
192+
} else {
193+
r.EmbeddedClusterNodes = string(marshalled)
175194
}
176195
}
177196
}

pkg/reporting/app_airgap.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func BuildInstanceReport(licenseID string, reportingInfo *types.ReportingInfo) *
6969
KurlInstallID: reportingInfo.KURLInstallID,
7070
EmbeddedClusterID: reportingInfo.EmbeddedClusterID,
7171
EmbeddedClusterVersion: reportingInfo.EmbeddedClusterVersion,
72+
EmbeddedClusterNodes: reportingInfo.EmbeddedClusterNodes,
7273
IsGitOpsEnabled: reportingInfo.IsGitOpsEnabled,
7374
GitOpsProvider: reportingInfo.GitOpsProvider,
7475
SnapshotProvider: reportingInfo.SnapshotProvider,

pkg/reporting/instance_report.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type InstanceReportEvent struct {
2929
KurlInstallID string `json:"kurl_install_id,omitempty"`
3030
EmbeddedClusterID string `json:"embedded_cluster_id,omitempty"`
3131
EmbeddedClusterVersion string `json:"embedded_cluster_version,omitempty"`
32+
EmbeddedClusterNodes string `json:"embedded_cluster_nodes"`
3233
IsGitOpsEnabled bool `json:"is_gitops_enabled"`
3334
GitOpsProvider string `json:"gitops_provider"`
3435
SnapshotProvider string `json:"snapshot_provider"`

pkg/reporting/report_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ func createTestInstanceEvent(reportedAt int64) InstanceReportEvent {
353353
KurlInstallID: "test-kurl-install-id",
354354
EmbeddedClusterID: "test-embedded-cluster-id",
355355
EmbeddedClusterVersion: "test-embedded-cluster-version",
356+
EmbeddedClusterNodes: `[{"name":"node-1","isReady":true},{"name":"node-2","isReady":false},{"name":"node-3","isReady":true},{"name":"node-4","isReady":false}]`,
356357
IsGitOpsEnabled: true,
357358
GitOpsProvider: "test-gitops-provider",
358359
SnapshotProvider: "test-snapshot-provider",

pkg/reporting/util.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ func GetReportingInfoHeaders(reportingInfo *types.ReportingInfo) map[string]stri
6868
if reportingInfo.EmbeddedClusterVersion != "" {
6969
headers["X-Replicated-EmbeddedClusterVersion"] = reportingInfo.EmbeddedClusterVersion
7070
}
71+
if reportingInfo.EmbeddedClusterNodes != "" {
72+
headers["X-Replicated-EmbeddedClusterNodes"] = reportingInfo.EmbeddedClusterNodes
73+
}
7174

7275
headers["X-Replicated-KurlNodeCountTotal"] = strconv.Itoa(reportingInfo.KurlNodeCountTotal)
7376
headers["X-Replicated-KurlNodeCountReady"] = strconv.Itoa(reportingInfo.KurlNodeCountReady)

pkg/watchers/watchers.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package watchers
2+
3+
import (
4+
"context"
5+
6+
"github.com/pkg/errors"
7+
"github.com/replicatedhq/kots/pkg/k8sutil"
8+
"github.com/replicatedhq/kots/pkg/logger"
9+
"github.com/replicatedhq/kots/pkg/reporting"
10+
"github.com/replicatedhq/kots/pkg/store"
11+
"github.com/replicatedhq/kots/pkg/util"
12+
corev1 "k8s.io/api/core/v1"
13+
"k8s.io/client-go/informers"
14+
"k8s.io/client-go/kubernetes"
15+
"k8s.io/client-go/tools/cache"
16+
)
17+
18+
func Start(clusterID string) error {
19+
cfg, err := k8sutil.GetClusterConfig()
20+
if err != nil {
21+
return errors.Wrap(err, "get cluster config")
22+
}
23+
24+
clientset, err := kubernetes.NewForConfig(cfg)
25+
if err != nil {
26+
return errors.Wrap(err, "create clientset")
27+
}
28+
29+
if err := watchECNodes(clientset, clusterID); err != nil {
30+
return errors.Wrap(err, "watch embedded cluster nodes")
31+
}
32+
33+
return nil
34+
}
35+
36+
func watchECNodes(clientset kubernetes.Interface, clusterID string) error {
37+
if !util.IsEmbeddedCluster() {
38+
return nil
39+
}
40+
41+
logger.Info("starting embedded cluster nodes watcher")
42+
43+
factory := informers.NewSharedInformerFactory(clientset, 0)
44+
nodeInformer := factory.Core().V1().Nodes().Informer()
45+
46+
// by default, add func gets called for existing nodes, and
47+
// we don't want to report N times every time kotsadm restarts,
48+
// specially if there are a lot of nodes.
49+
hasSynced := false
50+
51+
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
52+
AddFunc: func(obj interface{}) {
53+
if !hasSynced {
54+
return
55+
}
56+
node := obj.(*corev1.Node)
57+
logger.Infof("Node added: %s", node.Name)
58+
if err := submitAppInfo(clusterID); err != nil {
59+
logger.Debugf("failed to submit app info: %v", err)
60+
}
61+
},
62+
DeleteFunc: func(obj interface{}) {
63+
node := obj.(*corev1.Node)
64+
logger.Infof("Node deleted: %s", node.Name)
65+
if err := submitAppInfo(clusterID); err != nil {
66+
logger.Debugf("failed to submit app info: %v", err)
67+
}
68+
},
69+
UpdateFunc: func(oldObj, newObj interface{}) {
70+
oldNode := oldObj.(*corev1.Node)
71+
newNode := newObj.(*corev1.Node)
72+
73+
// Check if ready condition changed
74+
oldReady := getNodeReadyStatus(oldNode)
75+
newReady := getNodeReadyStatus(newNode)
76+
77+
if oldReady != newReady {
78+
logger.Infof("Node %s ready status changed from %v to %v", newNode.Name, oldReady, newReady)
79+
if err := submitAppInfo(clusterID); err != nil {
80+
logger.Debugf("failed to submit app info: %v", err)
81+
}
82+
}
83+
},
84+
})
85+
86+
ctx := context.Background()
87+
go nodeInformer.Run(ctx.Done())
88+
89+
if !cache.WaitForCacheSync(ctx.Done(), nodeInformer.HasSynced) {
90+
return errors.New("sync node cache")
91+
}
92+
93+
hasSynced = true
94+
95+
return nil
96+
}
97+
98+
func getNodeReadyStatus(node *corev1.Node) corev1.ConditionStatus {
99+
for _, condition := range node.Status.Conditions {
100+
if condition.Type == corev1.NodeReady {
101+
return condition.Status
102+
}
103+
}
104+
return corev1.ConditionUnknown
105+
}
106+
107+
func submitAppInfo(clusterID string) error {
108+
apps, err := store.GetStore().ListAppsForDownstream(clusterID)
109+
if err != nil {
110+
return errors.Wrap(err, "list installed apps for downstream")
111+
}
112+
113+
if len(apps) == 0 {
114+
return nil
115+
}
116+
117+
// embedded cluster only supports one app
118+
return reporting.GetReporter().SubmitAppInfo(apps[0].ID)
119+
}

0 commit comments

Comments
 (0)