Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions internal/controller/clustermanager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"
metrics "github.com/splunk/splunk-operator/pkg/splunk/client/metrics"
enterprise "github.com/splunk/splunk-operator/pkg/splunk/enterprise"
splutil "github.com/splunk/splunk-operator/pkg/splunk/util"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (r *ClusterManagerReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// Pass event recorder through context
ctx = context.WithValue(ctx, splcommon.EventRecorderKey, r.Recorder)

result, err := ApplyClusterManager(ctx, r.Client, instance)
result, err := ApplyClusterManager(ctx, r.Client, instance, nil)
if result.Requeue && result.RequeueAfter != 0 {
reqLogger.Info("Requeued", "period(seconds)", int(result.RequeueAfter/time.Second))
}
Expand All @@ -118,8 +119,8 @@ func (r *ClusterManagerReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}

// ApplyClusterManager adding to handle unit test case
var ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager) (reconcile.Result, error) {
return enterprise.ApplyClusterManager(ctx, client, instance)
var ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager, podExecClient splutil.PodExecClientImpl) (reconcile.Result, error) {
return enterprise.ApplyClusterManager(ctx, client, instance, podExecClient)
}

func (r *ClusterManagerReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand Down
9 changes: 5 additions & 4 deletions internal/controller/clustermanager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/splunk/splunk-operator/internal/controller/testutils"

enterpriseApi "github.com/splunk/splunk-operator/api/v4"
splutil "github.com/splunk/splunk-operator/pkg/splunk/util"

"time"

Expand Down Expand Up @@ -36,7 +37,7 @@ var _ = Describe("ClusterManager Controller", func() {

It("Get ClusterManager custom resource should failed", func() {
namespace := "ns-splunk-cm-1"
ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager) (reconcile.Result, error) {
ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager, podExecClient splutil.PodExecClientImpl) (reconcile.Result, error) {
return reconcile.Result{}, nil
}
nsSpecs := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
Expand All @@ -52,7 +53,7 @@ var _ = Describe("ClusterManager Controller", func() {

It("Create ClusterManager custom resource with annotations should pause", func() {
namespace := "ns-splunk-cm-2"
ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager) (reconcile.Result, error) {
ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager, podExecClient splutil.PodExecClientImpl) (reconcile.Result, error) {
return reconcile.Result{}, nil
}
nsSpecs := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
Expand All @@ -72,7 +73,7 @@ var _ = Describe("ClusterManager Controller", func() {
Context("ClusterManager Management", func() {
It("Create ClusterManager custom resource should succeeded", func() {
namespace := "ns-splunk-cm-3"
ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager) (reconcile.Result, error) {
ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager, podExecClient splutil.PodExecClientImpl) (reconcile.Result, error) {
return reconcile.Result{}, nil
}
nsSpecs := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
Expand All @@ -85,7 +86,7 @@ var _ = Describe("ClusterManager Controller", func() {

It("Cover Unused methods", func() {
namespace := "ns-splunk-cm-4"
ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager) (reconcile.Result, error) {
ApplyClusterManager = func(ctx context.Context, client client.Client, instance *enterpriseApi.ClusterManager, podExecClient splutil.PodExecClientImpl) (reconcile.Result, error) {
return reconcile.Result{}, nil
}
nsSpecs := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
Expand Down
38 changes: 25 additions & 13 deletions pkg/splunk/enterprise/afwscheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

var (
phaseManagerBusyWaitDuration = 1 * time.Second
phaseManagerLoopSleepDuration = 200 * time.Millisecond
)

var appPhaseInfoStatuses = map[enterpriseApi.AppPhaseStatusType]bool{
enterpriseApi.AppPkgDownloadPending: true,
enterpriseApi.AppPkgDownloadInProgress: true,
Expand Down Expand Up @@ -597,10 +602,10 @@ downloadWork:
default:
// All the workers are busy, check after one second
scopedLog.Info("All the workers are busy, we will check again after one second")
time.Sleep(1 * time.Second)
time.Sleep(phaseManagerBusyWaitDuration)
}

time.Sleep(200 * time.Millisecond)
time.Sleep(phaseManagerLoopSleepDuration)
}

// wait for all the download threads to finish
Expand Down Expand Up @@ -680,7 +685,7 @@ downloadPhase:
}
}

time.Sleep(200 * time.Millisecond)
time.Sleep(phaseManagerLoopSleepDuration)
}
}

Expand Down Expand Up @@ -1002,7 +1007,11 @@ func runPodCopyWorker(ctx context.Context, worker *PipelineWorker, ch chan struc
}

// get the podExecClient to be used for copying file to pod
podExecClient := splutil.GetPodExecClient(worker.client, cr, worker.targetPodName)
// Use injected client if available (for testing), otherwise create real client
podExecClient := worker.podExecClient
if podExecClient == nil {
podExecClient = splutil.GetPodExecClient(worker.client, cr, worker.targetPodName)
}
stdOut, stdErr, err := CopyFileToPod(ctx, worker.client, cr.GetNamespace(), appPkgLocalPath, appPkgPathOnPod, podExecClient)
if err != nil {
phaseInfo.FailCount++
Expand Down Expand Up @@ -1062,10 +1071,10 @@ podCopyHandler:
}
default:
// All the workers are busy, check after one second
time.Sleep(1 * time.Second)
time.Sleep(phaseManagerBusyWaitDuration)
}

time.Sleep(200 * time.Millisecond)
time.Sleep(phaseManagerLoopSleepDuration)
}

// Wait for all the workers to finish
Expand Down Expand Up @@ -1131,7 +1140,7 @@ podCopyPhase:
}
}

time.Sleep(200 * time.Millisecond)
time.Sleep(phaseManagerLoopSleepDuration)
}
}

Expand Down Expand Up @@ -1231,9 +1240,12 @@ installHandler:

// Install workers can exist for local scope and premium app scopes
if installWorker != nil {
podExecClient := splutil.GetPodExecClient(installWorker.client, installWorker.cr, installWorker.targetPodName)
// Use injected client if available (for testing), otherwise create real client
podExecClient := installWorker.podExecClient
if podExecClient == nil {
podExecClient = splutil.GetPodExecClient(installWorker.client, installWorker.cr, installWorker.targetPodName)
}
podID, _ := getOrdinalValFromPodName(installWorker.targetPodName)

// Get app source spec
appSrcSpec, err := getAppSrcSpec(installWorker.afwConfig.AppSources, installWorker.appSrcName)
if err != nil {
Expand Down Expand Up @@ -1264,10 +1276,10 @@ installHandler:
}

default:
time.Sleep(1 * time.Second)
time.Sleep(phaseManagerBusyWaitDuration)
}

time.Sleep(200 * time.Millisecond)
time.Sleep(phaseManagerLoopSleepDuration)
}

for {
Expand All @@ -1287,7 +1299,7 @@ installHandler:
}

// Sleep for a second before retry
time.Sleep(1 * time.Second)
time.Sleep(phaseManagerBusyWaitDuration)
}

// Wait for all the workers to finish
Expand Down Expand Up @@ -1383,7 +1395,7 @@ installPhase:
}
}

time.Sleep(200 * time.Millisecond)
time.Sleep(phaseManagerLoopSleepDuration)
}
}

Expand Down
Loading
Loading