From c47a27087c288881510751d96d552e8b901aaade Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 10 Apr 2026 13:54:34 +0000 Subject: [PATCH 1/2] Add auto-parallelism for RDS refresh dump and restore Resolve optimal -j values automatically: use EC2 DescribeInstanceTypes to determine vCPU count of the RDS clone (for pg_dump parallelism) and runtime.NumCPU for the local machine (for pg_restore parallelism). Pass both values through the existing ConfigProjection when updating DBLab config during refresh. https://claude.ai/code/session_01AhnBVCBWjk24T7BBQtmkbq --- engine/go.mod | 1 + engine/go.sum | 2 + engine/internal/rdsrefresh/dblab.go | 14 ++ engine/internal/rdsrefresh/dblab_test.go | 64 ++++++++ engine/internal/rdsrefresh/parallelism.go | 139 ++++++++++++++++ .../internal/rdsrefresh/parallelism_test.go | 155 ++++++++++++++++++ engine/internal/rdsrefresh/refresher.go | 63 ++++--- 7 files changed, 417 insertions(+), 21 deletions(-) create mode 100644 engine/internal/rdsrefresh/parallelism.go create mode 100644 engine/internal/rdsrefresh/parallelism_test.go diff --git a/engine/go.mod b/engine/go.mod index 3d38d85a..a48ed4b1 100644 --- a/engine/go.mod +++ b/engine/go.mod @@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go v1.55.8 github.com/aws/aws-sdk-go-v2 v1.41.5 github.com/aws/aws-sdk-go-v2/config v1.32.14 + github.com/aws/aws-sdk-go-v2/service/ec2 v1.297.0 github.com/aws/aws-sdk-go-v2/service/rds v1.117.1 github.com/containerd/errdefs v1.0.0 github.com/docker/cli v28.5.2+incompatible diff --git a/engine/go.sum b/engine/go.sum index 6f4272bf..e998c8ab 100644 --- a/engine/go.sum +++ b/engine/go.sum @@ -28,6 +28,8 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 h1:PEgGVtPoB6NTpPrBgq github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY3zcpJhPwXlLC4C+kqn70WIHwnzAfs6ps= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 h1:qYQ4pzQ2Oz6WpQ8T3HvGHnZydA72MnLuFK9tJwmrbHw= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY= +github.com/aws/aws-sdk-go-v2/service/ec2 v1.297.0 h1:A+7NViqbMUCoTQFWjbSXdbzE4K5Ziu2zWJtZzAusm+A= +github.com/aws/aws-sdk-go-v2/service/ec2 v1.297.0/go.mod h1:R+2BNtUfTfhPY0RH18oL02q116bakeBWjanrbnVBqkM= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 h1:5EniKhLZe4xzL7a+fU3C2tfUN4nWIqlLesfrjkuPFTY= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 h1:c31//R3xgIJMSC8S6hEVq+38DcvUlgFY0FM6mSI5oto= diff --git a/engine/internal/rdsrefresh/dblab.go b/engine/internal/rdsrefresh/dblab.go index 780c0f75..2c29bb27 100644 --- a/engine/internal/rdsrefresh/dblab.go +++ b/engine/internal/rdsrefresh/dblab.go @@ -180,6 +180,10 @@ type SourceConfigUpdate struct { Password string // RDSIAMDBInstance is the RDS DB instance identifier for IAM auth. When empty, this field is omitted from the config update. RDSIAMDBInstance string + // DumpParallelJobs sets the -j flag for pg_dump. When zero, the existing value is preserved. + DumpParallelJobs int + // RestoreParallelJobs sets the -j flag for pg_restore. When zero, the existing value is preserved. + RestoreParallelJobs int } // UpdateSourceConfig updates the source database connection in DBLab config. @@ -198,6 +202,16 @@ func (c *DBLabClient) UpdateSourceConfig(ctx context.Context, update SourceConfi proj.RDSIAMDBInstance = &update.RDSIAMDBInstance } + if update.DumpParallelJobs > 0 { + dumpJobs := int64(update.DumpParallelJobs) + proj.DumpParallelJobs = &dumpJobs + } + + if update.RestoreParallelJobs > 0 { + restoreJobs := int64(update.RestoreParallelJobs) + proj.RestoreParallelJobs = &restoreJobs + } + nested := map[string]interface{}{} // defensive error check: StoreJSON only fails if target is not an addressable struct, diff --git a/engine/internal/rdsrefresh/dblab_test.go b/engine/internal/rdsrefresh/dblab_test.go index b3e48758..878b8184 100644 --- a/engine/internal/rdsrefresh/dblab_test.go +++ b/engine/internal/rdsrefresh/dblab_test.go @@ -191,6 +191,70 @@ func TestDBLabClientUpdateSourceConfig(t *testing.T) { assert.Nil(t, receivedConfig.RDSIAMDBInstance) }) + t.Run("successful with parallelism settings", func(t *testing.T) { + var receivedConfig models.ConfigProjection + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var nested map[string]interface{} + err := json.NewDecoder(r.Body).Decode(&nested) + require.NoError(t, err) + + err = projection.LoadJSON(&receivedConfig, nested, projection.LoadOptions{ + Groups: []string{"default", "sensitive"}, + }) + require.NoError(t, err) + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client, err := NewDBLabClient(&DBLabConfig{APIEndpoint: server.URL, Token: "test-token"}) + require.NoError(t, err) + + err = client.UpdateSourceConfig(context.Background(), SourceConfigUpdate{ + Host: "clone-host.rds.amazonaws.com", Port: 5432, DBName: "postgres", + Username: "dbuser", Password: "dbpass", + DumpParallelJobs: 4, RestoreParallelJobs: 8, + }) + require.NoError(t, err) + + require.NotNil(t, receivedConfig.DumpParallelJobs) + assert.Equal(t, int64(4), *receivedConfig.DumpParallelJobs) + require.NotNil(t, receivedConfig.RestoreParallelJobs) + assert.Equal(t, int64(8), *receivedConfig.RestoreParallelJobs) + }) + + t.Run("omits parallelism when zero", func(t *testing.T) { + var receivedConfig models.ConfigProjection + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var nested map[string]interface{} + err := json.NewDecoder(r.Body).Decode(&nested) + require.NoError(t, err) + + err = projection.LoadJSON(&receivedConfig, nested, projection.LoadOptions{ + Groups: []string{"default", "sensitive"}, + }) + require.NoError(t, err) + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client, err := NewDBLabClient(&DBLabConfig{APIEndpoint: server.URL, Token: "test-token"}) + require.NoError(t, err) + + err = client.UpdateSourceConfig(context.Background(), SourceConfigUpdate{ + Host: "host.rds.amazonaws.com", Port: 5432, DBName: "postgres", + Username: "dbuser", Password: "dbpass", + DumpParallelJobs: 0, RestoreParallelJobs: 0, + }) + require.NoError(t, err) + + assert.Nil(t, receivedConfig.DumpParallelJobs) + assert.Nil(t, receivedConfig.RestoreParallelJobs) + }) + t.Run("error on non-2xx status", func(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) diff --git a/engine/internal/rdsrefresh/parallelism.go b/engine/internal/rdsrefresh/parallelism.go new file mode 100644 index 00000000..4f323e46 --- /dev/null +++ b/engine/internal/rdsrefresh/parallelism.go @@ -0,0 +1,139 @@ +/* +2026 © PostgresAI +*/ + +package rdsrefresh + +import ( + "context" + "fmt" + "runtime" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/ec2" + ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" + + "gitlab.com/postgres-ai/database-lab/v3/pkg/log" +) + +const ( + // rdsInstanceClassPrefix is stripped to derive the EC2 instance type. + rdsInstanceClassPrefix = "db." + + // minParallelJobs is the minimum parallelism level. + minParallelJobs = 1 +) + +// EC2API defines the interface for EC2 client operations used for vCPU lookup. +type EC2API interface { + DescribeInstanceTypes(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) +} + +// ParallelismConfig holds the computed parallelism levels for dump and restore. +type ParallelismConfig struct { + DumpJobs int + RestoreJobs int +} + +// ResolveParallelism determines the optimal parallelism levels for pg_dump and pg_restore. +// dump parallelism is based on the vCPU count of the RDS clone instance class. +// restore parallelism is based on the vCPU count of the local machine. +func ResolveParallelism(ctx context.Context, cfg *Config) (*ParallelismConfig, error) { + dumpJobs, err := resolveRDSInstanceVCPUs(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("failed to resolve RDS instance vCPUs: %w", err) + } + + restoreJobs := resolveLocalVCPUs() + + log.Msg("auto-parallelism: dump jobs =", dumpJobs, "(RDS clone vCPUs), restore jobs =", restoreJobs, "(local vCPUs)") + + return &ParallelismConfig{ + DumpJobs: dumpJobs, + RestoreJobs: restoreJobs, + }, nil +} + +// resolveRDSInstanceVCPUs looks up the vCPU count for the configured RDS instance class +// by querying the EC2 DescribeInstanceTypes API. +func resolveRDSInstanceVCPUs(ctx context.Context, cfg *Config) (int, error) { + ec2Client, err := newEC2Client(ctx, cfg) + if err != nil { + return 0, fmt.Errorf("failed to create EC2 client: %w", err) + } + + return lookupInstanceVCPUs(ctx, ec2Client, cfg.RDSClone.InstanceClass) +} + +func newEC2Client(ctx context.Context, cfg *Config) (EC2API, error) { + awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(cfg.AWS.Region)) + if err != nil { + return nil, fmt.Errorf("failed to load AWS config: %w", err) + } + + var opts []func(*ec2.Options) + if cfg.AWS.Endpoint != "" { + opts = append(opts, func(o *ec2.Options) { + o.BaseEndpoint = aws.String(cfg.AWS.Endpoint) + }) + } + + return ec2.NewFromConfig(awsCfg, opts...), nil +} + +// lookupInstanceVCPUs queries EC2 for the vCPU count of the given RDS instance class. +func lookupInstanceVCPUs(ctx context.Context, client EC2API, rdsInstanceClass string) (int, error) { + ec2InstanceType, err := rdsClassToEC2Type(rdsInstanceClass) + if err != nil { + return 0, err + } + + result, err := client.DescribeInstanceTypes(ctx, &ec2.DescribeInstanceTypesInput{ + InstanceTypes: []ec2types.InstanceType{ec2types.InstanceType(ec2InstanceType)}, + }) + if err != nil { + return 0, fmt.Errorf("failed to describe EC2 instance type %q: %w", ec2InstanceType, err) + } + + if len(result.InstanceTypes) == 0 { + return 0, fmt.Errorf("EC2 instance type %q not found", ec2InstanceType) + } + + info := result.InstanceTypes[0] + if info.VCpuInfo == nil || info.VCpuInfo.DefaultVCpus == nil { + return 0, fmt.Errorf("vCPU info not available for instance type %q", ec2InstanceType) + } + + vcpus := int(*info.VCpuInfo.DefaultVCpus) + if vcpus < minParallelJobs { + return minParallelJobs, nil + } + + return vcpus, nil +} + +// rdsClassToEC2Type converts an RDS instance class (e.g. "db.m5.xlarge") to an EC2 instance type ("m5.xlarge"). +func rdsClassToEC2Type(rdsClass string) (string, error) { + if !strings.HasPrefix(rdsClass, rdsInstanceClassPrefix) { + return "", fmt.Errorf("invalid RDS instance class %q: expected %q prefix", rdsClass, rdsInstanceClassPrefix) + } + + ec2Type := strings.TrimPrefix(rdsClass, rdsInstanceClassPrefix) + if ec2Type == "" { + return "", fmt.Errorf("invalid RDS instance class %q: empty after removing prefix", rdsClass) + } + + return ec2Type, nil +} + +// resolveLocalVCPUs returns the number of logical CPUs available on the local machine. +func resolveLocalVCPUs() int { + cpus := runtime.NumCPU() + if cpus < minParallelJobs { + return minParallelJobs + } + + return cpus +} diff --git a/engine/internal/rdsrefresh/parallelism_test.go b/engine/internal/rdsrefresh/parallelism_test.go new file mode 100644 index 00000000..da68c608 --- /dev/null +++ b/engine/internal/rdsrefresh/parallelism_test.go @@ -0,0 +1,155 @@ +/* +2026 © PostgresAI +*/ + +package rdsrefresh + +import ( + "context" + "runtime" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ec2" + ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockEC2API struct { + describeInstanceTypesFunc func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) +} + +func (m *mockEC2API) DescribeInstanceTypes(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { + if m.describeInstanceTypesFunc != nil { + return m.describeInstanceTypesFunc(ctx, params, optFns...) + } + + return &ec2.DescribeInstanceTypesOutput{}, nil +} + +func TestRdsClassToEC2Type(t *testing.T) { + testCases := []struct { + rdsClass string + expectedType string + expectErr bool + }{ + {rdsClass: "db.m5.xlarge", expectedType: "m5.xlarge"}, + {rdsClass: "db.t3.medium", expectedType: "t3.medium"}, + {rdsClass: "db.r6g.2xlarge", expectedType: "r6g.2xlarge"}, + {rdsClass: "db.serverless", expectedType: "serverless"}, + {rdsClass: "m5.xlarge", expectErr: true}, + {rdsClass: "db.", expectErr: true}, + {rdsClass: "", expectErr: true}, + } + + for _, tc := range testCases { + t.Run(tc.rdsClass, func(t *testing.T) { + result, err := rdsClassToEC2Type(tc.rdsClass) + + if tc.expectErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tc.expectedType, result) + }) + } +} + +func TestLookupInstanceVCPUs(t *testing.T) { + t.Run("returns vcpu count for valid instance type", func(t *testing.T) { + mock := &mockEC2API{ + describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { + assert.Equal(t, ec2types.InstanceType("m5.xlarge"), params.InstanceTypes[0]) + + return &ec2.DescribeInstanceTypesOutput{ + InstanceTypes: []ec2types.InstanceTypeInfo{ + {InstanceType: ec2types.InstanceType("m5.xlarge"), VCpuInfo: &ec2types.VCpuInfo{DefaultVCpus: aws.Int32(4)}}, + }, + }, nil + }, + } + + vcpus, err := lookupInstanceVCPUs(context.Background(), mock, "db.m5.xlarge") + + require.NoError(t, err) + assert.Equal(t, 4, vcpus) + }) + + t.Run("returns vcpu count for large instance", func(t *testing.T) { + mock := &mockEC2API{ + describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { + return &ec2.DescribeInstanceTypesOutput{ + InstanceTypes: []ec2types.InstanceTypeInfo{ + {InstanceType: ec2types.InstanceType("r6g.16xlarge"), VCpuInfo: &ec2types.VCpuInfo{DefaultVCpus: aws.Int32(64)}}, + }, + }, nil + }, + } + + vcpus, err := lookupInstanceVCPUs(context.Background(), mock, "db.r6g.16xlarge") + + require.NoError(t, err) + assert.Equal(t, 64, vcpus) + }) + + t.Run("returns error for instance type not found", func(t *testing.T) { + mock := &mockEC2API{ + describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { + return &ec2.DescribeInstanceTypesOutput{InstanceTypes: []ec2types.InstanceTypeInfo{}}, nil + }, + } + + _, err := lookupInstanceVCPUs(context.Background(), mock, "db.nonexistent.type") + + require.Error(t, err) + assert.Contains(t, err.Error(), "not found") + }) + + t.Run("returns error for missing vcpu info", func(t *testing.T) { + mock := &mockEC2API{ + describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { + return &ec2.DescribeInstanceTypesOutput{ + InstanceTypes: []ec2types.InstanceTypeInfo{ + {InstanceType: ec2types.InstanceType("m5.xlarge"), VCpuInfo: nil}, + }, + }, nil + }, + } + + _, err := lookupInstanceVCPUs(context.Background(), mock, "db.m5.xlarge") + + require.Error(t, err) + assert.Contains(t, err.Error(), "vCPU info not available") + }) + + t.Run("returns error for invalid rds class", func(t *testing.T) { + mock := &mockEC2API{} + _, err := lookupInstanceVCPUs(context.Background(), mock, "invalid-class") + + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid RDS instance class") + }) + + t.Run("returns error on api failure", func(t *testing.T) { + mock := &mockEC2API{ + describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { + return nil, assert.AnError + }, + } + + _, err := lookupInstanceVCPUs(context.Background(), mock, "db.m5.xlarge") + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to describe EC2 instance type") + }) +} + +func TestResolveLocalVCPUs(t *testing.T) { + vcpus := resolveLocalVCPUs() + + assert.Equal(t, runtime.NumCPU(), vcpus) + assert.GreaterOrEqual(t, vcpus, minParallelJobs) +} diff --git a/engine/internal/rdsrefresh/refresher.go b/engine/internal/rdsrefresh/refresher.go index bb65eeb2..db99f75a 100644 --- a/engine/internal/rdsrefresh/refresher.go +++ b/engine/internal/rdsrefresh/refresher.go @@ -59,14 +59,15 @@ func NewRefresherWithStateFile(ctx context.Context, cfg *Config, stateFile *Stat // Run executes the full refresh workflow: // 1. Verifies DBLab is healthy and not already refreshing -// 2. Gets source database info -// 3. Finds the latest RDS snapshot -// 4. Creates a temporary RDS clone from the RDS snapshot -// 5. Waits for the RDS clone to be available -// 6. Updates DBLab config with the RDS clone endpoint -// 7. Triggers DBLab full refresh -// 8. Waits for refresh to complete -// 9. Deletes the temporary RDS clone +// 2. Resolves parallelism levels (RDS clone vCPUs for dump, local vCPUs for restore) +// 3. Gets source database info +// 4. Finds the latest RDS snapshot +// 5. Creates a temporary RDS clone from the RDS snapshot +// 6. Waits for the RDS clone to be available +// 7. Updates DBLab config with the RDS clone endpoint and parallelism +// 8. Triggers DBLab full refresh +// 9. Waits for refresh to complete +// 10. Deletes the temporary RDS clone func (r *Refresher) Run(ctx context.Context) *RefreshResult { result := &RefreshResult{ StartTime: time.Now(), @@ -96,7 +97,17 @@ func (r *Refresher) Run(ctx context.Context) *RefreshResult { return result } - // step 2: get source info + // step 2: resolve parallelism levels + log.Msg("resolving parallelism levels...") + + parallelism, err := ResolveParallelism(ctx, r.cfg) + if err != nil { + log.Warn("failed to auto-detect parallelism, using defaults:", err) + + parallelism = &ParallelismConfig{DumpJobs: 0, RestoreJobs: 0} + } + + // step 3: get source info log.Msg("checking source database...") sourceInfo, err := r.rds.GetSourceInfo(ctx) @@ -107,7 +118,7 @@ func (r *Refresher) Run(ctx context.Context) *RefreshResult { log.Msg("source:", sourceInfo) - // step 3: find latest RDS snapshot + // step 4: find latest RDS snapshot log.Msg("finding latest RDS snapshot...") snapshotID, err := r.rds.FindLatestSnapshot(ctx) @@ -119,7 +130,7 @@ func (r *Refresher) Run(ctx context.Context) *RefreshResult { result.SnapshotID = snapshotID log.Msg("using RDS snapshot:", snapshotID) - // step 4: create temporary RDS clone + // step 5: create temporary RDS clone log.Msg("creating RDS clone from RDS snapshot...") // write state file before clone creation for crash recovery @@ -166,7 +177,7 @@ func (r *Refresher) Run(ctx context.Context) *RefreshResult { } }() - // step 5: wait for RDS clone to be available + // step 6: wait for RDS clone to be available log.Msg("waiting for RDS clone (10-30 min)...") if err := r.rds.WaitForCloneAvailable(ctx, clone); err != nil { @@ -177,16 +188,18 @@ func (r *Refresher) Run(ctx context.Context) *RefreshResult { result.CloneEndpoint = clone.Endpoint log.Msg("RDS clone ready:", fmt.Sprintf("%s:%d", clone.Endpoint, clone.Port)) - // step 6: update DBLab config with RDS clone endpoint + // step 7: update DBLab config with RDS clone endpoint and parallelism log.Msg("updating DBLab config...") if err := r.dblab.UpdateSourceConfig(ctx, SourceConfigUpdate{ - Host: clone.Endpoint, - Port: int(clone.Port), - DBName: r.cfg.Source.DBName, - Username: r.cfg.Source.Username, - Password: r.cfg.Source.Password, - RDSIAMDBInstance: clone.Identifier, + Host: clone.Endpoint, + Port: int(clone.Port), + DBName: r.cfg.Source.DBName, + Username: r.cfg.Source.Username, + Password: r.cfg.Source.Password, + RDSIAMDBInstance: clone.Identifier, + DumpParallelJobs: parallelism.DumpJobs, + RestoreParallelJobs: parallelism.RestoreJobs, }); err != nil { result.Error = fmt.Errorf("failed to update DBLab config: %w", err) return result @@ -194,7 +207,7 @@ func (r *Refresher) Run(ctx context.Context) *RefreshResult { log.Msg("DBLab config updated successfully") - // step 7: trigger DBLab full refresh + // step 8: trigger DBLab full refresh log.Msg("triggering DBLab full refresh...") if err := r.dblab.TriggerFullRefresh(ctx); err != nil { @@ -204,7 +217,7 @@ func (r *Refresher) Run(ctx context.Context) *RefreshResult { log.Msg("full refresh triggered, waiting for completion...") - // step 8: wait for refresh to complete + // step 9: wait for refresh to complete pollInterval := r.cfg.DBLab.PollInterval.Duration() timeout := r.cfg.DBLab.Timeout.Duration() @@ -264,6 +277,14 @@ func (r *Refresher) DryRun(ctx context.Context) error { log.Msg("would use RDS snapshot:", snapshotID) log.Msg("would create RDS clone with instance class:", r.cfg.RDSClone.InstanceClass) + // check parallelism + parallelism, err := ResolveParallelism(ctx, r.cfg) + if err != nil { + log.Warn("could not auto-detect parallelism:", err) + } else { + log.Msg("auto-parallelism: dump jobs =", parallelism.DumpJobs, ", restore jobs =", parallelism.RestoreJobs) + } + log.Msg("=== DRY RUN COMPLETE - all checks passed ===") return nil From a3f930e6d887341037d159e5892d7a24c61d0dcf Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 10 Apr 2026 18:41:19 +0000 Subject: [PATCH 2/2] Replace EC2 API vCPU lookup with static instance size parsing Drop the aws-sdk-go-v2/service/ec2 dependency entirely. Instead of calling DescribeInstanceTypes (which required ec2:DescribeInstanceTypes IAM permission and added ~5s IMDS timeout in tests), parse vCPU count from the RDS instance class size suffix using a static map of standard AWS size-to-vCPU mappings. Unlisted NUMxlarge sizes are handled via multiplier parsing. https://claude.ai/code/session_01AhnBVCBWjk24T7BBQtmkbq --- engine/go.mod | 1 - engine/go.sum | 2 - engine/internal/rdsrefresh/parallelism.go | 131 ++++++------ .../internal/rdsrefresh/parallelism_test.go | 197 ++++++++---------- engine/internal/rdsrefresh/refresher.go | 4 +- 5 files changed, 163 insertions(+), 172 deletions(-) diff --git a/engine/go.mod b/engine/go.mod index a48ed4b1..3d38d85a 100644 --- a/engine/go.mod +++ b/engine/go.mod @@ -9,7 +9,6 @@ require ( github.com/aws/aws-sdk-go v1.55.8 github.com/aws/aws-sdk-go-v2 v1.41.5 github.com/aws/aws-sdk-go-v2/config v1.32.14 - github.com/aws/aws-sdk-go-v2/service/ec2 v1.297.0 github.com/aws/aws-sdk-go-v2/service/rds v1.117.1 github.com/containerd/errdefs v1.0.0 github.com/docker/cli v28.5.2+incompatible diff --git a/engine/go.sum b/engine/go.sum index e998c8ab..6f4272bf 100644 --- a/engine/go.sum +++ b/engine/go.sum @@ -28,8 +28,6 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 h1:PEgGVtPoB6NTpPrBgq github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY3zcpJhPwXlLC4C+kqn70WIHwnzAfs6ps= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 h1:qYQ4pzQ2Oz6WpQ8T3HvGHnZydA72MnLuFK9tJwmrbHw= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY= -github.com/aws/aws-sdk-go-v2/service/ec2 v1.297.0 h1:A+7NViqbMUCoTQFWjbSXdbzE4K5Ziu2zWJtZzAusm+A= -github.com/aws/aws-sdk-go-v2/service/ec2 v1.297.0/go.mod h1:R+2BNtUfTfhPY0RH18oL02q116bakeBWjanrbnVBqkM= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 h1:5EniKhLZe4xzL7a+fU3C2tfUN4nWIqlLesfrjkuPFTY= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 h1:c31//R3xgIJMSC8S6hEVq+38DcvUlgFY0FM6mSI5oto= diff --git a/engine/internal/rdsrefresh/parallelism.go b/engine/internal/rdsrefresh/parallelism.go index 4f323e46..b6ffe332 100644 --- a/engine/internal/rdsrefresh/parallelism.go +++ b/engine/internal/rdsrefresh/parallelism.go @@ -5,30 +5,45 @@ package rdsrefresh import ( - "context" "fmt" "runtime" + "strconv" "strings" - "github.com/aws/aws-sdk-go-v2/aws" - awsconfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/ec2" - ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" - "gitlab.com/postgres-ai/database-lab/v3/pkg/log" ) const ( - // rdsInstanceClassPrefix is stripped to derive the EC2 instance type. + // rdsInstanceClassPrefix is stripped to derive the instance size. rdsInstanceClassPrefix = "db." // minParallelJobs is the minimum parallelism level. minParallelJobs = 1 ) -// EC2API defines the interface for EC2 client operations used for vCPU lookup. -type EC2API interface { - DescribeInstanceTypes(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) +// instanceSizeVCPUs maps AWS instance size suffixes to their typical vCPU count. +// this mapping is consistent across most instance families (m5, m6g, r5, r6g, c5, etc.). +// graviton and intel/amd variants of the same size have the same vCPU count. +var instanceSizeVCPUs = map[string]int{ + "micro": 1, + "small": 1, + "medium": 2, + "large": 2, + "xlarge": 4, + "2xlarge": 8, + "3xlarge": 12, + "4xlarge": 16, + "6xlarge": 24, + "8xlarge": 32, + "9xlarge": 36, + "10xlarge": 40, + "12xlarge": 48, + "16xlarge": 64, + "18xlarge": 72, + "24xlarge": 96, + "32xlarge": 128, + "48xlarge": 192, + "metal": 96, } // ParallelismConfig holds the computed parallelism levels for dump and restore. @@ -40,8 +55,10 @@ type ParallelismConfig struct { // ResolveParallelism determines the optimal parallelism levels for pg_dump and pg_restore. // dump parallelism is based on the vCPU count of the RDS clone instance class. // restore parallelism is based on the vCPU count of the local machine. -func ResolveParallelism(ctx context.Context, cfg *Config) (*ParallelismConfig, error) { - dumpJobs, err := resolveRDSInstanceVCPUs(ctx, cfg) +// local vCPU detection uses runtime.NumCPU(), which works on Linux +// (the target platform for DBLab Engine). +func ResolveParallelism(cfg *Config) (*ParallelismConfig, error) { + dumpJobs, err := resolveRDSInstanceVCPUs(cfg.RDSClone.InstanceClass) if err != nil { return nil, fmt.Errorf("failed to resolve RDS instance vCPUs: %w", err) } @@ -56,79 +73,71 @@ func ResolveParallelism(ctx context.Context, cfg *Config) (*ParallelismConfig, e }, nil } -// resolveRDSInstanceVCPUs looks up the vCPU count for the configured RDS instance class -// by querying the EC2 DescribeInstanceTypes API. -func resolveRDSInstanceVCPUs(ctx context.Context, cfg *Config) (int, error) { - ec2Client, err := newEC2Client(ctx, cfg) +// resolveRDSInstanceVCPUs estimates the vCPU count for the given RDS instance class +// by parsing the instance size suffix (e.g. "xlarge" from "db.m5.xlarge"). +// the mapping covers standard AWS size naming used across RDS instance families. +// if the size is not recognized, it attempts to parse a numeric multiplier prefix +// (e.g. "2xlarge" → 8 vCPUs). +func resolveRDSInstanceVCPUs(instanceClass string) (int, error) { + size, err := extractInstanceSize(instanceClass) if err != nil { - return 0, fmt.Errorf("failed to create EC2 client: %w", err) + return 0, err } - return lookupInstanceVCPUs(ctx, ec2Client, cfg.RDSClone.InstanceClass) -} - -func newEC2Client(ctx context.Context, cfg *Config) (EC2API, error) { - awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(cfg.AWS.Region)) - if err != nil { - return nil, fmt.Errorf("failed to load AWS config: %w", err) + if vcpus, ok := instanceSizeVCPUs[size]; ok { + return vcpus, nil } - var opts []func(*ec2.Options) - if cfg.AWS.Endpoint != "" { - opts = append(opts, func(o *ec2.Options) { - o.BaseEndpoint = aws.String(cfg.AWS.Endpoint) - }) + // handle unlisted NUMxlarge sizes by parsing the multiplier + vcpus, err := parseXlargeMultiplier(size) + if err != nil { + return 0, fmt.Errorf("unknown instance size %q in class %q", size, instanceClass) } - return ec2.NewFromConfig(awsCfg, opts...), nil + return vcpus, nil } -// lookupInstanceVCPUs queries EC2 for the vCPU count of the given RDS instance class. -func lookupInstanceVCPUs(ctx context.Context, client EC2API, rdsInstanceClass string) (int, error) { - ec2InstanceType, err := rdsClassToEC2Type(rdsInstanceClass) - if err != nil { - return 0, err +// extractInstanceSize extracts the size component from an RDS instance class. +// for example, "db.m5.xlarge" → "xlarge", "db.r6g.2xlarge" → "2xlarge". +func extractInstanceSize(instanceClass string) (string, error) { + if !strings.HasPrefix(instanceClass, rdsInstanceClassPrefix) { + return "", fmt.Errorf("invalid RDS instance class %q: expected %q prefix", instanceClass, rdsInstanceClassPrefix) } - result, err := client.DescribeInstanceTypes(ctx, &ec2.DescribeInstanceTypesInput{ - InstanceTypes: []ec2types.InstanceType{ec2types.InstanceType(ec2InstanceType)}, - }) - if err != nil { - return 0, fmt.Errorf("failed to describe EC2 instance type %q: %w", ec2InstanceType, err) - } - - if len(result.InstanceTypes) == 0 { - return 0, fmt.Errorf("EC2 instance type %q not found", ec2InstanceType) - } + withoutPrefix := strings.TrimPrefix(instanceClass, rdsInstanceClassPrefix) - info := result.InstanceTypes[0] - if info.VCpuInfo == nil || info.VCpuInfo.DefaultVCpus == nil { - return 0, fmt.Errorf("vCPU info not available for instance type %q", ec2InstanceType) - } + // format is "family.size", e.g. "m5.xlarge" or "r6g.2xlarge" + parts := strings.SplitN(withoutPrefix, ".", 2) - vcpus := int(*info.VCpuInfo.DefaultVCpus) - if vcpus < minParallelJobs { - return minParallelJobs, nil + const expectedParts = 2 + if len(parts) != expectedParts || parts[1] == "" { + return "", fmt.Errorf("invalid RDS instance class %q: expected format db..", instanceClass) } - return vcpus, nil + return parts[1], nil } -// rdsClassToEC2Type converts an RDS instance class (e.g. "db.m5.xlarge") to an EC2 instance type ("m5.xlarge"). -func rdsClassToEC2Type(rdsClass string) (string, error) { - if !strings.HasPrefix(rdsClass, rdsInstanceClassPrefix) { - return "", fmt.Errorf("invalid RDS instance class %q: expected %q prefix", rdsClass, rdsInstanceClassPrefix) +// parseXlargeMultiplier handles NUMxlarge patterns not in the static map. +// for example, "5xlarge" → 5 * 4 = 20 vCPUs. +func parseXlargeMultiplier(size string) (int, error) { + idx := strings.Index(size, "xlarge") + if idx <= 0 { + return 0, fmt.Errorf("not an xlarge variant: %q", size) } - ec2Type := strings.TrimPrefix(rdsClass, rdsInstanceClassPrefix) - if ec2Type == "" { - return "", fmt.Errorf("invalid RDS instance class %q: empty after removing prefix", rdsClass) + multiplier, err := strconv.Atoi(size[:idx]) + if err != nil { + return 0, fmt.Errorf("invalid multiplier in %q: %w", size, err) } - return ec2Type, nil + const vcpusPerXlarge = 4 + + return multiplier * vcpusPerXlarge, nil } // resolveLocalVCPUs returns the number of logical CPUs available on the local machine. +// uses runtime.NumCPU() which reads from /proc/cpuinfo on Linux +// (the target platform for DBLab Engine). func resolveLocalVCPUs() int { cpus := runtime.NumCPU() if cpus < minParallelJobs { diff --git a/engine/internal/rdsrefresh/parallelism_test.go b/engine/internal/rdsrefresh/parallelism_test.go index da68c608..ffb73b65 100644 --- a/engine/internal/rdsrefresh/parallelism_test.go +++ b/engine/internal/rdsrefresh/parallelism_test.go @@ -5,47 +5,34 @@ package rdsrefresh import ( - "context" "runtime" "testing" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/ec2" - ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -type mockEC2API struct { - describeInstanceTypesFunc func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) -} - -func (m *mockEC2API) DescribeInstanceTypes(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { - if m.describeInstanceTypesFunc != nil { - return m.describeInstanceTypesFunc(ctx, params, optFns...) - } - - return &ec2.DescribeInstanceTypesOutput{}, nil -} - -func TestRdsClassToEC2Type(t *testing.T) { +func TestExtractInstanceSize(t *testing.T) { testCases := []struct { - rdsClass string - expectedType string - expectErr bool + instanceClass string + expectedSize string + expectErr bool }{ - {rdsClass: "db.m5.xlarge", expectedType: "m5.xlarge"}, - {rdsClass: "db.t3.medium", expectedType: "t3.medium"}, - {rdsClass: "db.r6g.2xlarge", expectedType: "r6g.2xlarge"}, - {rdsClass: "db.serverless", expectedType: "serverless"}, - {rdsClass: "m5.xlarge", expectErr: true}, - {rdsClass: "db.", expectErr: true}, - {rdsClass: "", expectErr: true}, + {instanceClass: "db.m5.xlarge", expectedSize: "xlarge"}, + {instanceClass: "db.t3.medium", expectedSize: "medium"}, + {instanceClass: "db.r6g.2xlarge", expectedSize: "2xlarge"}, + {instanceClass: "db.m5.metal", expectedSize: "metal"}, + {instanceClass: "db.t3.micro", expectedSize: "micro"}, + {instanceClass: "db.r6g.16xlarge", expectedSize: "16xlarge"}, + {instanceClass: "m5.xlarge", expectErr: true}, + {instanceClass: "db.m5", expectErr: true}, + {instanceClass: "db.", expectErr: true}, + {instanceClass: "", expectErr: true}, } for _, tc := range testCases { - t.Run(tc.rdsClass, func(t *testing.T) { - result, err := rdsClassToEC2Type(tc.rdsClass) + t.Run(tc.instanceClass, func(t *testing.T) { + size, err := extractInstanceSize(tc.instanceClass) if tc.expectErr { require.Error(t, err) @@ -53,103 +40,101 @@ func TestRdsClassToEC2Type(t *testing.T) { } require.NoError(t, err) - assert.Equal(t, tc.expectedType, result) + assert.Equal(t, tc.expectedSize, size) }) } } -func TestLookupInstanceVCPUs(t *testing.T) { - t.Run("returns vcpu count for valid instance type", func(t *testing.T) { - mock := &mockEC2API{ - describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { - assert.Equal(t, ec2types.InstanceType("m5.xlarge"), params.InstanceTypes[0]) - - return &ec2.DescribeInstanceTypesOutput{ - InstanceTypes: []ec2types.InstanceTypeInfo{ - {InstanceType: ec2types.InstanceType("m5.xlarge"), VCpuInfo: &ec2types.VCpuInfo{DefaultVCpus: aws.Int32(4)}}, - }, - }, nil - }, - } - - vcpus, err := lookupInstanceVCPUs(context.Background(), mock, "db.m5.xlarge") +func TestResolveRDSInstanceVCPUs(t *testing.T) { + testCases := []struct { + instanceClass string + expectedVCPUs int + expectErr bool + }{ + {instanceClass: "db.t3.micro", expectedVCPUs: 1}, + {instanceClass: "db.t3.small", expectedVCPUs: 1}, + {instanceClass: "db.t3.medium", expectedVCPUs: 2}, + {instanceClass: "db.m5.large", expectedVCPUs: 2}, + {instanceClass: "db.m5.xlarge", expectedVCPUs: 4}, + {instanceClass: "db.r6g.2xlarge", expectedVCPUs: 8}, + {instanceClass: "db.r6g.4xlarge", expectedVCPUs: 16}, + {instanceClass: "db.r6g.8xlarge", expectedVCPUs: 32}, + {instanceClass: "db.r6g.16xlarge", expectedVCPUs: 64}, + {instanceClass: "db.m5.24xlarge", expectedVCPUs: 96}, + {instanceClass: "db.m5.metal", expectedVCPUs: 96}, + {instanceClass: "db.m5.5xlarge", expectedVCPUs: 20}, + {instanceClass: "invalid", expectErr: true}, + {instanceClass: "db.m5", expectErr: true}, + {instanceClass: "db.m5.unknown", expectErr: true}, + } - require.NoError(t, err) - assert.Equal(t, 4, vcpus) - }) + for _, tc := range testCases { + t.Run(tc.instanceClass, func(t *testing.T) { + vcpus, err := resolveRDSInstanceVCPUs(tc.instanceClass) - t.Run("returns vcpu count for large instance", func(t *testing.T) { - mock := &mockEC2API{ - describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { - return &ec2.DescribeInstanceTypesOutput{ - InstanceTypes: []ec2types.InstanceTypeInfo{ - {InstanceType: ec2types.InstanceType("r6g.16xlarge"), VCpuInfo: &ec2types.VCpuInfo{DefaultVCpus: aws.Int32(64)}}, - }, - }, nil - }, - } + if tc.expectErr { + require.Error(t, err) + return + } - vcpus, err := lookupInstanceVCPUs(context.Background(), mock, "db.r6g.16xlarge") + require.NoError(t, err) + assert.Equal(t, tc.expectedVCPUs, vcpus) + }) + } +} - require.NoError(t, err) - assert.Equal(t, 64, vcpus) - }) +func TestParseXlargeMultiplier(t *testing.T) { + testCases := []struct { + size string + expectedVCPUs int + expectErr bool + }{ + {size: "2xlarge", expectedVCPUs: 8}, + {size: "4xlarge", expectedVCPUs: 16}, + {size: "5xlarge", expectedVCPUs: 20}, + {size: "xlarge", expectErr: true}, + {size: "large", expectErr: true}, + {size: "abcxlarge", expectErr: true}, + } - t.Run("returns error for instance type not found", func(t *testing.T) { - mock := &mockEC2API{ - describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { - return &ec2.DescribeInstanceTypesOutput{InstanceTypes: []ec2types.InstanceTypeInfo{}}, nil - }, - } + for _, tc := range testCases { + t.Run(tc.size, func(t *testing.T) { + vcpus, err := parseXlargeMultiplier(tc.size) - _, err := lookupInstanceVCPUs(context.Background(), mock, "db.nonexistent.type") + if tc.expectErr { + require.Error(t, err) + return + } - require.Error(t, err) - assert.Contains(t, err.Error(), "not found") - }) + require.NoError(t, err) + assert.Equal(t, tc.expectedVCPUs, vcpus) + }) + } +} - t.Run("returns error for missing vcpu info", func(t *testing.T) { - mock := &mockEC2API{ - describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { - return &ec2.DescribeInstanceTypesOutput{ - InstanceTypes: []ec2types.InstanceTypeInfo{ - {InstanceType: ec2types.InstanceType("m5.xlarge"), VCpuInfo: nil}, - }, - }, nil - }, - } +func TestResolveLocalVCPUs(t *testing.T) { + vcpus := resolveLocalVCPUs() - _, err := lookupInstanceVCPUs(context.Background(), mock, "db.m5.xlarge") + assert.Equal(t, runtime.NumCPU(), vcpus) + assert.GreaterOrEqual(t, vcpus, minParallelJobs) +} - require.Error(t, err) - assert.Contains(t, err.Error(), "vCPU info not available") - }) +func TestResolveParallelism(t *testing.T) { + t.Run("resolves both dump and restore jobs", func(t *testing.T) { + cfg := &Config{RDSClone: RDSCloneConfig{InstanceClass: "db.m5.xlarge"}} - t.Run("returns error for invalid rds class", func(t *testing.T) { - mock := &mockEC2API{} - _, err := lookupInstanceVCPUs(context.Background(), mock, "invalid-class") + result, err := ResolveParallelism(cfg) - require.Error(t, err) - assert.Contains(t, err.Error(), "invalid RDS instance class") + require.NoError(t, err) + assert.Equal(t, 4, result.DumpJobs) + assert.Equal(t, runtime.NumCPU(), result.RestoreJobs) }) - t.Run("returns error on api failure", func(t *testing.T) { - mock := &mockEC2API{ - describeInstanceTypesFunc: func(ctx context.Context, params *ec2.DescribeInstanceTypesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) { - return nil, assert.AnError - }, - } + t.Run("returns error for invalid instance class", func(t *testing.T) { + cfg := &Config{RDSClone: RDSCloneConfig{InstanceClass: "invalid"}} - _, err := lookupInstanceVCPUs(context.Background(), mock, "db.m5.xlarge") + _, err := ResolveParallelism(cfg) require.Error(t, err) - assert.Contains(t, err.Error(), "failed to describe EC2 instance type") }) } - -func TestResolveLocalVCPUs(t *testing.T) { - vcpus := resolveLocalVCPUs() - - assert.Equal(t, runtime.NumCPU(), vcpus) - assert.GreaterOrEqual(t, vcpus, minParallelJobs) -} diff --git a/engine/internal/rdsrefresh/refresher.go b/engine/internal/rdsrefresh/refresher.go index db99f75a..85b8abe4 100644 --- a/engine/internal/rdsrefresh/refresher.go +++ b/engine/internal/rdsrefresh/refresher.go @@ -100,7 +100,7 @@ func (r *Refresher) Run(ctx context.Context) *RefreshResult { // step 2: resolve parallelism levels log.Msg("resolving parallelism levels...") - parallelism, err := ResolveParallelism(ctx, r.cfg) + parallelism, err := ResolveParallelism(r.cfg) if err != nil { log.Warn("failed to auto-detect parallelism, using defaults:", err) @@ -278,7 +278,7 @@ func (r *Refresher) DryRun(ctx context.Context) error { log.Msg("would create RDS clone with instance class:", r.cfg.RDSClone.InstanceClass) // check parallelism - parallelism, err := ResolveParallelism(ctx, r.cfg) + parallelism, err := ResolveParallelism(r.cfg) if err != nil { log.Warn("could not auto-detect parallelism:", err) } else {