diff --git a/internal/tools/sigmigrate/README.md b/internal/tools/sigmigrate/README.md index a6f9e6fd..32802587 100644 --- a/internal/tools/sigmigrate/README.md +++ b/internal/tools/sigmigrate/README.md @@ -100,19 +100,19 @@ d8 tools sig-migrate \ 5. **Error Handling**: - If a resource does not support annotations (MethodNotAllowed), it is added to the list of unsupported types and skipped in the future. - If the operation is forbidden for the current service account, the command automatically attempts to use an alternative service account (`system:serviceaccount:d8-multitenancy-manager:multitenancy-manager`). - - Each run writes failed/skipped artifacts to run-scoped files in `/tmp` with a timestamp suffix (for example: `/tmp/failed_annotations_20260414T151625Z.txt`, `/tmp/failed_errors_20260414T151625Z.txt`, `/tmp/skipped_objects_20260414T151625Z.txt`). - - For backward-compatible retry UX, the latest failed annotations are also synced to legacy `/tmp/failed_annotations.txt`, so `--retry` continues to work without extra arguments. + - Each run writes failed/skipped artifacts to run-scoped files in `/tmp` with a timestamp suffix (for example: `/tmp/failed_annotations_20260414T151625Z.log`, `/tmp/failed_errors_20260414T151625Z.log`, `/tmp/skipped_objects_20260414T151625Z.log`). + - For backward-compatible retry UX, the latest failed annotations are also synced to legacy `/tmp/failed_annotations.log`, so `--retry` continues to work without extra arguments. - A dedicated trace debug log is written to `/tmp/sigmigrate_trace_.log` with detailed execution/error diagnostics. ## Retry Files The command creates run-scoped files to track failed operations: -- `/tmp/failed_annotations_.txt` - list of objects in `namespace|name|kind|group|version` format that failed to be processed -- `/tmp/failed_errors_.txt` - detailed error information in `namespace|name|kind|error_message` format -- `/tmp/skipped_objects_.txt` - skipped objects with reason/details +- `/tmp/failed_annotations_.log` - list of objects in `namespace|name|kind|group|version` format that failed to be processed +- `/tmp/failed_errors_.log` - detailed error information in `namespace|name|kind|error_message` format +- `/tmp/skipped_objects_.log` - skipped objects with reason/details -For retry compatibility, failed annotations are also mirrored into legacy `/tmp/failed_annotations.txt` and `--retry` reads from that legacy file (supports both old `namespace|name|kind` and new `namespace|name|kind|group|version` lines). +For retry compatibility, failed annotations are also mirrored into legacy `/tmp/failed_annotations.log` and `--retry` reads from that legacy file (supports both old `namespace|name|kind` and new `namespace|name|kind|group|version` lines). ### Automatic Failure Detection @@ -128,8 +128,8 @@ Example output when failures occur: ⚠️ Migration completed with 5 failed object(s). Some objects could not be annotated. Please check the error details: - Error log file: /tmp/failed_errors_.txt - Failed objects list: /tmp/failed_annotations_.txt + Error log file: /tmp/failed_errors_.log + Failed objects list: /tmp/failed_annotations_.log Trace log file: /tmp/sigmigrate_trace_.log To investigate the issues: diff --git a/internal/tools/sigmigrate/cmd/flags.go b/internal/tools/sigmigrate/cmd/flags.go index 772cb9cd..80acd177 100644 --- a/internal/tools/sigmigrate/cmd/flags.go +++ b/internal/tools/sigmigrate/cmd/flags.go @@ -20,6 +20,12 @@ import ( "os" "github.com/spf13/pflag" + + "github.com/deckhouse/deckhouse-cli/internal/tools/sigmigrate" +) + +const ( + defaultKubectlAs = "system:serviceaccount:d8-system:deckhouse" ) func addFlags(flags *pflag.FlagSet) { @@ -31,7 +37,7 @@ func addFlags(flags *pflag.FlagSet) { flags.String( "as", - "system:serviceaccount:d8-system:deckhouse", + defaultKubectlAs, "Specify a Kubernetes service account for the kubectl operations (impersonation).", ) @@ -63,4 +69,16 @@ func addFlags(flags *pflag.FlagSet) { "", "Process objects by identifier in format //. Use 'clusterwide' namespace for cluster-scoped resources. Resource name must match kubectl api-resources output.", ) + + flags.Int( + "threads", + sigmigrate.DefaultWorkerCount, + "Number of worker threads for resource discovery and migration. Values <=0 use default.", + ) + + flags.Bool( + "measure-stages", + false, + "Print execution time for major migration stages.", + ) } diff --git a/internal/tools/sigmigrate/cmd/flags_test.go b/internal/tools/sigmigrate/cmd/flags_test.go new file mode 100644 index 00000000..fd8ebfbb --- /dev/null +++ b/internal/tools/sigmigrate/cmd/flags_test.go @@ -0,0 +1,43 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "testing" + + "github.com/deckhouse/deckhouse-cli/internal/tools/sigmigrate" + "github.com/spf13/pflag" + "github.com/stretchr/testify/require" +) + +func TestAddFlags_DefaultThreadsMatchesCoreConstant(t *testing.T) { + flags := pflag.NewFlagSet("sig-migrate", pflag.ContinueOnError) + addFlags(flags) + + threads, err := flags.GetInt("threads") + require.NoError(t, err) + require.Equal(t, sigmigrate.DefaultWorkerCount, threads) +} + +func TestAddFlags_DefaultLogLevel(t *testing.T) { + flags := pflag.NewFlagSet("sig-migrate", pflag.ContinueOnError) + addFlags(flags) + + logLevel, err := flags.GetString("log-level") + require.NoError(t, err) + require.Equal(t, "DEBUG", logLevel) +} diff --git a/internal/tools/sigmigrate/sigmigrate.go b/internal/tools/sigmigrate/sigmigrate.go index 82d92358..7dc542c7 100644 --- a/internal/tools/sigmigrate/sigmigrate.go +++ b/internal/tools/sigmigrate/sigmigrate.go @@ -19,16 +19,20 @@ package sigmigrate import ( "context" "encoding/json" + stderrors "errors" "fmt" + "net" "os" "strings" "sync" + "sync/atomic" "time" "github.com/fatih/color" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" @@ -41,27 +45,42 @@ import ( const ( annotationKey = "d8-migration" annotationKeyToRemove = "d8-migration-" - defaultKubectlAs = "system:serviceaccount:d8-system:deckhouse" switchAccount = "system:serviceaccount:d8-multitenancy-manager:multitenancy-manager" - legacyFailedAttemptsFile = "/tmp/failed_annotations.txt" - legacyErrorLogFile = "/tmp/failed_errors.txt" - legacySkippedObjectsFile = "/tmp/skipped_objects.txt" + legacyFailedAttemptsFile = "/tmp/failed_annotations.log" + legacyErrorLogFile = "/tmp/failed_errors.log" + legacySkippedObjectsFile = "/tmp/skipped_objects.log" runTimestampFormat = "20060102T150405Z" + maxWorkerCount = 256 + maxRequestRetries = 5 + requestTimeout = 30 * time.Second + baseRetryDelay = 200 * time.Millisecond + DefaultWorkerCount = 10 + defaultWorkerCount = DefaultWorkerCount + defaultClientQPS = 50 + defaultClientBurst = 100 + progressPrintInterval = 500 * time.Millisecond + progressPercentStep = 2 ) var runStateMu sync.RWMutex +var fileWriteMu sync.Mutex +var traceWriteMu sync.Mutex var currentRunState *sigMigrateRunState // sigMigrateRunState stores paths and resources for one command run. type sigMigrateRunState struct { RunID string + LogLevel string FailedAttemptsFile string ErrorLogFile string SkippedObjectsFile string TraceLogFile string LegacyFailedRetryFile string traceFile *os.File + failedAttemptsWriter *os.File + errorLogWriter *os.File + skippedObjectsWriter *os.File } func newSigMigrateRunState(now time.Time) *sigMigrateRunState { @@ -69,9 +88,9 @@ func newSigMigrateRunState(now time.Time) *sigMigrateRunState { return &sigMigrateRunState{ RunID: runID, - FailedAttemptsFile: fmt.Sprintf("/tmp/failed_annotations_%s.txt", runID), - ErrorLogFile: fmt.Sprintf("/tmp/failed_errors_%s.txt", runID), - SkippedObjectsFile: fmt.Sprintf("/tmp/skipped_objects_%s.txt", runID), + FailedAttemptsFile: fmt.Sprintf("/tmp/failed_annotations_%s.log", runID), + ErrorLogFile: fmt.Sprintf("/tmp/failed_errors_%s.log", runID), + SkippedObjectsFile: fmt.Sprintf("/tmp/skipped_objects_%s.log", runID), TraceLogFile: fmt.Sprintf("/tmp/sigmigrate_trace_%s.log", runID), LegacyFailedRetryFile: legacyFailedAttemptsFile, } @@ -123,13 +142,25 @@ func tracef(format string, args ...interface{}) { } message := fmt.Sprintf(format, args...) + + traceWriteMu.Lock() + defer traceWriteMu.Unlock() + if _, err := fmt.Fprintf(state.traceFile, "%s TRACE %s\n", time.Now().UTC().Format(time.RFC3339Nano), message); err != nil { fmt.Fprintf(os.Stderr, "Warning: failed to write trace log file %s: %v\n", state.TraceLogFile, err) } } -func syncLegacyRetryFile() error { - state := getCurrentRunState() +func isTraceVerboseEnabled() bool { + level := strings.ToUpper(getCurrentRunState().LogLevel) + return level == "TRACE" +} + +func syncLegacyRetryFileForState(state *sigMigrateRunState) error { + if state == nil { + state = getCurrentRunState() + } + srcFile := state.FailedAttemptsFile dstFile := state.LegacyFailedRetryFile @@ -155,6 +186,10 @@ func syncLegacyRetryFile() error { return nil } +func syncLegacyRetryFile() error { + return syncLegacyRetryFileForState(getCurrentRunState()) +} + func truncateFile(path string) { if err := os.WriteFile(path, []byte{}, 0644); err != nil { fmt.Fprintf(os.Stderr, "Warning: failed to truncate %s: %v\n", path, err) @@ -236,12 +271,14 @@ func upsertCollectedObject( } type SigMigrateConfig struct { - RetryFailed bool - KubectlAs string - LogLevel string - Kubeconfig string - Context string - Object string + RetryFailed bool + KubectlAs string + LogLevel string + Kubeconfig string + Context string + Object string + Workers int + MeasureStages bool } func SigMigrate(cmd *cobra.Command, _ []string) error { @@ -264,6 +301,8 @@ func SigMigrate(cmd *cobra.Command, _ []string) error { return fmt.Errorf("failed to get log-level flag: %w", err) } + config.LogLevel = normalizeLogLevel(config.LogLevel) + config.Kubeconfig, err = cmd.Flags().GetString("kubeconfig") if err != nil { return fmt.Errorf("failed to get kubeconfig flag: %w", err) @@ -279,6 +318,18 @@ func SigMigrate(cmd *cobra.Command, _ []string) error { return fmt.Errorf("failed to get object flag: %w", err) } + config.Workers, err = cmd.Flags().GetInt("threads") + if err != nil { + return fmt.Errorf("failed to get threads flag: %w", err) + } + + config.Workers = normalizeWorkerCount(config.Workers) + + config.MeasureStages, err = cmd.Flags().GetBool("measure-stages") + if err != nil { + return fmt.Errorf("failed to get measure-stages flag: %w", err) + } + runState := newSigMigrateRunState(time.Now()) traceFile, traceOpenErr := os.OpenFile(runState.TraceLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) @@ -288,26 +339,33 @@ func SigMigrate(cmd *cobra.Command, _ []string) error { runState.traceFile = traceFile } + runState.LogLevel = config.LogLevel setCurrentRunState(runState) defer func() { + closeRunStateWriters(runState) + if runState.traceFile != nil { + traceWriteMu.Lock() _ = runState.traceFile.Sync() _ = runState.traceFile.Close() + traceWriteMu.Unlock() } - setCurrentRunState(nil) - }() - defer func() { - if syncErr := syncLegacyRetryFile(); syncErr != nil { + if syncErr := syncLegacyRetryFileForState(runState); syncErr != nil { fmt.Fprintf(os.Stderr, "Warning: failed to sync legacy retry file: %v\n", syncErr) } + + setCurrentRunState(nil) }() - tracef("sig-migrate started: retry=%t, object=%q, log-level=%s", config.RetryFailed, config.Object, config.LogLevel) + tracef("sig-migrate started: retry=%t, object=%q, log-level=%s, threads=%d, measure-stages=%t", config.RetryFailed, config.Object, config.LogLevel, config.Workers, config.MeasureStages) tracef("run artifacts: failed=%s, errors=%s, skipped=%s, trace=%s", getFailedAttemptsFilePath(), getErrorLogFilePath(), getSkippedObjectsFilePath(), runState.TraceLogFile) tracef("legacy retry compatibility file: %s", getLegacyRetryFilePath()) + commandStart := time.Now() + clientsStartedAt := time.Now() + restConfig, _, err := utilk8s.SetupK8sClientSet(config.Kubeconfig, config.Context) if err != nil { tracef("failed to setup Kubernetes client: %v", err) @@ -316,6 +374,13 @@ func SigMigrate(cmd *cobra.Command, _ []string) error { // Setup impersonation restConfig.Impersonate.UserName = config.KubectlAs + if restConfig.QPS <= 0 || restConfig.QPS < defaultClientQPS { + restConfig.QPS = defaultClientQPS + } + + if restConfig.Burst <= 0 || restConfig.Burst < defaultClientBurst { + restConfig.Burst = defaultClientBurst + } discoveryClient, err := discovery.NewDiscoveryClientForConfig(restConfig) if err != nil { @@ -329,8 +394,12 @@ func SigMigrate(cmd *cobra.Command, _ []string) error { return fmt.Errorf("failed to create dynamic client: %w", err) } + clientsDuration := time.Since(clientsStartedAt) + var objects map[string]ObjectRef + objectCollectionStartedAt := time.Now() + switch { case config.Object != "" && !config.RetryFailed: objects, err = collectSingleObject(discoveryClient, dynamicClient, config.Object, config.LogLevel) @@ -375,7 +444,7 @@ func SigMigrate(cmd *cobra.Command, _ []string) error { color.Cyan("Loaded %d objects for retry from %s.\n", len(objects), getLegacyRetryFilePath()) default: - objects, err = collectAllObjects(discoveryClient, dynamicClient, config.LogLevel) + objects, err = collectAllObjects(discoveryClient, dynamicClient, config.LogLevel, config.Workers) if err != nil { tracef("failed to collect objects: %v", err) return fmt.Errorf("failed to collect objects: %w", err) @@ -384,6 +453,8 @@ func SigMigrate(cmd *cobra.Command, _ []string) error { color.Cyan("\nTotal objects collected: %d\n", len(objects)) } + objectCollectionDuration := time.Since(objectCollectionStartedAt) + if len(objects) == 0 { color.Red("No objects available for annotation. Exiting.") tracef("no objects available for annotation") @@ -406,22 +477,45 @@ func SigMigrate(cmd *cobra.Command, _ []string) error { return fmt.Errorf("failed to create switch dynamic client: %w", err) } + migrationStartedAt := time.Now() timestamp := time.Now().Unix() unsupportedTypes := make(map[string]bool) - annotateObjects(dynamicClient, switchDynamicClient, objects, timestamp, unsupportedTypes, config.LogLevel) + annotateObjects(dynamicClient, switchDynamicClient, objects, timestamp, unsupportedTypes, config.LogLevel, config.Workers) + + migrationDuration := time.Since(migrationStartedAt) + postChecksStartedAt := time.Now() // Check if there were any failed annotations checkFailedAnnotations() + + postChecksDuration := time.Since(postChecksStartedAt) + tracef("sig-migrate completed") + if config.MeasureStages { + totalDuration := time.Since(commandStart) + + color.Cyan("\nStage timing summary:") + color.Cyan(" Clients initialization: %s", clientsDuration) + color.Cyan(" Objects collection: %s", objectCollectionDuration) + color.Cyan(" Migration: %s", migrationDuration) + color.Cyan(" Post-checks: %s", postChecksDuration) + color.Cyan(" Total: %s\n", totalDuration) + tracef("stage timing summary: clients=%s collection=%s migration=%s post-checks=%s total=%s", clientsDuration, objectCollectionDuration, migrationDuration, postChecksDuration, totalDuration) + } + return nil } -func collectAllObjects(discoveryClient discovery.DiscoveryInterface, dynamicClient dynamic.Interface, logLevel string) (map[string]ObjectRef, error) { +type resourceInfo struct { + gvr schema.GroupVersionResource + namespaced bool +} + +func collectAllObjects(discoveryClient discovery.DiscoveryInterface, dynamicClient dynamic.Interface, logLevel string, workers int) (map[string]ObjectRef, error) { objects := make(map[string]ObjectRef) - // Get all API groups first (similar to kubectl api-resources) apiGroupList, err := discoveryClient.ServerGroups() if err != nil { tracef("failed to discover API groups: %v", err) @@ -430,25 +524,12 @@ func collectAllObjects(discoveryClient discovery.DiscoveryInterface, dynamicClie preferredByGroup := preferredVersionByGroup(apiGroupList) - namespacedResources := []schema.GroupVersionResource{} - clusterResources := []schema.GroupVersionResource{} - - // Track resources by GVR to collect all unique API versions - // This allows us to process both core API resources and custom resources (like apps.kruise.io) - type resourceInfo struct { - gvr schema.GroupVersionResource - namespaced bool - } - resourceMap := make(map[string]resourceInfo) - // Iterate through all API groups and their versions (like kubectl api-resources does) for _, group := range apiGroupList.Groups { for _, version := range group.Versions { - // Get resources for this specific group version apiResourceList, err := discoveryClient.ServerResourcesForGroupVersion(version.GroupVersion) if err != nil { - // Log but continue - some groups may fail (e.g., metrics) if logLevel == "TRACE" { fmt.Printf("Warning: failed to get resources for %s: %v\n", version.GroupVersion, err) } @@ -463,125 +544,125 @@ func collectAllObjects(discoveryClient discovery.DiscoveryInterface, dynamicClie continue } - // Skip metrics and other API groups that don't support standard operations - // These groups typically only support read operations if gv.Group == "metrics.k8s.io" || gv.Group == "custom.metrics.k8s.io" || gv.Group == "external.metrics.k8s.io" { continue } for _, apiResource := range apiResourceList.APIResources { - // Skip subresources if strings.Contains(apiResource.Name, "/") { continue } - // Skip resources that don't support list - if !contains(apiResource.Verbs, "list") { + if !contains(apiResource.Verbs, "list") || !contains(apiResource.Verbs, "patch") { continue } - // Skip resources that don't support patch (needed for annotations) - if !contains(apiResource.Verbs, "patch") { - continue - } - - gvr := schema.GroupVersionResource{ - Group: gv.Group, - Version: gv.Version, - Resource: apiResource.Name, - } - - // Use full GVR string as key to collect all unique API versions - // This allows us to process both core API (apps/v1/daemonsets) and custom resources (apps.kruise.io/v1alpha1/daemonsets) - resourceKey := gvr.String() - - // Only add if we haven't seen this exact GVR before - if _, exists := resourceMap[resourceKey]; !exists { - resourceMap[resourceKey] = resourceInfo{gvr: gvr, namespaced: apiResource.Namespaced} + gvr := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: apiResource.Name} + if _, exists := resourceMap[gvr.String()]; !exists { + resourceMap[gvr.String()] = resourceInfo{gvr: gvr, namespaced: apiResource.Namespaced} } } } } - // Convert map to slices + resources := make([]resourceInfo, 0, len(resourceMap)) for _, info := range resourceMap { - if info.namespaced { - namespacedResources = append(namespacedResources, info.gvr) - } else { - clusterResources = append(clusterResources, info.gvr) - } + resources = append(resources, info) } - totalResources := len(namespacedResources) + len(clusterResources) - currentResource := 0 + if len(resources) == 0 { + fmt.Println() + return objects, nil + } - // Process namespaced resources - for _, gvr := range namespacedResources { - currentResource++ - progress := (currentResource * 100) / totalResources + workerCount := normalizeWorkerCount(workers) + if workerCount > len(resources) { + workerCount = len(resources) + } - if logLevel == "TRACE" { - fmt.Printf("\nFetching resource: %s\n", gvr.String()) - } else { - greenProgress := color.New(color.FgGreen).SprintFunc() - fmt.Printf("\rCalculating: [%s] Processing Namespaced Resource: %s ", greenProgress(fmt.Sprintf("%d%%", progress)), gvr.Resource) - } + jobs := make(chan resourceInfo) - resourceClient := dynamicClient.Resource(gvr) + var ( + wg sync.WaitGroup + progressMu sync.Mutex + objectsMu sync.Mutex + processed int64 + ) - list, err := resourceClient.List(context.TODO(), metav1.ListOptions{}) - if err != nil { - if logLevel == "TRACE" { - fmt.Printf("Error listing %s: %v\n", gvr.String(), err) - } + totalResources := int64(len(resources)) + lastProgressPercent := -1 + lastProgressPrintedAt := time.Now() - tracef("error listing %s: %v", gvr.String(), err) + for i := 0; i < workerCount; i++ { + wg.Add(1) - continue - } + go func() { + defer wg.Done() - for _, item := range list.Items { - namespace := item.GetNamespace() - if namespace == "" { - namespace = "clusterwide" - } + for info := range jobs { + if logLevel == "TRACE" { + fmt.Printf("\nFetching resource: %s\n", info.gvr.String()) + } - name := item.GetName() - upsertCollectedObject(objects, namespace, name, gvr, preferredByGroup) - } - } + list, err := withRetryResult("list "+info.gvr.String(), logLevel, func(ctx context.Context) (*unstructured.UnstructuredList, error) { + return dynamicClient.Resource(info.gvr).List(ctx, metav1.ListOptions{}) + }) + if err != nil { + if logLevel == "TRACE" { + fmt.Printf("Error listing %s: %v\n", info.gvr.String(), err) + } - // Process cluster resources - for _, gvr := range clusterResources { - currentResource++ - progress := (currentResource * 100) / totalResources + tracef("error listing %s: %v", info.gvr.String(), err) - if logLevel == "TRACE" { - fmt.Printf("\nFetching resource: %s\n", gvr.String()) - } else { - greenProgress := color.New(color.FgGreen).SprintFunc() - fmt.Printf("\rCalculating: [%s] Processing Cluster Resource: %s ", greenProgress(fmt.Sprintf("%d%%", progress)), gvr.Resource) - } + current := atomic.AddInt64(&processed, 1) - resourceClient := dynamicClient.Resource(gvr) + if logLevel != "TRACE" { + progressMu.Lock() + if shouldEmitProgress(current, totalResources, &lastProgressPercent, &lastProgressPrintedAt) { + progress := int((current * 100) / totalResources) + greenProgress := color.New(color.FgGreen).SprintFunc() + fmt.Printf("\rCalculating: [%s] Processed Resource: %s ", greenProgress(fmt.Sprintf("%d%%", progress)), info.gvr.Resource) + } + progressMu.Unlock() + } - list, err := resourceClient.List(context.TODO(), metav1.ListOptions{}) - if err != nil { - if logLevel == "TRACE" { - fmt.Printf("Error listing %s: %v\n", gvr.String(), err) - } + continue + } + + objectsMu.Lock() + for _, item := range list.Items { + namespace := item.GetNamespace() + if namespace == "" { + namespace = "clusterwide" + } - tracef("error listing %s: %v", gvr.String(), err) + name := item.GetName() + upsertCollectedObject(objects, namespace, name, info.gvr, preferredByGroup) + } + objectsMu.Unlock() - continue - } + current := atomic.AddInt64(&processed, 1) - for _, item := range list.Items { - name := item.GetName() - upsertCollectedObject(objects, "clusterwide", name, gvr, preferredByGroup) - } + if logLevel != "TRACE" { + progressMu.Lock() + if shouldEmitProgress(current, totalResources, &lastProgressPercent, &lastProgressPrintedAt) { + progress := int((current * 100) / totalResources) + greenProgress := color.New(color.FgGreen).SprintFunc() + fmt.Printf("\rCalculating: [%s] Processed Resource: %s ", greenProgress(fmt.Sprintf("%d%%", progress)), info.gvr.Resource) + } + progressMu.Unlock() + } + } + }() + } + + for _, info := range resources { + jobs <- info } + close(jobs) + wg.Wait() + fmt.Println() return objects, nil @@ -594,182 +675,400 @@ func annotateObjects( timestamp int64, unsupportedTypes map[string]bool, logLevel string, + workers int, ) { - currentObject := 0 totalObjects := len(objects) + if totalObjects == 0 { + fmt.Println() + return + } + items := make([]ObjectRef, 0, totalObjects) for _, obj := range objects { - var err error + items = append(items, obj) + } - if unsupportedTypes[obj.Kind] { - if logLevel == "DEBUG" || logLevel == "TRACE" { - color.Yellow("\nSkipping type that does not support annotation: %s\n", obj.Kind) - } + workerCount := normalizeWorkerCount(workers) + if workerCount > len(items) { + workerCount = len(items) + } - recordSkippedObject(obj, "MethodNotSupported", fmt.Sprintf("Resource type %s does not support PATCH operation", obj.Kind)) + jobs := make(chan ObjectRef) - continue - } + var ( + wg sync.WaitGroup + progressMu sync.Mutex + unsupportedMu sync.RWMutex + processed int64 + ) - currentObject++ - progress := (currentObject * 100) / totalObjects - greenProgress := color.New(color.FgGreen).SprintFunc() - fmt.Printf("\rProgress: [%s] Annotating: Kind=%s, Namespace=%s, Name=%s ", greenProgress(fmt.Sprintf("%d%%", progress)), obj.Kind, obj.Namespace, obj.Name) + total := int64(len(items)) + lastProgressPercent := -1 + lastProgressPrintedAt := time.Now() - if logLevel == "TRACE" { - color.Cyan("\n[TRACE] Processing object: Kind=%s, Namespace=%s, Name=%s, GVR=%s\n", obj.Kind, obj.Namespace, obj.Name, obj.GVR.String()) - } + for i := 0; i < workerCount; i++ { + wg.Add(1) - tracef("processing object kind=%s namespace=%s name=%s gvr=%s", obj.Kind, obj.Namespace, obj.Name, obj.GVR.String()) + go func() { + defer wg.Done() - resourceClient := dynamicClient.Resource(obj.GVR) + for obj := range jobs { + unsupportedMu.RLock() - var objClient dynamic.ResourceInterface - if obj.Namespace == "clusterwide" { - objClient = resourceClient - } else { - objClient = resourceClient.Namespace(obj.Namespace) - } + skipKind := unsupportedTypes[obj.Kind] - // Add annotation - err = addAnnotation(objClient, obj.Name, annotationKey, fmt.Sprintf("%d", timestamp), logLevel) - if err != nil { - errStr := err.Error() + unsupportedMu.RUnlock() - // First, check for permission denied - try with different service account - // This should be checked BEFORE MethodNotSupported, as permission errors - // can sometimes be reported as "method not allowed" - if shouldRetryWithSwitchAccount(errStr) { - color.Yellow("\nRetrying with different service account: %s for %s/%s/%s\n", switchAccount, obj.Kind, obj.Namespace, obj.Name) - tracef("retrying with switch account %s for %s/%s/%s", switchAccount, obj.Kind, obj.Namespace, obj.Name) - switchResourceClient := switchDynamicClient.Resource(obj.GVR) - - var switchObjClient dynamic.ResourceInterface - if obj.Namespace == "clusterwide" { - switchObjClient = switchResourceClient - } else { - switchObjClient = switchResourceClient.Namespace(obj.Namespace) - } + if skipKind { + if logLevel == "DEBUG" || logLevel == "TRACE" { + color.Yellow("\nSkipping type that does not support annotation: %s\n", obj.Kind) + } - err = addAnnotation(switchObjClient, obj.Name, annotationKey, fmt.Sprintf("%d", timestamp), logLevel) - if err != nil { - // If it still fails with MethodNotSupported after switching accounts, then it's truly unsupported - if errors.IsMethodNotSupported(err) { - if logLevel == "TRACE" { - color.Cyan("\n[TRACE] MethodNotSupported error after switching account: %v\n", err) - } + recordSkippedObject(obj, "MethodNotSupported", fmt.Sprintf("Resource type %s does not support PATCH operation", obj.Kind)) - tracef("method not supported after switch account for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) - unsupportedTypes[obj.Kind] = true - color.Yellow("\nAdding %s to unsupported annotation types due to MethodNotSupported (after trying switch account).\n", obj.Kind) - recordSkippedObject(obj, "MethodNotSupported", fmt.Sprintf("After switching to account %s: %v", switchAccount, err)) + current := atomic.AddInt64(&processed, 1) - continue + if logLevel != "TRACE" { + progressMu.Lock() + if shouldEmitProgress(current, total, &lastProgressPercent, &lastProgressPrintedAt) { + printAnnotationProgress(current, total, obj) + } + progressMu.Unlock() } - color.Red("\nFailed to add annotation after switching accounts for %s/%s/%s\n", obj.Kind, obj.Namespace, obj.Name) - color.Yellow("Retry Details: %v\n", err) - tracef("failed to add annotation after switch account for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) - recordFailure(obj, err.Error()) - continue } - // Success with switch account, continue to next object - continue - } - // Check for unsupported method (resource type doesn't support annotations) - // Only mark as unsupported if it's truly MethodNotSupported AND not a permission issue - if errors.IsMethodNotSupported(err) { if logLevel == "TRACE" { - color.Cyan("\n[TRACE] MethodNotSupported error: %v\n", err) - color.Cyan("[TRACE] Error details for %s/%s/%s: %s\n", obj.Kind, obj.Namespace, obj.Name, errStr) - // Check if it's a StatusError to get more details - if statusErr, ok := err.(*errors.StatusError); ok { - color.Cyan("[TRACE] Status code: %d\n", statusErr.Status().Code) - color.Cyan("[TRACE] Status reason: %s\n", statusErr.Status().Reason) - color.Cyan("[TRACE] Status message: %s\n", statusErr.Status().Message) - } + color.Cyan("\n[TRACE] Processing object: Kind=%s, Namespace=%s, Name=%s, GVR=%s\n", obj.Kind, obj.Namespace, obj.Name, obj.GVR.String()) } - tracef("method not supported for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) - unsupportedTypes[obj.Kind] = true - color.Yellow("\nAdding %s to unsupported annotation types due to MethodNotSupported.\n", obj.Kind) - recordSkippedObject(obj, "MethodNotSupported", fmt.Sprintf("Error: %v", err)) + if isTraceVerboseEnabled() { + tracef("processing object kind=%s namespace=%s name=%s gvr=%s", obj.Kind, obj.Namespace, obj.Name, obj.GVR.String()) + } - continue - } + processObjectAnnotation(dynamicClient, switchDynamicClient, obj, timestamp, unsupportedTypes, &unsupportedMu, logLevel) - // Record all other errors (excluding not-found cases - skip retry for these) - isNotFound := errors.IsNotFound(err) || strings.Contains(strings.ToLower(errStr), "not found") - - if isNotFound { - skipReason, skipDetails := classifyNotFoundError(err) - color.Yellow("\nSkipping %s/%s/%s: %s\n", obj.Kind, obj.Namespace, obj.Name, skipDetails) - tracef("skipping object %s/%s/%s: reason=%s details=%s", obj.Kind, obj.Namespace, obj.Name, skipReason, skipDetails) - recordSkippedObject(obj, skipReason, skipDetails) - } else { - // Other errors - definitely record them - color.Red("\nFailed to add annotation to %s/%s/%s\n", obj.Kind, obj.Namespace, obj.Name) - color.Yellow("Details: %v\n", err) - tracef("failed to add annotation for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) - recordFailure(obj, errStr) + current := atomic.AddInt64(&processed, 1) + + if logLevel != "TRACE" { + progressMu.Lock() + if shouldEmitProgress(current, total, &lastProgressPercent, &lastProgressPrintedAt) { + printAnnotationProgress(current, total, obj) + } + progressMu.Unlock() + } } + }() + } - continue + for _, obj := range items { + jobs <- obj + } + + close(jobs) + wg.Wait() + + fmt.Println() +} + +func normalizeWorkerCount(workers int) int { + if workers <= 0 { + return defaultWorkerCount + } + + if workers > maxWorkerCount { + return maxWorkerCount + } + + return workers +} + +func normalizeLogLevel(level string) string { + normalized := strings.ToUpper(strings.TrimSpace(level)) + if normalized == "" { + return "DEBUG" + } + + return normalized +} + +func withRetry(operation, logLevel string, fn func(ctx context.Context) error) error { + var lastErr error + + for attempt := 0; attempt < maxRequestRetries; attempt++ { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + err := fn(ctx) + + cancel() + + if err == nil { + return nil } - // Remove annotation - err = removeAnnotation(objClient, obj.Name, annotationKeyToRemove, logLevel) - if err != nil { - // Skip MethodNotSupported and NotFound errors - no need to record them - // MethodNotSupported: resource type doesn't support PATCH (already known from addAnnotation) - // NotFound: object was deleted, no point in retrying - if !errors.IsMethodNotSupported(err) && !errors.IsNotFound(err) { - errStr := err.Error() - if !strings.Contains(errStr, "Not found") && !strings.Contains(errStr, "not found") { - color.Red("\nFailed to remove annotation from %s/%s/%s\n", obj.Kind, obj.Namespace, obj.Name) - color.Yellow("Details: %v\n", err) - tracef("failed to remove annotation for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) - recordFailure(obj, errStr) - } - } + lastErr = err + if !shouldRetryRequestError(err) || attempt == maxRequestRetries-1 { + return err } + + delay := baseRetryDelay * time.Duration(1<= 500 { + return true + } + } + + if os.IsTimeout(err) { + return true + } + + var netErr net.Error + if stderrors.As(err, &netErr) { + if netErr.Timeout() { + return true + } + + type temporary interface{ Temporary() bool } + if te, ok := any(netErr).(temporary); ok && te.Temporary() { + return true + } + } + + msg := strings.ToLower(err.Error()) + if strings.Contains(msg, "context deadline exceeded") || strings.Contains(msg, "connection reset by peer") || strings.Contains(msg, "too many requests") || strings.Contains(msg, "eof") { + return true + } + + return false +} + +func getObjectClient(client dynamic.Interface, obj ObjectRef) dynamic.ResourceInterface { + resourceClient := client.Resource(obj.GVR) + if obj.Namespace == "clusterwide" { + return resourceClient + } + + return resourceClient.Namespace(obj.Namespace) +} + +func printAnnotationProgress(current, total int64, obj ObjectRef) { + if total <= 0 { + return + } + + progress := int((current * 100) / total) + greenProgress := color.New(color.FgGreen).SprintFunc() + fmt.Printf("\rProgress: [%s] Annotating: Kind=%s, Namespace=%s, Name=%s ", greenProgress(fmt.Sprintf("%d%%", progress)), obj.Kind, obj.Namespace, obj.Name) +} + +func shouldEmitProgress(current, total int64, lastPercent *int, lastPrintedAt *time.Time) bool { + if total <= 0 { + return false + } + + progress := int((current * 100) / total) + now := time.Now() + + if current >= total { + *lastPercent = progress + *lastPrintedAt = now + + return true + } + + if *lastPercent < 0 { + *lastPercent = progress + *lastPrintedAt = now + + return true + } + + if now.Sub(*lastPrintedAt) >= progressPrintInterval { + if progress > *lastPercent { + *lastPercent = progress + } + + *lastPrintedAt = now + + return true + } + + if progress-*lastPercent >= progressPercentStep { + *lastPercent = progress + *lastPrintedAt = now + + return true + } + + return false +} + +func processObjectAnnotation( + dynamicClient dynamic.Interface, + switchDynamicClient dynamic.Interface, + obj ObjectRef, + timestamp int64, + unsupportedTypes map[string]bool, + unsupportedMu *sync.RWMutex, + logLevel string, +) { + objClient := getObjectClient(dynamicClient, obj) + + err := addAnnotation(objClient, obj.Name, annotationKey, fmt.Sprintf("%d", timestamp), logLevel) if err != nil { - if logLevel == "TRACE" { - color.Cyan("\n[TRACE] Get failed for %s: %v\n", name, err) + errStr := err.Error() + + if shouldRetryWithSwitchAccount(errStr) { + color.Yellow("\nRetrying with different service account: %s for %s/%s/%s\n", switchAccount, obj.Kind, obj.Namespace, obj.Name) + tracef("retrying with switch account %s for %s/%s/%s", switchAccount, obj.Kind, obj.Namespace, obj.Name) + switchObjClient := getObjectClient(switchDynamicClient, obj) + + err = addAnnotation(switchObjClient, obj.Name, annotationKey, fmt.Sprintf("%d", timestamp), logLevel) + if err != nil { + if errors.IsMethodNotSupported(err) { + if logLevel == "TRACE" { + color.Cyan("\n[TRACE] MethodNotSupported error after switching account: %v\n", err) + } + + tracef("method not supported after switch account for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) + unsupportedMu.Lock() + unsupportedTypes[obj.Kind] = true + unsupportedMu.Unlock() + color.Yellow("\nAdding %s to unsupported annotation types due to MethodNotSupported (after trying switch account).\n", obj.Kind) + recordSkippedObject(obj, "MethodNotSupported", fmt.Sprintf("After switching to account %s: %v", switchAccount, err)) + + return + } + + color.Red("\nFailed to add annotation after switching accounts for %s/%s/%s\n", obj.Kind, obj.Namespace, obj.Name) + color.Yellow("Retry Details: %v\n", err) + tracef("failed to add annotation after switch account for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) + recordFailure(obj, err.Error()) + + return + } + + return } - tracef("get failed for %s: %s", name, formatServerErrorDetails(err)) + if errors.IsMethodNotSupported(err) { + if logLevel == "TRACE" { + color.Cyan("\n[TRACE] MethodNotSupported error: %v\n", err) + color.Cyan("[TRACE] Error details for %s/%s/%s: %s\n", obj.Kind, obj.Namespace, obj.Name, errStr) - return err + if statusErr, ok := err.(*errors.StatusError); ok { + color.Cyan("[TRACE] Status code: %d\n", statusErr.Status().Code) + color.Cyan("[TRACE] Status reason: %s\n", statusErr.Status().Reason) + color.Cyan("[TRACE] Status message: %s\n", statusErr.Status().Message) + } + } + + tracef("method not supported for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) + unsupportedMu.Lock() + unsupportedTypes[obj.Kind] = true + unsupportedMu.Unlock() + color.Yellow("\nAdding %s to unsupported annotation types due to MethodNotSupported.\n", obj.Kind) + recordSkippedObject(obj, "MethodNotSupported", fmt.Sprintf("Error: %v", err)) + + return + } + + isNotFound := errors.IsNotFound(err) || strings.Contains(strings.ToLower(errStr), "not found") + if isNotFound { + skipReason, skipDetails := classifyNotFoundError(err) + color.Yellow("\nSkipping %s/%s/%s: %s\n", obj.Kind, obj.Namespace, obj.Name, skipDetails) + tracef("skipping object %s/%s/%s: reason=%s details=%s", obj.Kind, obj.Namespace, obj.Name, skipReason, skipDetails) + recordSkippedObject(obj, skipReason, skipDetails) + + return + } + + color.Red("\nFailed to add annotation to %s/%s/%s\n", obj.Kind, obj.Namespace, obj.Name) + color.Yellow("Details: %v\n", err) + tracef("failed to add annotation for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) + recordFailure(obj, errStr) + + return + } + + err = removeAnnotation(objClient, obj.Name, annotationKeyToRemove, logLevel) + if err != nil { + if !errors.IsMethodNotSupported(err) && !errors.IsNotFound(err) { + errStr := err.Error() + if !strings.Contains(errStr, "Not found") && !strings.Contains(errStr, "not found") { + color.Red("\nFailed to remove annotation from %s/%s/%s\n", obj.Kind, obj.Namespace, obj.Name) + color.Yellow("Details: %v\n", err) + tracef("failed to remove annotation for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, formatServerErrorDetails(err)) + recordFailure(obj, errStr) + } + } } +} +func addAnnotation(client dynamic.ResourceInterface, name, key, value, logLevel string) error { if logLevel == "TRACE" { color.Cyan("\n[TRACE] Running annotation command: add %s=%s to %s\n", key, value, name) - color.Cyan("[TRACE] Object UID: %s, ResourceVersion: %s\n", obj.GetUID(), obj.GetResourceVersion()) } - tracef("add annotation key=%s value=%s object=%s uid=%s rv=%s", key, value, name, obj.GetUID(), obj.GetResourceVersion()) - - annotations := obj.GetAnnotations() - if annotations == nil { - annotations = make(map[string]string) + if isTraceVerboseEnabled() { + tracef("add annotation key=%s value=%s object=%s", key, value, name) } - annotations[key] = value - obj.SetAnnotations(annotations) - patch := map[string]interface{}{ "metadata": map[string]interface{}{ - "annotations": annotations, + "annotations": map[string]string{key: value}, }, } @@ -783,10 +1082,15 @@ func addAnnotation(client dynamic.ResourceInterface, name, key, value, logLevel color.Cyan("[TRACE] Calling Patch with MergePatchType for %s\n", name) } - tracef("patch payload for %s: %s", name, string(patchBytes)) + if isTraceVerboseEnabled() { + tracef("patch payload for %s: %s", name, string(patchBytes)) + } // Try MergePatchType first - _, err = client.Patch(context.TODO(), name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) + err = withRetry("patch merge "+name, logLevel, func(ctx context.Context) error { + _, patchErr := client.Patch(ctx, name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) + return patchErr + }) if err != nil { // If MergePatchType fails with MethodNotSupported, try StrategicMergePatchType // This is needed for some resources like pods @@ -796,7 +1100,10 @@ func addAnnotation(client dynamic.ResourceInterface, name, key, value, logLevel } tracef("merge patch type not supported for %s, retry with StrategicMergePatchType", name) - _, err = client.Patch(context.TODO(), name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + err = withRetry("patch strategic "+name, logLevel, func(ctx context.Context) error { + _, patchErr := client.Patch(ctx, name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + return patchErr + }) } if err != nil && logLevel == "TRACE" { @@ -811,8 +1118,10 @@ func addAnnotation(client dynamic.ResourceInterface, name, key, value, logLevel return err } -func removeAnnotation(client dynamic.ResourceInterface, name, keyPrefix, _ string) error { - obj, err := client.Get(context.TODO(), name, metav1.GetOptions{}) +func removeAnnotation(client dynamic.ResourceInterface, name, keyPrefix, logLevel string) error { + obj, err := withRetryResult("get "+name+" for remove annotation", logLevel, func(ctx context.Context) (*unstructured.Unstructured, error) { + return client.Get(ctx, name, metav1.GetOptions{}) + }) if err != nil { if errors.IsNotFound(err) { return nil @@ -826,7 +1135,9 @@ func removeAnnotation(client dynamic.ResourceInterface, name, keyPrefix, _ strin return nil } - // Remove all annotations that start with keyPrefix + // Remove all annotations that start with keyPrefix. + // With keyPrefix="d8-migration-" this removes only legacy d8-migration-* keys + // and keeps the canonical d8-migration annotation set by this command. modified := false for key := range annotations { @@ -852,7 +1163,10 @@ func removeAnnotation(client dynamic.ResourceInterface, name, keyPrefix, _ strin return fmt.Errorf("failed to marshal patch: %w", err) } - _, err = client.Patch(context.TODO(), name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) + err = withRetry("patch remove annotation "+name, logLevel, func(ctx context.Context) error { + _, patchErr := client.Patch(ctx, name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) + return patchErr + }) return err } @@ -910,39 +1224,78 @@ func loadFailedObjects() (map[string]ObjectRef, error) { return objects, nil } +func getOrOpenRunWriter(current **os.File, path string) (*os.File, error) { + if *current != nil { + return *current, nil + } + + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + + *current = f + + return f, nil +} + +func closeRunStateWriters(state *sigMigrateRunState) { + if state == nil { + return + } + + fileWriteMu.Lock() + defer fileWriteMu.Unlock() + + if state.failedAttemptsWriter != nil { + _ = state.failedAttemptsWriter.Sync() + _ = state.failedAttemptsWriter.Close() + state.failedAttemptsWriter = nil + } + + if state.errorLogWriter != nil { + _ = state.errorLogWriter.Sync() + _ = state.errorLogWriter.Close() + state.errorLogWriter = nil + } + + if state.skippedObjectsWriter != nil { + _ = state.skippedObjectsWriter.Sync() + _ = state.skippedObjectsWriter.Close() + state.skippedObjectsWriter = nil + } +} + func recordFailure(obj ObjectRef, errorMsg string) { failedAttemptsFile := getFailedAttemptsFilePath() errorLogFile := getErrorLogFilePath() tracef("recording failure for %s/%s/%s: %s", obj.Kind, obj.Namespace, obj.Name, errorMsg) - // Append to failed attempts file - f, err := os.OpenFile(failedAttemptsFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + fileWriteMu.Lock() + defer fileWriteMu.Unlock() + + state := getCurrentRunState() + + failedWriter, err := getOrOpenRunWriter(&state.failedAttemptsWriter, failedAttemptsFile) if err != nil { - // If we can't write to the file, log to stderr as fallback fmt.Fprintf(os.Stderr, "Warning: failed to write to %s: %v\n", failedAttemptsFile, err) tracef("failed to append failed attempts file %s: %v", failedAttemptsFile, err) return } - _, _ = fmt.Fprintf(f, "%s|%s|%s|%s|%s\n", obj.Namespace, obj.Name, obj.Kind, obj.GVR.Group, obj.GVR.Version) - _ = f.Sync() - _ = f.Close() + _, _ = fmt.Fprintf(failedWriter, "%s|%s|%s|%s|%s\n", obj.Namespace, obj.Name, obj.Kind, obj.GVR.Group, obj.GVR.Version) - // Append to error log file - f, err = os.OpenFile(errorLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + errorWriter, err := getOrOpenRunWriter(&state.errorLogWriter, errorLogFile) if err != nil { - // If we can't write to the file, log to stderr as fallback fmt.Fprintf(os.Stderr, "Warning: failed to write to %s: %v\n", errorLogFile, err) tracef("failed to append error log file %s: %v", errorLogFile, err) return } - _, _ = fmt.Fprintf(f, "%s|%s|%s|%s\n", obj.Namespace, obj.Name, obj.Kind, errorMsg) - _ = f.Sync() - _ = f.Close() + _, _ = fmt.Fprintf(errorWriter, "%s|%s|%s|%s\n", obj.Namespace, obj.Name, obj.Kind, errorMsg) } func recordSkippedObject(obj ObjectRef, reason string, details string) { @@ -950,10 +1303,13 @@ func recordSkippedObject(obj ObjectRef, reason string, details string) { tracef("recording skipped object %s/%s/%s: reason=%s details=%s", obj.Kind, obj.Namespace, obj.Name, reason, details) - // Append to skipped objects file - f, err := os.OpenFile(skippedObjectsFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + fileWriteMu.Lock() + defer fileWriteMu.Unlock() + + state := getCurrentRunState() + + skippedWriter, err := getOrOpenRunWriter(&state.skippedObjectsWriter, skippedObjectsFile) if err != nil { - // If we can't write to the file, log to stderr as fallback fmt.Fprintf(os.Stderr, "Warning: failed to write to %s: %v\n", skippedObjectsFile, err) tracef("failed to append skipped objects file %s: %v", skippedObjectsFile, err) @@ -962,9 +1318,7 @@ func recordSkippedObject(obj ObjectRef, reason string, details string) { timestamp := time.Now().Format(time.RFC3339) gvrStr := obj.GVR.String() - _, _ = fmt.Fprintf(f, "%s|%s|%s|%s|%s|%s|%s\n", timestamp, obj.Namespace, obj.Name, obj.Kind, gvrStr, reason, details) - _ = f.Sync() - _ = f.Close() + _, _ = fmt.Fprintf(skippedWriter, "%s|%s|%s|%s|%s|%s|%s\n", timestamp, obj.Namespace, obj.Name, obj.Kind, gvrStr, reason, details) } func shouldRetryWithSwitchAccount(errMsg string) bool { diff --git a/internal/tools/sigmigrate/sigmigrate_test.go b/internal/tools/sigmigrate/sigmigrate_test.go index 02fb6742..646f3298 100644 --- a/internal/tools/sigmigrate/sigmigrate_test.go +++ b/internal/tools/sigmigrate/sigmigrate_test.go @@ -18,9 +18,12 @@ package sigmigrate import ( "context" + "fmt" "os" "path/filepath" + "strconv" "strings" + "sync" "testing" "time" @@ -165,7 +168,7 @@ func TestUpsertCollectedObject_PrefersPreferredVersion(t *testing.T) { func TestLoadFailedObjects_DoesNotOverwriteSameNamespaceNameKindAcrossGroups(t *testing.T) { tmpDir := t.TempDir() - legacyRetryFile := filepath.Join(tmpDir, "failed_annotations_legacy.txt") + legacyRetryFile := filepath.Join(tmpDir, "failed_annotations_legacy.log") setCurrentRunState(&sigMigrateRunState{LegacyFailedRetryFile: legacyRetryFile}) defer setCurrentRunState(nil) @@ -357,7 +360,7 @@ func TestRemoveAnnotation(t *testing.T) { func TestLoadFailedObjects(t *testing.T) { tmpDir := t.TempDir() - legacyRetryFile := filepath.Join(tmpDir, "failed_annotations_legacy.txt") + legacyRetryFile := filepath.Join(tmpDir, "failed_annotations_legacy.log") runState := &sigMigrateRunState{ LegacyFailedRetryFile: legacyRetryFile, } @@ -394,7 +397,7 @@ func TestLoadFailedObjects(t *testing.T) { func TestLoadFailedObjects_ExtendedRetryFormatIncludesGVR(t *testing.T) { tmpDir := t.TempDir() - legacyRetryFile := filepath.Join(tmpDir, "failed_annotations_legacy.txt") + legacyRetryFile := filepath.Join(tmpDir, "failed_annotations_legacy.log") setCurrentRunState(&sigMigrateRunState{LegacyFailedRetryFile: legacyRetryFile}) defer setCurrentRunState(nil) @@ -418,7 +421,7 @@ func TestLoadFailedObjects_ExtendedRetryFormatIncludesGVR(t *testing.T) { func TestLoadFailedObjects_ExtendedRetryFormat_NamespacedObjects(t *testing.T) { tmpDir := t.TempDir() - legacyRetryFile := filepath.Join(tmpDir, "failed_annotations_legacy.txt") + legacyRetryFile := filepath.Join(tmpDir, "failed_annotations_legacy.log") setCurrentRunState(&sigMigrateRunState{LegacyFailedRetryFile: legacyRetryFile}) defer setCurrentRunState(nil) @@ -453,8 +456,8 @@ func TestLoadFailedObjects_ExtendedRetryFormat_NamespacedObjects(t *testing.T) { func TestRecordFailure(t *testing.T) { tmpDir := t.TempDir() - failedFile := filepath.Join(tmpDir, "failed_annotations.txt") - errorFile := filepath.Join(tmpDir, "failed_errors.txt") + failedFile := filepath.Join(tmpDir, "failed_annotations.log") + errorFile := filepath.Join(tmpDir, "failed_errors.log") obj := ObjectRef{ Namespace: "default", @@ -528,7 +531,7 @@ func TestAnnotateObjects_UnsupportedType(t *testing.T) { } // Should skip unsupported types - annotateObjects(dynamicClient, dynamicClient, objects, 1234567890, unsupportedTypes, "DEBUG") + annotateObjects(dynamicClient, dynamicClient, objects, 1234567890, unsupportedTypes, "DEBUG", 4) // Verify object was not modified resourceClient := dynamicClient.Resource(gvr).Namespace("default") @@ -544,8 +547,8 @@ func TestAnnotateObjects_UnsupportedType(t *testing.T) { func TestRecordFailure_FileWrite(t *testing.T) { tmpDir := t.TempDir() - failedFile := filepath.Join(tmpDir, "failed_annotations.txt") - errorFile := filepath.Join(tmpDir, "failed_errors.txt") + failedFile := filepath.Join(tmpDir, "failed_annotations.log") + errorFile := filepath.Join(tmpDir, "failed_errors.log") obj := ObjectRef{ Namespace: "test-ns", @@ -561,22 +564,22 @@ func TestRecordFailure_FileWrite(t *testing.T) { // Verify files were created _, err := os.Stat(failedFile) - require.NoError(t, err, "failed_annotations.txt should be created") + require.NoError(t, err, "failed_annotations.log should be created") _, err = os.Stat(errorFile) - require.NoError(t, err, "failed_errors.txt should be created") + require.NoError(t, err, "failed_errors.log should be created") - // Verify content of failed_annotations.txt + // Verify content of failed_annotations.log failedData, err := os.ReadFile(failedFile) require.NoError(t, err) expectedFailedLine := "test-ns|test-resource|pods\n" - require.Equal(t, expectedFailedLine, string(failedData), "failed_annotations.txt should contain correct data") + require.Equal(t, expectedFailedLine, string(failedData), "failed_annotations.log should contain correct data") - // Verify content of failed_errors.txt + // Verify content of failed_errors.log errorData, err := os.ReadFile(errorFile) require.NoError(t, err) expectedErrorLine := "test-ns|test-resource|pods|test error message\n" - require.Equal(t, expectedErrorLine, string(errorData), "failed_errors.txt should contain correct data") + require.Equal(t, expectedErrorLine, string(errorData), "failed_errors.log should contain correct data") // Test appending multiple failures obj2 := ObjectRef{ @@ -620,9 +623,9 @@ func TestAnnotateObjects_ErrorRecording(t *testing.T) { } tmpDir := t.TempDir() - runFailedFile := filepath.Join(tmpDir, "failed_annotations_run.txt") - runErrorFile := filepath.Join(tmpDir, "failed_errors_run.txt") - runSkippedFile := filepath.Join(tmpDir, "skipped_run.txt") + runFailedFile := filepath.Join(tmpDir, "failed_annotations_run.log") + runErrorFile := filepath.Join(tmpDir, "failed_errors_run.log") + runSkippedFile := filepath.Join(tmpDir, "skipped_run.log") setCurrentRunState(&sigMigrateRunState{ FailedAttemptsFile: runFailedFile, ErrorLogFile: runErrorFile, @@ -631,7 +634,7 @@ func TestAnnotateObjects_ErrorRecording(t *testing.T) { defer setCurrentRunState(nil) unsupportedTypes := make(map[string]bool) - annotateObjects(dynamicClient, dynamicClient, objects, 1234567890, unsupportedTypes, "DEBUG") + annotateObjects(dynamicClient, dynamicClient, objects, 1234567890, unsupportedTypes, "DEBUG", 4) // NotFound errors are classified as skipped and should be written to skipped file. skippedData, err := os.ReadFile(runSkippedFile) @@ -647,8 +650,8 @@ func TestAnnotateObjects_ErrorRecording(t *testing.T) { func TestSyncLegacyRetryFile(t *testing.T) { tmpDir := t.TempDir() - runFile := filepath.Join(tmpDir, "failed_annotations_run.txt") - legacyFile := filepath.Join(tmpDir, "failed_annotations.txt") + runFile := filepath.Join(tmpDir, "failed_annotations_run.log") + legacyFile := filepath.Join(tmpDir, "failed_annotations.log") err := os.WriteFile(runFile, []byte("ns|obj|pods\n"), 0644) require.NoError(t, err) @@ -667,6 +670,30 @@ func TestSyncLegacyRetryFile(t *testing.T) { require.Equal(t, "ns|obj|pods\n", string(legacyData)) } +func TestSyncLegacyRetryFileForState_WorksAfterCurrentRunStateReset(t *testing.T) { + tmpDir := t.TempDir() + runFile := filepath.Join(tmpDir, "failed_annotations_run.log") + legacyFile := filepath.Join(tmpDir, "failed_annotations.log") + + err := os.WriteFile(runFile, []byte("ns|obj|pods\n"), 0644) + require.NoError(t, err) + + runState := &sigMigrateRunState{ + FailedAttemptsFile: runFile, + LegacyFailedRetryFile: legacyFile, + } + + // Simulate cleanup where global run state is already reset. + setCurrentRunState(nil) + + err = syncLegacyRetryFileForState(runState) + require.NoError(t, err) + + legacyData, err := os.ReadFile(legacyFile) + require.NoError(t, err) + require.Equal(t, "ns|obj|pods\n", string(legacyData)) +} + func TestTracefWritesContent(t *testing.T) { tmpDir := t.TempDir() tracePath := filepath.Join(tmpDir, "sigmigrate_trace.log") @@ -694,9 +721,9 @@ func TestNewSigMigrateRunState_GeneratesSaltedPaths(t *testing.T) { state := newSigMigrateRunState(ts) require.Equal(t, "20260414T151625Z", state.RunID) - require.Equal(t, "/tmp/failed_annotations_20260414T151625Z.txt", state.FailedAttemptsFile) - require.Equal(t, "/tmp/failed_errors_20260414T151625Z.txt", state.ErrorLogFile) - require.Equal(t, "/tmp/skipped_objects_20260414T151625Z.txt", state.SkippedObjectsFile) + require.Equal(t, "/tmp/failed_annotations_20260414T151625Z.log", state.FailedAttemptsFile) + require.Equal(t, "/tmp/failed_errors_20260414T151625Z.log", state.ErrorLogFile) + require.Equal(t, "/tmp/skipped_objects_20260414T151625Z.log", state.SkippedObjectsFile) require.Equal(t, "/tmp/sigmigrate_trace_20260414T151625Z.log", state.TraceLogFile) require.Equal(t, legacyFailedAttemptsFile, state.LegacyFailedRetryFile) } @@ -737,6 +764,339 @@ func TestIsResourceEndpointNotFound(t *testing.T) { require.False(t, isResourceEndpointNotFound(apierrors.NewNotFound(schema.GroupResource{Resource: "configmaps"}, "test-cm"))) } +func TestNormalizeWorkerCount(t *testing.T) { + tests := []struct { + name string + input int + expected int + }{ + {name: "negative defaults", input: -10, expected: defaultWorkerCount}, + {name: "zero defaults", input: 0, expected: defaultWorkerCount}, + {name: "valid value", input: 32, expected: 32}, + {name: "value above max is capped", input: maxWorkerCount + 100, expected: maxWorkerCount}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, normalizeWorkerCount(tt.input)) + }) + } +} + +func TestNormalizeLogLevel(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + {name: "empty defaults to debug", input: "", expected: "DEBUG"}, + {name: "whitespace defaults to debug", input: " ", expected: "DEBUG"}, + {name: "lowercase converted to uppercase", input: "trace", expected: "TRACE"}, + {name: "mixed case converted to uppercase", input: "dEbUg", expected: "DEBUG"}, + {name: "trim spaces", input: " info ", expected: "INFO"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, normalizeLogLevel(tt.input)) + }) + } +} + +func TestShouldRetryRequestError(t *testing.T) { + require.True(t, shouldRetryRequestError(apierrors.NewTooManyRequests("too many", 1))) + require.True(t, shouldRetryRequestError(apierrors.NewTimeoutError("timeout", 1))) + require.False(t, shouldRetryRequestError(apierrors.NewBadRequest("bad request"))) +} + +func TestWithRetryResult_RetriesAndSucceeds(t *testing.T) { + attempts := 0 + + result, err := withRetryResult("test-retry", "DEBUG", func(_ context.Context) (string, error) { + attempts++ + if attempts < 3 { + return "", apierrors.NewTooManyRequests("rate limited", 1) + } + return "ok", nil + }) + + require.NoError(t, err) + require.Equal(t, "ok", result) + require.Equal(t, 3, attempts) +} + +func TestWithRetry_NoRetryForNonRetryableError(t *testing.T) { + attempts := 0 + err := withRetry("test-no-retry", "DEBUG", func(_ context.Context) error { + attempts++ + return apierrors.NewBadRequest("invalid") + }) + + require.Error(t, err) + require.Equal(t, 1, attempts) +} + +func TestRecordFailure_ConcurrentWrites(t *testing.T) { + tmpDir := t.TempDir() + runFailedFile := filepath.Join(tmpDir, "failed_annotations_run.log") + runErrorFile := filepath.Join(tmpDir, "failed_errors_run.log") + runSkippedFile := filepath.Join(tmpDir, "skipped_run.log") + setCurrentRunState(&sigMigrateRunState{ + FailedAttemptsFile: runFailedFile, + ErrorLogFile: runErrorFile, + SkippedObjectsFile: runSkippedFile, + }) + defer setCurrentRunState(nil) + + const total = 100 + var wg sync.WaitGroup + for i := 0; i < total; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + obj := ObjectRef{ + Namespace: "default", + Name: fmt.Sprintf("obj-%d", idx), + Kind: "configmaps", + GVR: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "configmaps", + }, + } + recordFailure(obj, "boom") + }(i) + } + wg.Wait() + + failedData, err := os.ReadFile(runFailedFile) + require.NoError(t, err) + errorData, err := os.ReadFile(runErrorFile) + require.NoError(t, err) + + failedLines := strings.Split(strings.TrimSpace(string(failedData)), "\n") + errorLines := strings.Split(strings.TrimSpace(string(errorData)), "\n") + require.Len(t, failedLines, total) + require.Len(t, errorLines, total) +} + +func TestAnnotateObjects_ManyObjectsWithWorkers(t *testing.T) { + scheme := runtime.NewScheme() + gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"} + + const total = 120 + objs := make([]runtime.Object, 0, total) + input := make(map[string]ObjectRef, total) + for i := 0; i < total; i++ { + name := "cm-" + strconv.Itoa(i) + objs = append(objs, &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": name, + "namespace": "default", + }, + }}) + input["default|"+name+"||configmaps"] = ObjectRef{ + Namespace: "default", + Name: name, + Kind: "configmaps", + GVR: gvr, + } + } + + dynamicClient := fake.NewSimpleDynamicClient(scheme, objs...) + + tmpDir := t.TempDir() + runFailedFile := filepath.Join(tmpDir, "failed_annotations_run.log") + runErrorFile := filepath.Join(tmpDir, "failed_errors_run.log") + runSkippedFile := filepath.Join(tmpDir, "skipped_run.log") + setCurrentRunState(&sigMigrateRunState{ + FailedAttemptsFile: runFailedFile, + ErrorLogFile: runErrorFile, + SkippedObjectsFile: runSkippedFile, + }) + defer setCurrentRunState(nil) + + annotateObjects(dynamicClient, dynamicClient, input, 1234567890, map[string]bool{}, "DEBUG", 16) + + _, err := os.Stat(runFailedFile) + require.True(t, os.IsNotExist(err), "failed file should not be created on successful bulk annotate") + _, err = os.Stat(runErrorFile) + require.True(t, os.IsNotExist(err), "error file should not be created on successful bulk annotate") +} + +func TestAnnotateObjects_SimpleRun(t *testing.T) { + scheme := runtime.NewScheme() + gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"} + + obj1 := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "cm-1", + "namespace": "default", + }, + }} + obj2 := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "cm-2", + "namespace": "default", + }, + }} + + dynamicClient := fake.NewSimpleDynamicClient(scheme, obj1, obj2) + + tmpDir := t.TempDir() + runFailedFile := filepath.Join(tmpDir, "failed_annotations_run.log") + runErrorFile := filepath.Join(tmpDir, "failed_errors_run.log") + runSkippedFile := filepath.Join(tmpDir, "skipped_run.log") + setCurrentRunState(&sigMigrateRunState{ + FailedAttemptsFile: runFailedFile, + ErrorLogFile: runErrorFile, + SkippedObjectsFile: runSkippedFile, + }) + defer setCurrentRunState(nil) + + objects := map[string]ObjectRef{ + "default|cm-1||configmaps": { + Namespace: "default", + Name: "cm-1", + Kind: "configmaps", + GVR: gvr, + }, + "default|cm-2||configmaps": { + Namespace: "default", + Name: "cm-2", + Kind: "configmaps", + GVR: gvr, + }, + } + + timestamp := int64(1234567890) + annotateObjects(dynamicClient, dynamicClient, objects, timestamp, map[string]bool{}, "DEBUG", 4) + + client := dynamicClient.Resource(gvr).Namespace("default") + updated1, err := client.Get(context.TODO(), "cm-1", metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, strconv.FormatInt(timestamp, 10), updated1.GetAnnotations()[annotationKey]) + + updated2, err := client.Get(context.TODO(), "cm-2", metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, strconv.FormatInt(timestamp, 10), updated2.GetAnnotations()[annotationKey]) + + _, err = os.Stat(runFailedFile) + require.True(t, os.IsNotExist(err), "failed file should not exist on successful run") + _, err = os.Stat(runErrorFile) + require.True(t, os.IsNotExist(err), "error file should not exist on successful run") +} + +func TestRetryFlow_LoadFailedAndAnnotate(t *testing.T) { + scheme := runtime.NewScheme() + gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"} + + obj := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "retry-cm", + "namespace": "default", + }, + }} + dynamicClient := fake.NewSimpleDynamicClient(scheme, obj) + + tmpDir := t.TempDir() + legacyRetryFile := filepath.Join(tmpDir, "failed_annotations_legacy.log") + runFailedFile := filepath.Join(tmpDir, "failed_annotations_run.log") + runErrorFile := filepath.Join(tmpDir, "failed_errors_run.log") + runSkippedFile := filepath.Join(tmpDir, "skipped_run.log") + + retryData := strings.Join([]string{ + "default|retry-cm|configmaps||v1", + "default|missing-cm|configmaps||v1", + }, "\n") + "\n" + err := os.WriteFile(legacyRetryFile, []byte(retryData), 0644) + require.NoError(t, err) + + setCurrentRunState(&sigMigrateRunState{ + LegacyFailedRetryFile: legacyRetryFile, + FailedAttemptsFile: runFailedFile, + ErrorLogFile: runErrorFile, + SkippedObjectsFile: runSkippedFile, + }) + defer setCurrentRunState(nil) + + objects, err := loadFailedObjects() + require.NoError(t, err) + require.Len(t, objects, 2) + + timestamp := int64(1234567890) + annotateObjects(dynamicClient, dynamicClient, objects, timestamp, map[string]bool{}, "DEBUG", 8) + + updated, err := dynamicClient.Resource(gvr).Namespace("default").Get(context.TODO(), "retry-cm", metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, strconv.FormatInt(timestamp, 10), updated.GetAnnotations()[annotationKey]) + + skippedData, err := os.ReadFile(runSkippedFile) + require.NoError(t, err) + require.Contains(t, string(skippedData), "default|missing-cm|configmaps") + require.Contains(t, string(skippedData), "NotFound") +} + +func TestSingleObjectMigration_ByIdentifier(t *testing.T) { + scheme := runtime.NewScheme() + gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"} + + objs := []runtime.Object{ + &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{"name": "target", "namespace": "default"}, + }}, + &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{"name": "other", "namespace": "default"}, + }}, + } + dynamicClient := fake.NewSimpleDynamicClient(scheme, objs...) + + allObjects := map[string]ObjectRef{ + "default|target||configmaps": { + Namespace: "default", + Name: "target", + Kind: "configmaps", + GVR: gvr, + }, + "default|other||configmaps": { + Namespace: "default", + Name: "other", + Kind: "configmaps", + GVR: gvr, + }, + } + + filtered := filterObjectsByIdentifier(allObjects, "default/target/configmaps") + require.Len(t, filtered, 1) + + timestamp := int64(1234567890) + annotateObjects(dynamicClient, dynamicClient, filtered, timestamp, map[string]bool{}, "DEBUG", 2) + + client := dynamicClient.Resource(gvr).Namespace("default") + target, err := client.Get(context.TODO(), "target", metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, strconv.FormatInt(timestamp, 10), target.GetAnnotations()[annotationKey]) + + other, err := client.Get(context.TODO(), "other", metav1.GetOptions{}) + require.NoError(t, err) + if other.GetAnnotations() != nil { + _, exists := other.GetAnnotations()[annotationKey] + require.False(t, exists, "non-target object should remain unchanged") + } +} + func TestFormatServerErrorDetails_StatusError(t *testing.T) { err := &apierrors.StatusError{ErrStatus: metav1.Status{ Status: metav1.StatusFailure,