From e9dd26670c22fc5c6a4ce12c626690388cb2a606 Mon Sep 17 00:00:00 2001 From: Salah Aldeen Al Saleh Date: Thu, 27 Mar 2025 13:34:07 -0700 Subject: [PATCH 1/4] Reporting for multi-node EC installations --- pkg/api/reporting/types/types.go | 1 + pkg/apiserver/server.go | 6 +- .../{helmvm_node.go => node.go} | 0 .../{helmvm_nodes.go => nodes.go} | 0 pkg/informers/informers.go | 116 ----------------- pkg/operator/operator.go | 4 + pkg/reporting/app.go | 27 +++- pkg/reporting/app_airgap.go | 1 + pkg/reporting/instance_report.go | 1 + pkg/reporting/report_test.go | 1 + pkg/reporting/util.go | 3 + pkg/watchers/watchers.go | 119 ++++++++++++++++++ 12 files changed, 156 insertions(+), 123 deletions(-) rename pkg/embeddedcluster/{helmvm_node.go => node.go} (100%) rename pkg/embeddedcluster/{helmvm_nodes.go => nodes.go} (100%) delete mode 100644 pkg/informers/informers.go create mode 100644 pkg/watchers/watchers.go diff --git a/pkg/api/reporting/types/types.go b/pkg/api/reporting/types/types.go index b92e16c74e..bba75c3ccb 100644 --- a/pkg/api/reporting/types/types.go +++ b/pkg/api/reporting/types/types.go @@ -15,6 +15,7 @@ type ReportingInfo struct { KURLInstallID string `json:"kurl_install_id" yaml:"kurl_install_id"` EmbeddedClusterID string `json:"embedded_cluster_id" yaml:"embedded_cluster_id"` EmbeddedClusterVersion string `json:"embedded_cluster_version" yaml:"embedded_cluster_version"` + EmbeddedClusterNodes string `json:"embedded_cluster_nodes" yaml:"embedded_cluster_nodes"` IsGitOpsEnabled bool `json:"is_gitops_enabled" yaml:"is_gitops_enabled"` GitOpsProvider string `json:"gitops_provider" yaml:"gitops_provider"` SnapshotProvider string `json:"snapshot_provider" yaml:"snapshot_provider"` diff --git a/pkg/apiserver/server.go b/pkg/apiserver/server.go index 962f40ef5f..5521bcadd7 100644 --- a/pkg/apiserver/server.go +++ b/pkg/apiserver/server.go @@ -15,7 +15,6 @@ import ( "github.com/replicatedhq/kots/pkg/binaries" "github.com/replicatedhq/kots/pkg/handlers" identitymigrate "github.com/replicatedhq/kots/pkg/identity/migrate" - "github.com/replicatedhq/kots/pkg/informers" "github.com/replicatedhq/kots/pkg/k8sutil" "github.com/replicatedhq/kots/pkg/operator" operatorclient "github.com/replicatedhq/kots/pkg/operator/client" @@ -31,6 +30,7 @@ import ( "github.com/replicatedhq/kots/pkg/updatechecker" "github.com/replicatedhq/kots/pkg/upgradeservice" "github.com/replicatedhq/kots/pkg/util" + "github.com/replicatedhq/kots/pkg/watchers" "golang.org/x/crypto/bcrypt" ) @@ -121,8 +121,8 @@ func Start(params *APIServerParams) { supportbundle.StartServer() - if err := informers.Start(); err != nil { - log.Println("Failed to start informers:", err) + if err := watchers.Start(op.GetClusterID()); err != nil { + log.Println("Failed to start watchers:", err) } if err := updatechecker.Start(); err != nil { diff --git a/pkg/embeddedcluster/helmvm_node.go b/pkg/embeddedcluster/node.go similarity index 100% rename from pkg/embeddedcluster/helmvm_node.go rename to pkg/embeddedcluster/node.go diff --git a/pkg/embeddedcluster/helmvm_nodes.go b/pkg/embeddedcluster/nodes.go similarity index 100% rename from pkg/embeddedcluster/helmvm_nodes.go rename to pkg/embeddedcluster/nodes.go diff --git a/pkg/informers/informers.go b/pkg/informers/informers.go deleted file mode 100644 index 12a6fa4403..0000000000 --- a/pkg/informers/informers.go +++ /dev/null @@ -1,116 +0,0 @@ -package informers - -import ( - "context" - "log" - "time" - - "github.com/pkg/errors" - "github.com/replicatedhq/kots/pkg/k8sutil" - 
"github.com/replicatedhq/kots/pkg/logger" - kotssnapshot "github.com/replicatedhq/kots/pkg/snapshot" - "github.com/replicatedhq/kots/pkg/supportbundle" - "github.com/replicatedhq/kots/pkg/util" - velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - kuberneteserrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - kbclient "sigs.k8s.io/controller-runtime/pkg/client" -) - -// Start will start the kots informers -// These are not the application level informers, but they are the general purpose KOTS -// informers. For example, we want to watch Velero Backup -// in order to handle out-of-band updates -func Start() error { - cfg, err := k8sutil.GetClusterConfig() - if err != nil { - return errors.Wrap(err, "failed to get cluster config") - } - - clientset, err := kubernetes.NewForConfig(cfg) - if err != nil { - return errors.Wrap(err, "failed to create clientset") - } - - veleroNamespace, err := kotssnapshot.DetectVeleroNamespace(context.TODO(), clientset, util.PodNamespace) - if err != nil { - return errors.Wrap(err, "failed to detect velero namespace") - } - - veleroClient, err := k8sutil.GetKubeClient(context.TODO()) - if err != nil { - return errors.Wrap(err, "failed to create velero client") - } - - var backupList velerov1.BackupList - backupWatch, err := veleroClient.Watch(context.TODO(), &backupList, kbclient.InNamespace(veleroNamespace), &kbclient.ListOptions{ - Raw: &metav1.ListOptions{ResourceVersion: "0"}, - }) - if err != nil { - if kuberneteserrors.IsNotFound(err) { - return nil - } - - return errors.Wrap(err, "failed to watch") - } - - go func() { - ch := backupWatch.ResultChan() - for { - obj, ok := <-ch // this channel gets closed often - if !ok { - if err := Start(); err != nil { - log.Println("Failed to re-start informers", err) - } - break - } - if obj.Type == watch.Modified { - backup, ok := obj.Object.(*velerov1.Backup) - if !ok { - logger.Errorf("failed to cast obj to backup") - } - - if backup.Status.Phase == velerov1.BackupPhaseFailed || backup.Status.Phase == velerov1.BackupPhasePartiallyFailed { - if backup.Annotations == nil { - backup.Annotations = map[string]string{} - } - - _, ok := backup.Annotations["kots.io/support-bundle-requested"] - if !ok { - // here. finally.. 
request a support bundle for this - logger.Debugf("requesting support bundle for failed or partially failed backup %s", backup.Name) - - appID, ok := backup.Annotations["kots.io/app-id"] - if !ok { - logger.Errorf("failed to find app id anotation on backup") - } - - backup.Annotations["kots.io/support-bundle-requested"] = time.Now().UTC().Format(time.RFC3339) - - var backup velerov1.Backup - if err := veleroClient.Update(context.TODO(), &backup); err != nil { - logger.Error(err) - continue - } - - supportBundleID, err := supportbundle.CreateBundleForBackup(appID, backup.Name, backup.Namespace) - if err != nil { - logger.Error(err) - continue - } - - backup.Annotations["kots.io/support-bundle-id"] = supportBundleID - if err := veleroClient.Update(context.TODO(), &backup); err != nil { - logger.Error(err) - continue - } - } - } - } - } - }() - - return nil -} diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 107674d114..3bbb8adbf1 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -126,6 +126,10 @@ func startLoop(fn func(), intervalInSeconds time.Duration) { }() } +func (o *Operator) GetClusterID() string { + return o.clusterID +} + func (o *Operator) resumeDeployments() { apps, err := o.store.ListAppsForDownstream(o.clusterID) if err != nil { diff --git a/pkg/reporting/app.go b/pkg/reporting/app.go index 7fb5527b88..5ef8a82928 100644 --- a/pkg/reporting/app.go +++ b/pkg/reporting/app.go @@ -11,6 +11,7 @@ import ( downstreamtypes "github.com/replicatedhq/kots/pkg/api/downstream/types" "github.com/replicatedhq/kots/pkg/api/reporting/types" "github.com/replicatedhq/kots/pkg/buildversion" + "github.com/replicatedhq/kots/pkg/embeddedcluster" "github.com/replicatedhq/kots/pkg/gitops" "github.com/replicatedhq/kots/pkg/k8sutil" "github.com/replicatedhq/kots/pkg/kotsadm" @@ -157,6 +158,7 @@ func GetReportingInfo(appID string) *types.ReportingInfo { r.AppStatus = string(appStatus.State) } + // kurl r.IsKurl, err = kurl.IsKurl(clientset) if err != nil { logger.Debugf(errors.Wrap(err, "failed to check if cluster is kurl").Error()) @@ -167,11 +169,28 @@ func GetReportingInfo(appID string) *types.ReportingInfo { if err != nil { logger.Debugf(errors.Wrap(err, "failed to get kurl nodes").Error()) } + if kurlNodes != nil { + for _, kurlNode := range kurlNodes.Nodes { + r.KurlNodeCountTotal++ + if kurlNode.IsConnected && kurlNode.IsReady { + r.KurlNodeCountReady++ + } + } + } + } - for _, kurlNode := range kurlNodes.Nodes { - r.KurlNodeCountTotal++ - if kurlNode.IsConnected && kurlNode.IsReady { - r.KurlNodeCountReady++ + // embedded cluster + if util.IsEmbeddedCluster() && clientset != nil { + ecNodes, err := embeddedcluster.GetNodes(context.TODO(), clientset) + if err != nil { + logger.Debugf("failed to get embedded cluster nodes: %v", err.Error()) + } + if ecNodes != nil { + marshalled, err := json.Marshal(ecNodes.Nodes) + if err != nil { + logger.Debugf("failed to marshal embedded cluster node: %v", err.Error()) + } else { + r.EmbeddedClusterNodes = string(marshalled) } } } diff --git a/pkg/reporting/app_airgap.go b/pkg/reporting/app_airgap.go index 16aab9e6a2..93ced35606 100644 --- a/pkg/reporting/app_airgap.go +++ b/pkg/reporting/app_airgap.go @@ -69,6 +69,7 @@ func BuildInstanceReport(licenseID string, reportingInfo *types.ReportingInfo) * KurlInstallID: reportingInfo.KURLInstallID, EmbeddedClusterID: reportingInfo.EmbeddedClusterID, EmbeddedClusterVersion: reportingInfo.EmbeddedClusterVersion, + EmbeddedClusterNodes: reportingInfo.EmbeddedClusterNodes, 
IsGitOpsEnabled: reportingInfo.IsGitOpsEnabled, GitOpsProvider: reportingInfo.GitOpsProvider, SnapshotProvider: reportingInfo.SnapshotProvider, diff --git a/pkg/reporting/instance_report.go b/pkg/reporting/instance_report.go index 9bf0b3fbf2..e7773a7ef2 100644 --- a/pkg/reporting/instance_report.go +++ b/pkg/reporting/instance_report.go @@ -29,6 +29,7 @@ type InstanceReportEvent struct { KurlInstallID string `json:"kurl_install_id,omitempty"` EmbeddedClusterID string `json:"embedded_cluster_id,omitempty"` EmbeddedClusterVersion string `json:"embedded_cluster_version,omitempty"` + EmbeddedClusterNodes string `json:"embedded_cluster_nodes"` IsGitOpsEnabled bool `json:"is_gitops_enabled"` GitOpsProvider string `json:"gitops_provider"` SnapshotProvider string `json:"snapshot_provider"` diff --git a/pkg/reporting/report_test.go b/pkg/reporting/report_test.go index f4354adf2d..481f8a12e8 100644 --- a/pkg/reporting/report_test.go +++ b/pkg/reporting/report_test.go @@ -353,6 +353,7 @@ func createTestInstanceEvent(reportedAt int64) InstanceReportEvent { KurlInstallID: "test-kurl-install-id", EmbeddedClusterID: "test-embedded-cluster-id", EmbeddedClusterVersion: "test-embedded-cluster-version", + EmbeddedClusterNodes: `[{"name":"node-1","isReady":true},{"name":"node-2","isReady":false},{"name":"node-3","isReady":true},{"name":"node-4","isReady":false}]`, IsGitOpsEnabled: true, GitOpsProvider: "test-gitops-provider", SnapshotProvider: "test-snapshot-provider", diff --git a/pkg/reporting/util.go b/pkg/reporting/util.go index 1db80e9c03..e2ea1a240d 100644 --- a/pkg/reporting/util.go +++ b/pkg/reporting/util.go @@ -68,6 +68,9 @@ func GetReportingInfoHeaders(reportingInfo *types.ReportingInfo) map[string]stri if reportingInfo.EmbeddedClusterVersion != "" { headers["X-Replicated-EmbeddedClusterVersion"] = reportingInfo.EmbeddedClusterVersion } + if reportingInfo.EmbeddedClusterNodes != "" { + headers["X-Replicated-EmbeddedClusterNodes"] = reportingInfo.EmbeddedClusterNodes + } headers["X-Replicated-KurlNodeCountTotal"] = strconv.Itoa(reportingInfo.KurlNodeCountTotal) headers["X-Replicated-KurlNodeCountReady"] = strconv.Itoa(reportingInfo.KurlNodeCountReady) diff --git a/pkg/watchers/watchers.go b/pkg/watchers/watchers.go new file mode 100644 index 0000000000..7696cc72e3 --- /dev/null +++ b/pkg/watchers/watchers.go @@ -0,0 +1,119 @@ +package watchers + +import ( + "context" + + "github.com/pkg/errors" + "github.com/replicatedhq/kots/pkg/k8sutil" + "github.com/replicatedhq/kots/pkg/logger" + "github.com/replicatedhq/kots/pkg/reporting" + "github.com/replicatedhq/kots/pkg/store" + "github.com/replicatedhq/kots/pkg/util" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +func Start(clusterID string) error { + cfg, err := k8sutil.GetClusterConfig() + if err != nil { + return errors.Wrap(err, "get cluster config") + } + + clientset, err := kubernetes.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "create clientset") + } + + if err := watchECNodes(clientset, clusterID); err != nil { + return errors.Wrap(err, "watch embedded cluster nodes") + } + + return nil +} + +func watchECNodes(clientset kubernetes.Interface, clusterID string) error { + if !util.IsEmbeddedCluster() { + return nil + } + + logger.Info("starting embedded cluster nodes watcher") + + factory := informers.NewSharedInformerFactory(clientset, 0) + nodeInformer := factory.Core().V1().Nodes().Informer() + + // by default, add func gets called for existing 
nodes, and
+	// we don't want to report N times every time kotsadm restarts,
+	// especially if there are a lot of nodes.
+	hasSynced := false
+
+	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			if !hasSynced {
+				return
+			}
+			node := obj.(*corev1.Node)
+			logger.Infof("Node added: %s", node.Name)
+			if err := submitAppInfo(clusterID); err != nil {
+				logger.Debugf("failed to submit app info: %v", err)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			node := obj.(*corev1.Node)
+			logger.Infof("Node deleted: %s", node.Name)
+			if err := submitAppInfo(clusterID); err != nil {
+				logger.Debugf("failed to submit app info: %v", err)
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			oldNode := oldObj.(*corev1.Node)
+			newNode := newObj.(*corev1.Node)
+
+			// Check if ready condition changed
+			oldReady := getNodeReadyStatus(oldNode)
+			newReady := getNodeReadyStatus(newNode)
+
+			if oldReady != newReady {
+				logger.Infof("Node %s ready status changed from %v to %v", newNode.Name, oldReady, newReady)
+				if err := submitAppInfo(clusterID); err != nil {
+					logger.Debugf("failed to submit app info: %v", err)
+				}
+			}
+		},
+	})
+
+	ctx := context.Background()
+	go nodeInformer.Run(ctx.Done())
+
+	if !cache.WaitForCacheSync(ctx.Done(), nodeInformer.HasSynced) {
+		return errors.New("sync node cache")
+	}
+
+	hasSynced = true
+
+	return nil
+}
+
+func getNodeReadyStatus(node *corev1.Node) corev1.ConditionStatus {
+	for _, condition := range node.Status.Conditions {
+		if condition.Type == corev1.NodeReady {
+			return condition.Status
+		}
+	}
+	return corev1.ConditionUnknown
+}
+
+func submitAppInfo(clusterID string) error {
+	apps, err := store.GetStore().ListAppsForDownstream(clusterID)
+	if err != nil {
+		return errors.Wrap(err, "list installed apps for downstream")
+	}
+
+	if len(apps) == 0 {
+		return nil
+	}
+
+	// embedded cluster only supports one app
+	return reporting.GetReporter().SubmitAppInfo(apps[0].ID)
+}

From 65790366d1ab7a6e211e31ce76c0cf3d62fa4d93 Mon Sep 17 00:00:00 2001
From: Salah Aldeen Al Saleh
Date: Thu, 27 Mar 2025 17:55:34 -0700
Subject: [PATCH 2/4] Remove unused backup support bundle code; fix watcher
 cache sync handling

---
 pkg/supportbundle/backup.go | 311 ------------------------------------
 pkg/watchers/watchers.go    |  14 +-
 2 files changed, 9 insertions(+), 316 deletions(-)
 delete mode 100644 pkg/supportbundle/backup.go

diff --git a/pkg/supportbundle/backup.go b/pkg/supportbundle/backup.go
deleted file mode 100644
index 6e8b1395c6..0000000000
--- a/pkg/supportbundle/backup.go
+++ /dev/null
@@ -1,311 +0,0 @@
-package supportbundle
-
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"io/ioutil"
-	"os"
-	"path/filepath"
-
-	"github.com/mholt/archiver/v3"
-	"github.com/pkg/errors"
-	"github.com/replicatedhq/kots/pkg/k8sutil"
-	"github.com/replicatedhq/kots/pkg/logger"
-	"github.com/replicatedhq/kots/pkg/redact"
-	"github.com/replicatedhq/kots/pkg/store"
-	troubleshootanalyze "github.com/replicatedhq/troubleshoot/pkg/analyze"
-	troubleshootv1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2"
-	troubleshootcollect "github.com/replicatedhq/troubleshoot/pkg/collect"
-	"github.com/replicatedhq/troubleshoot/pkg/convert"
-	troubleshootversion "github.com/replicatedhq/troubleshoot/pkg/version"
-	"go.uber.org/zap"
-	"gopkg.in/yaml.v2"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-)
-
-func CreateBundleForBackup(appID string, backupName string, backupNamespace string) (string, error) {
-	logger.Debug("executing support bundle for backup",
-		zap.String("backupName", backupName),
-		
zap.String("backupNamespace", backupNamespace)) - - progressChan := make(chan interface{}, 0) - defer close(progressChan) - - go func() { - for { - msg, ok := <-progressChan - if ok { - logger.Debugf("%v", msg) - } else { - return - } - } - }() - - var RBACErrors []error - - var collectors []troubleshootcollect.Collector - - restConfig, err := k8sutil.GetClusterConfig() - if err != nil { - return "", errors.Wrap(err, "failed to get cluster config") - } - - k8sClientSet, err := k8sutil.GetClientset() - if err != nil { - return "", errors.Wrap(err, "failed to get kubernetes client") - } - - selectors := []string{ - "component=velero", - "app.kubernetes.io/name=velero", - } - - ctx := context.TODO() - - for _, selector := range selectors { - logsCollector := &troubleshootv1beta2.Logs{ - CollectorMeta: troubleshootv1beta2.CollectorMeta{ - CollectorName: "velero", - }, - Name: "velero", - Namespace: backupNamespace, - Selector: []string{selector}, - } - - collectors = append(collectors, &troubleshootcollect.CollectLogs{ - Collector: logsCollector, - Namespace: backupNamespace, - ClientConfig: restConfig, - Client: k8sClientSet, - Context: ctx, - RBACErrors: RBACErrors, - }) - } - - // make a temp file to store the bundle in - bundlePath, err := ioutil.TempDir("", "troubleshoot") - if err != nil { - return "", errors.Wrap(err, "failed to create temp dir") - } - defer os.RemoveAll(bundlePath) - - if err = writeVersionFile(bundlePath); err != nil { - return "", errors.Wrap(err, "failed to write version file") - } - - redacts := []*troubleshootv1beta2.Redact{} - globalRedact, err := redact.GetRedact() - if err == nil && globalRedact != nil { - redacts = globalRedact.Spec.Redactors - } else if err != nil { - return "", errors.Wrap(err, "failed to get global redactors") - } - - result := make(map[string][]byte) - - // Run preflights collectors synchronously - for _, collector := range collectors { - if collector.HasRBACErrors() { - // don't skip clusterResources collector due to RBAC issues - if _, ok := collector.(*troubleshootcollect.CollectClusterResources); !ok { - progressChan <- fmt.Sprintf("skipping collector %s with insufficient RBAC permissions", collector.Title()) - continue - } - } - - progressChan <- collector.Title() - - result, err = collector.Collect(progressChan) - if err != nil { - progressChan <- errors.Wrapf(err, "failed to run collector %q", collector.Title()) - continue - } - - if result != nil { - err = saveCollectorOutput(result, bundlePath) - if err != nil { - progressChan <- errors.Wrapf(err, "failed to parse collector spec %q", collector.Title()) - continue - } - } - } - - // Redact result before creating archive - err = troubleshootcollect.RedactResult(bundlePath, result, redacts) - if err != nil { - return "", errors.Wrap(err, "failed to redact") - } - - // create an archive of this bundle - supportBundleArchivePath, err := ioutil.TempDir("", "kotsadm") - if err != nil { - return "", errors.Wrap(err, "failed to create archive dir") - } - defer os.RemoveAll(supportBundleArchivePath) - - if err = tarSupportBundleDir(bundlePath, filepath.Join(supportBundleArchivePath, "support-bundle.tar.gz")); err != nil { - return "", errors.Wrap(err, "failed to create support bundle archive") - } - - // we have a support bundle... 
- // store it - supportBundle, err := CreateBundle( - fmt.Sprintf("backup-%s", backupName), - appID, - filepath.Join(supportBundleArchivePath, "support-bundle.tar.gz")) - if err != nil { - return "", errors.Wrap(err, "failed to create support bundle") - } - - // analyze it - analyzers := []*troubleshootv1beta2.Analyze{} - - analyzers = append(analyzers, &troubleshootv1beta2.Analyze{ - TextAnalyze: &troubleshootv1beta2.TextAnalyze{ - AnalyzeMeta: troubleshootv1beta2.AnalyzeMeta{ - CheckName: "Velero Errors", - }, - CollectorName: "velero", - FileName: "velero/velero*/velero.log", - RegexPattern: "level=error", - Outcomes: []*troubleshootv1beta2.Outcome{ - { - Fail: &troubleshootv1beta2.SingleOutcome{ - Message: "Velero has errors", - }, - }, - { - Pass: &troubleshootv1beta2.SingleOutcome{ - Message: "Velero does not have errors", - }, - }, - }, - }, - }) - analyzers = append(analyzers, &troubleshootv1beta2.Analyze{ - TextAnalyze: &troubleshootv1beta2.TextAnalyze{ - AnalyzeMeta: troubleshootv1beta2.AnalyzeMeta{ - CheckName: "Restic Volumes", - }, - CollectorName: "restic", - FileName: "restic/*.log", - RegexPattern: "expected one matching path, got 0", - Outcomes: []*troubleshootv1beta2.Outcome{ - { - Fail: &troubleshootv1beta2.SingleOutcome{ - Message: "Restic volume error", - }, - }, - { - Pass: &troubleshootv1beta2.SingleOutcome{ - Message: "No restic volume error", - }, - }, - }, - }, - }) - - analyzer := troubleshootv1beta2.Analyzer{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "troubleshoot.sh/v1beta2", - Kind: "Analyzer", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: backupName, - }, - Spec: troubleshootv1beta2.AnalyzerSpec{ - Analyzers: analyzers, - }, - } - b, err := json.Marshal(analyzer) - if err != nil { - return "", errors.Wrap(err, "failed to marshal analyzers") - } - - analyzeResult, err := troubleshootanalyze.DownloadAndAnalyze(filepath.Join(supportBundleArchivePath, "support-bundle.tar.gz"), string(b)) - if err != nil { - return "", errors.Wrap(err, "failed to analyze") - } - - data := convert.FromAnalyzerResult(analyzeResult) - insights, err := json.MarshalIndent(data, "", " ") - if err != nil { - return "", errors.Wrap(err, "failed to marshal analysis") - } - - if err := store.GetStore().SetSupportBundleAnalysis(supportBundle.ID, insights); err != nil { - return "", errors.Wrap(err, "failed to update bundle status") - } - return supportBundle.ID, nil -} - -func tarSupportBundleDir(inputDir string, outputFilename string) error { - tarGz := archiver.TarGz{ - Tar: &archiver.Tar{ - ImplicitTopLevelFolder: false, - OverwriteExisting: true, - }, - } - - paths := []string{ - filepath.Join(inputDir, "version.yaml"), // version file should be first in tar archive for quick extraction - } - - topLevelFiles, err := ioutil.ReadDir(inputDir) - if err != nil { - return errors.Wrap(err, "failed to list bundle directory contents") - } - for _, f := range topLevelFiles { - if f.Name() == "version.yaml" { - continue - } - paths = append(paths, filepath.Join(inputDir, f.Name())) - } - - if err := tarGz.Archive(paths, outputFilename); err != nil { - return errors.Wrap(err, "failed to create archive") - } - - return nil -} - -func saveCollectorOutput(output map[string][]byte, bundlePath string) error { - for filename, maybeContents := range output { - fileDir, fileName := filepath.Split(filename) - outPath := filepath.Join(bundlePath, fileDir) - - if err := os.MkdirAll(outPath, 0777); err != nil { - return errors.Wrap(err, "failed to create output file") - } - - if err := 
ioutil.WriteFile(filepath.Join(outPath, fileName), maybeContents, 0644); err != nil {
-			return errors.Wrap(err, "failed to write file")
-		}
-	}
-
-	return nil
-}
-
-func writeVersionFile(path string) error {
-	version := troubleshootv1beta2.SupportBundleVersion{
-		ApiVersion: "troubleshoot.sh/v1beta2",
-		Kind:       "SupportBundle",
-		Spec: troubleshootv1beta2.SupportBundleVersionSpec{
-			VersionNumber: troubleshootversion.Version(),
-		},
-	}
-	b, err := yaml.Marshal(version)
-	if err != nil {
-		return err
-	}
-
-	filename := filepath.Join(path, "version.yaml")
-	err = ioutil.WriteFile(filename, b, 0644)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
diff --git a/pkg/watchers/watchers.go b/pkg/watchers/watchers.go
index 7696cc72e3..02bfcf2fbe 100644
--- a/pkg/watchers/watchers.go
+++ b/pkg/watchers/watchers.go
@@ -13,6 +13,7 @@ import (
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/utils/ptr"
 )
 
 func Start(clusterID string) error {
@@ -46,11 +47,11 @@ func watchECNodes(clientset kubernetes.Interface, clusterID string) error {
 	// by default, add func gets called for existing nodes, and
 	// we don't want to report N times every time kotsadm restarts,
 	// especially if there are a lot of nodes.
-	hasSynced := false
+	hasSynced := ptr.To(false)
 
-	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	handler, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			if !hasSynced {
+			if !*hasSynced {
 				return
 			}
 			node := obj.(*corev1.Node)
@@ -82,15 +83,18 @@ func watchECNodes(clientset kubernetes.Interface, clusterID string) error {
 			}
 		},
 	})
+	if err != nil {
+		return errors.Wrap(err, "add event handler")
+	}
 
 	ctx := context.Background()
 	go nodeInformer.Run(ctx.Done())
 
-	if !cache.WaitForCacheSync(ctx.Done(), nodeInformer.HasSynced) {
+	if !cache.WaitForCacheSync(ctx.Done(), handler.HasSynced) {
 		return errors.New("sync node cache")
 	}
 
-	hasSynced = true
+	*hasSynced = true
 
 	return nil
 }

From 15670ca6b4137759fedc2deb8fa5832a38a8a125 Mon Sep 17 00:00:00 2001
From: Salah Aldeen Al Saleh
Date: Thu, 27 Mar 2025 18:30:36 -0700
Subject: [PATCH 3/4] Address review feedback: log reporting failures at warn
 level; tolerate missing downstreams

---
 pkg/reporting/app.go     | 32 +++++++++++++++++---------------
 pkg/watchers/watchers.go |  6 +++---
 2 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/pkg/reporting/app.go b/pkg/reporting/app.go
index 5ef8a82928..893201502a 100644
--- a/pkg/reporting/app.go
+++ b/pkg/reporting/app.go
@@ -121,18 +121,18 @@ func GetReportingInfo(appID string) *types.ReportingInfo {
 
 	cfg, err := k8sutil.GetClusterConfig()
 	if err != nil {
-		logger.Debugf("failed to get cluster config: %v", err.Error())
+		logger.Warnf("failed to get cluster config: %v", err.Error())
 	}
 
 	clientset, err := kubernetes.NewForConfig(cfg)
 	if err != nil {
-		logger.Debugf("failed to get clientset: %v", err.Error())
+		logger.Warnf("failed to get clientset: %v", err.Error())
 	}
 
 	r.ClusterID = k8sutil.GetKotsadmID(clientset)
 
 	di, err := getDownstreamInfo(appID)
 	if err != nil {
-		logger.Debugf("failed to get downstream info: %v", err.Error())
+		logger.Warnf("failed to get downstream info: %v", err.Error())
 	}
 	if di != nil {
 		r.Downstream = *di
@@ -141,7 +141,7 @@ func GetReportingInfo(appID string) *types.ReportingInfo {
 	// get kubernetes cluster version
 	k8sVersion, err := k8sutil.GetK8sVersion(clientset)
 	if err != nil {
-		logger.Debugf("failed to get k8s version: %v", err.Error())
+		logger.Warnf("failed to get k8s version: %v", err.Error())
 	} else {
 		r.K8sVersion = k8sVersion
 	}
@@ -153,7 +153,7 @@ 
func GetReportingInfo(appID string) *types.ReportingInfo { // get app status appStatus, err := store.GetStore().GetAppStatus(appID) if err != nil { - logger.Debugf("failed to get app status: %v", err.Error()) + logger.Warnf("failed to get app status: %v", err.Error()) } else { r.AppStatus = string(appStatus.State) } @@ -161,13 +161,13 @@ func GetReportingInfo(appID string) *types.ReportingInfo { // kurl r.IsKurl, err = kurl.IsKurl(clientset) if err != nil { - logger.Debugf(errors.Wrap(err, "failed to check if cluster is kurl").Error()) + logger.Warnf(errors.Wrap(err, "failed to check if cluster is kurl").Error()) } if r.IsKurl && clientset != nil { kurlNodes, err := kurl.GetNodes(clientset) if err != nil { - logger.Debugf(errors.Wrap(err, "failed to get kurl nodes").Error()) + logger.Warnf(errors.Wrap(err, "failed to get kurl nodes").Error()) } if kurlNodes != nil { for _, kurlNode := range kurlNodes.Nodes { @@ -183,12 +183,12 @@ func GetReportingInfo(appID string) *types.ReportingInfo { if util.IsEmbeddedCluster() && clientset != nil { ecNodes, err := embeddedcluster.GetNodes(context.TODO(), clientset) if err != nil { - logger.Debugf("failed to get embedded cluster nodes: %v", err.Error()) + logger.Warnf("failed to get embedded cluster nodes: %v", err.Error()) } if ecNodes != nil { marshalled, err := json.Marshal(ecNodes.Nodes) if err != nil { - logger.Debugf("failed to marshal embedded cluster node: %v", err.Error()) + logger.Warnf("failed to marshal embedded cluster node: %v", err.Error()) } else { r.EmbeddedClusterNodes = string(marshalled) } @@ -199,17 +199,17 @@ func GetReportingInfo(appID string) *types.ReportingInfo { veleroClient, err := k8sutil.GetKubeClient(context.TODO()) if err != nil { - logger.Debugf("failed to get velero client: %v", err.Error()) + logger.Warnf("failed to get velero client: %v", err.Error()) } if clientset != nil && veleroClient != nil { bsl, err := snapshot.FindBackupStoreLocation(context.TODO(), clientset, veleroClient, util.PodNamespace) if err != nil { - logger.Debugf("failed to find backup store location: %v", err.Error()) + logger.Warnf("failed to find backup store location: %v", err.Error()) } else { report, err := getSnapshotReport(store.GetStore(), bsl, appID, r.ClusterID) if err != nil { - logger.Debugf("failed to get snapshot report: %v", err.Error()) + logger.Warnf("failed to get snapshot report: %v", err.Error()) } else { r.SnapshotProvider = report.Provider r.SnapshotFullSchedule = report.FullSchedule @@ -231,7 +231,9 @@ func getDownstreamInfo(appID string) (*types.DownstreamInfo, error) { return nil, errors.Wrap(err, "failed to list downstreams for app") } if len(downstreams) == 0 { - return nil, errors.New("no downstreams found for app") + // this can happen during initial install workflow until the app is installed and has downstreams + logger.Debugf("no downstreams found for app") + return nil, nil } currentVersion, err := store.GetStore().GetCurrentDownstreamVersion(appID, downstreams[0].ClusterID) @@ -266,7 +268,7 @@ func getDownstreamInfo(appID string) (*types.DownstreamInfo, error) { var preflightResults *troubleshootpreflight.UploadPreflightResults if err := json.Unmarshal([]byte(currentVersion.PreflightResult), &preflightResults); err != nil { - logger.Debugf("failed to unmarshal preflight results: %v", err.Error()) + logger.Warnf("failed to unmarshal preflight results: %v", err.Error()) } di.PreflightState = getPreflightState(preflightResults) di.SkipPreflights = currentVersion.PreflightSkipped @@ -288,7 +290,7 @@ func 
getDownstreamInfo(appID string) (*types.DownstreamInfo, error) { func getGitOpsReport(clientset kubernetes.Interface, appID string, clusterID string) (bool, string) { gitOpsConfig, err := gitops.GetDownstreamGitOpsConfig(clientset, appID, clusterID) if err != nil { - logger.Debugf("failed to get gitops config: %v", err.Error()) + logger.Warnf("failed to get gitops config: %v", err.Error()) return false, "" } diff --git a/pkg/watchers/watchers.go b/pkg/watchers/watchers.go index 02bfcf2fbe..6a3354db1b 100644 --- a/pkg/watchers/watchers.go +++ b/pkg/watchers/watchers.go @@ -57,14 +57,14 @@ func watchECNodes(clientset kubernetes.Interface, clusterID string) error { node := obj.(*corev1.Node) logger.Infof("Node added: %s", node.Name) if err := submitAppInfo(clusterID); err != nil { - logger.Debugf("failed to submit app info: %v", err) + logger.Warnf("failed to submit app info: %v", err) } }, DeleteFunc: func(obj interface{}) { node := obj.(*corev1.Node) logger.Infof("Node deleted: %s", node.Name) if err := submitAppInfo(clusterID); err != nil { - logger.Debugf("failed to submit app info: %v", err) + logger.Warnf("failed to submit app info: %v", err) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -78,7 +78,7 @@ func watchECNodes(clientset kubernetes.Interface, clusterID string) error { if oldReady != newReady { logger.Infof("Node %s ready status changed from %v to %v", newNode.Name, oldReady, newReady) if err := submitAppInfo(clusterID); err != nil { - logger.Debugf("failed to submit app info: %v", err) + logger.Warnf("failed to submit app info: %v", err) } } }, From 598d385eaa45883fe93b45d53f990210f7bb9cf7 Mon Sep 17 00:00:00 2001 From: Salah Aldeen Al Saleh Date: Fri, 28 Mar 2025 07:20:54 -0700 Subject: [PATCH 4/4] roles not labels --- pkg/embeddedcluster/node.go | 2 +- pkg/embeddedcluster/types/types.go | 2 +- pkg/watchers/watchers.go | 7 +++---- web/src/components/apps/EmbeddedClusterManagement.tsx | 4 ++-- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pkg/embeddedcluster/node.go b/pkg/embeddedcluster/node.go index 13d72002a9..e9760ad53f 100644 --- a/pkg/embeddedcluster/node.go +++ b/pkg/embeddedcluster/node.go @@ -131,7 +131,7 @@ func nodeMetrics(ctx context.Context, client kubernetes.Interface, metricsClient CPU: cpuCapacity, Memory: memoryCapacity, Pods: podCapacity, - Labels: nodeRolesFromLabels(node.Labels), + Roles: nodeRolesFromLabels(node.Labels), Conditions: findNodeConditions(node.Status.Conditions), PodList: podInfo, }, nil diff --git a/pkg/embeddedcluster/types/types.go b/pkg/embeddedcluster/types/types.go index f9b93a4b8c..80add9eb63 100644 --- a/pkg/embeddedcluster/types/types.go +++ b/pkg/embeddedcluster/types/types.go @@ -22,7 +22,7 @@ type Node struct { CPU CapacityUsed `json:"cpu"` Memory CapacityUsed `json:"memory"` Pods CapacityUsed `json:"pods"` - Labels []string `json:"labels"` + Roles []string `json:"roles"` Conditions NodeConditions `json:"conditions"` PodList []PodInfo `json:"podList"` } diff --git a/pkg/watchers/watchers.go b/pkg/watchers/watchers.go index 6a3354db1b..2983e83ea7 100644 --- a/pkg/watchers/watchers.go +++ b/pkg/watchers/watchers.go @@ -13,7 +13,6 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "k8s.io/utils/ptr" ) func Start(clusterID string) error { @@ -47,11 +46,11 @@ func watchECNodes(clientset kubernetes.Interface, clusterID string) error { // by default, add func gets called for existing nodes, and // we don't want to report N times every time kotsadm restarts, // 
especially if there are a lot of nodes.
-	hasSynced := ptr.To(false)
+	hasSynced := false
 
 	handler, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			if !*hasSynced {
+			if !hasSynced {
 				return
 			}
 			node := obj.(*corev1.Node)
@@ -94,7 +93,7 @@ func watchECNodes(clientset kubernetes.Interface, clusterID string) error {
 		return errors.New("sync node cache")
 	}
 
-	*hasSynced = true
+	hasSynced = true
 
 	return nil
 }
diff --git a/web/src/components/apps/EmbeddedClusterManagement.tsx b/web/src/components/apps/EmbeddedClusterManagement.tsx
index a2c8961165..a8f39c14a4 100644
--- a/web/src/components/apps/EmbeddedClusterManagement.tsx
+++ b/web/src/components/apps/EmbeddedClusterManagement.tsx
@@ -84,7 +84,7 @@ const EmbeddedClusterManagement = ({
       capacity: number;
       used: number;
     };
-    labels?: string[];
+    roles?: string[];
     conditions: {
       memoryPressure: boolean;
       diskPressure: boolean;
@@ -354,7 +354,7 @@ const EmbeddedClusterManagement = ({
     return (
       (nodesData?.nodes || testData?.nodes)?.map((n) => ({
         name: n.name,
-        roles: n.labels?.join(" ") || "",
+        roles: n.roles?.join(" ") || "",
         status: n.isReady ? "Ready" : "Not Ready",
         cpu: `${n.cpu.used.toFixed(2)} / ${n.cpu.capacity.toFixed(2)}`,
         memory: `${n.memory.used.toFixed(2)} / ${n.memory.capacity.toFixed(