Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/api-key/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
apiKeyNotValidForClusterSuggestions = "Specify the cluster this API key belongs to using the `--resource` flag. Alternatively, first execute the `confluent kafka cluster use` command to set the context to the proper cluster for this key and retry the `confluent api-key store` command."
apiKeyUseFailedErrorMsg = "unable to set active API key"
apiKeyUseFailedSuggestions = "If you did not create this API key with the CLI or created it on another computer, you must first store the API key and secret locally with `confluent api-key store %s <secret>`."
nonKafkaNotImplementedErrorMsg = "functionality not yet available for non-Kafka cluster resources"
nonKafkaNotImplementedErrorMsg = "functionality not yet available for resources other than Kafka clusters and Global API keys"
refuseToOverrideSecretSuggestions = "If you would like to override the existing secret stored for API key \"%s\", use the `--force` flag."
unableToStoreApiKeyErrorMsg = "unable to store API key locally: %w"
)
Expand Down
29 changes: 23 additions & 6 deletions internal/api-key/command_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
return cmd
}

func (c *command) create(cmd *cobra.Command, _ []string) error {

Check failure on line 83 in internal/api-key/command_create.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 38 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3377&issues=a2432fb9-439b-454d-9b9d-66ae74e8337b&open=a2432fb9-439b-454d-9b9d-66ae74e8337b
c.setKeyStoreIfNil()

description, err := cmd.Flags().GetString("description")
Expand Down Expand Up @@ -171,24 +171,41 @@
return err
}

if resourceType == resource.KafkaCluster {
switch resourceType {
case resource.KafkaCluster:
if err := c.keystore.StoreAPIKey(c.V2Client, userKey, resourceId); err != nil {
return fmt.Errorf(unableToStoreApiKeyErrorMsg, err)
}
case resource.Global:
// Global keys' secrets are irretrievable after creation, so always store them locally — same
// rationale as Kafka cluster keys. The user can still copy the printed secret if they want.
if err := c.keystore.StoreGlobalAPIKey(userKey); err != nil {
return fmt.Errorf(unableToStoreApiKeyErrorMsg, err)
}
}

use, err := cmd.Flags().GetBool("use")
if err != nil {
return err
}
if use {
if resourceType != resource.KafkaCluster {
switch resourceType {
case resource.KafkaCluster:
if err := c.useAPIKey(userKey.Key, resourceId); err != nil {
return errors.NewWrapErrorWithSuggestions(err, apiKeyUseFailedErrorMsg, fmt.Sprintf(apiKeyUseFailedSuggestions, userKey.Key))
}
output.Printf(c.Config.EnableColor, useAPIKeyMsg, userKey.Key)
case resource.Global:
if err := c.Context.SetActiveGlobalAPIKey(userKey.Key); err != nil {
return errors.NewWrapErrorWithSuggestions(err, apiKeyUseFailedErrorMsg, fmt.Sprintf(apiKeyUseFailedSuggestions, userKey.Key))
}
if err := c.Config.Save(); err != nil {
return err
}
output.Printf(c.Config.EnableColor, useGlobalAPIKeyMsg, userKey.Key)
default:
return fmt.Errorf("`--use` set but ineffective: %s", nonKafkaNotImplementedErrorMsg)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update the nonKafkaNotImplementedErrorMsg now that we can store Kafka & global keys.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review Steven! I've updated the message to be: functionality not yet available for resources other than Kafka clusters and Global API keys

}
if err := c.useAPIKey(userKey.Key, resourceId); err != nil {
return errors.NewWrapErrorWithSuggestions(err, apiKeyUseFailedErrorMsg, fmt.Sprintf(apiKeyUseFailedSuggestions, userKey.Key))
}
output.Printf(c.Config.EnableColor, useAPIKeyMsg, userKey.Key)
}

return nil
Expand Down
70 changes: 53 additions & 17 deletions internal/api-key/command_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,51 @@
return cmd
}

func (c *command) store(cmd *cobra.Command, args []string) error {

Check failure on line 50 in internal/api-key/command_store.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 25 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3377&issues=06c0073a-3aa1-4218-812a-9c584034335c&open=06c0073a-3aa1-4218-812a-9c584034335c
c.setKeyStoreIfNil()

key := args[0]
secret := args[1]

force, err := cmd.Flags().GetBool("force")
if err != nil {
return err
}

// Check if API key exists server-side
apiKey, httpResp, err := c.V2Client.GetApiKey(key)
if err != nil {
return errors.CatchApiKeyForbiddenAccessError(err, getOperation, httpResp)
}

resourceType, clusterId, _, resolveErr := c.resolveResourceId(cmd, c.V2Client)
isGlobalKey := apiKey.GetSpec().Resource.GetKind() == "Global"

// Detect Global API keys by either the explicit --resource global flag or the server-side resource Kind.
// Global keys are org-scoped and stored separately from cluster-scoped keys.
if resourceType == resource.Global || isGlobalKey {
if resourceType == resource.Global && !isGlobalKey {
return errors.NewErrorWithSuggestions(
fmt.Sprintf("API key %q is not a Global API key", key),
"Omit `--resource global`, or pass the correct resource ID.",
)
}
if isGlobalKey && resourceType != "" && resourceType != resource.Global {
return errors.NewErrorWithSuggestions(
fmt.Sprintf("API key %q is a Global API key, but --resource was set to %q", key, resourceType),
"Re-run with `--resource global`, or omit `--resource` to auto-detect.",
)
}
return c.storeGlobal(key, secret, force)
}

var cluster *config.KafkaClusterConfig

// Attempt to get cluster from --resource flag if set; if that doesn't work,
// attempt to fall back to the currently active Kafka cluster
resourceType, clusterId, _, err := c.resolveResourceId(cmd, c.V2Client)
if err == nil && clusterId != "" {
// attempt to fall back to the currently active Kafka cluster.
// Preserve historical behavior: if --resource resolution failed, silently fall through to the
// active cluster (the cluster/key-mismatch check below will surface real problems).
if resolveErr == nil && clusterId != "" {
if resourceType != resource.KafkaCluster {
return errors.New(nonKafkaNotImplementedErrorMsg)
}
Expand All @@ -74,20 +110,6 @@
}
}

key := args[0]
secret := args[1]

force, err := cmd.Flags().GetBool("force")
if err != nil {
return err
}

// Check if API key exists server-side
apiKey, httpResp, err := c.V2Client.GetApiKey(key)
if err != nil {
return errors.CatchApiKeyForbiddenAccessError(err, getOperation, httpResp)
}

apiKeyIsValidForTargetCluster := cluster.GetId() != "" && cluster.GetId() == apiKey.GetSpec().Resource.GetId()

if !apiKeyIsValidForTargetCluster {
Expand All @@ -114,3 +136,17 @@
output.ErrPrintf(c.Config.EnableColor, "Stored secret for API key \"%s\".\n", key)
return nil
}

func (c *command) storeGlobal(key, secret string, force bool) error {
if c.keystore.HasGlobalAPIKey(key) && !force {
return errors.NewErrorWithSuggestions(
fmt.Sprintf(`refusing to overwrite existing secret for API Key "%s"`, key),
fmt.Sprintf(refuseToOverrideSecretSuggestions, key),
)
}
if err := c.keystore.StoreGlobalAPIKey(&config.APIKeyPair{Key: key, Secret: secret}); err != nil {
return fmt.Errorf(unableToStoreApiKeyErrorMsg, err)
}
output.ErrPrintf(c.Config.EnableColor, "Stored secret for Global API key \"%s\".\n", key)
return nil
}
32 changes: 25 additions & 7 deletions internal/api-key/command_use.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"github.com/confluentinc/cli/v4/pkg/resource"
)

const useAPIKeyMsg = "Using API Key \"%s\".\n"
const (
useAPIKeyMsg = "Using API Key \"%s\".\n"
useGlobalAPIKeyMsg = "Using Global API Key \"%s\".\n"
)

func (c *command) newUseCommand() *cobra.Command {
cmd := &cobra.Command{
Expand All @@ -34,6 +37,21 @@ func (c *command) newUseCommand() *cobra.Command {
func (c *command) use(cmd *cobra.Command, args []string) error {
c.setKeyStoreIfNil()

apiKey := args[0]

// Global keys are stored on the Context, not on a specific Kafka cluster. Check there first so
// `confluent api-key use <global-key>` without --resource works seamlessly.
if !cmd.Flags().Changed("resource") && c.Context.HasGlobalAPIKey(apiKey) {
if err := c.Context.SetActiveGlobalAPIKey(apiKey); err != nil {
return errors.NewWrapErrorWithSuggestions(err, apiKeyUseFailedErrorMsg, fmt.Sprintf(apiKeyUseFailedSuggestions, apiKey))
}
if err := c.Config.Save(); err != nil {
return err
}
output.Printf(c.Config.EnableColor, useGlobalAPIKeyMsg, apiKey)
return nil
}

var clusterId string

if cmd.Flags().Changed("resource") {
Expand All @@ -46,20 +64,20 @@ func (c *command) use(cmd *cobra.Command, args []string) error {
}
clusterId = resourceId
} else {
clusterId = c.Context.KafkaClusterContext.FindApiKeyClusterId(args[0])
clusterId = c.Context.KafkaClusterContext.FindApiKeyClusterId(apiKey)
if clusterId == "" {
return errors.NewErrorWithSuggestions(
fmt.Sprintf(`API key "%s" and associated Kafka cluster are not stored in local CLI state`, args[0]),
fmt.Sprintf(apiKeyUseFailedSuggestions, args[0]),
fmt.Sprintf(`API key "%s" and associated Kafka cluster are not stored in local CLI state`, apiKey),
fmt.Sprintf(apiKeyUseFailedSuggestions, apiKey),
)
}
}

if err := c.useAPIKey(args[0], clusterId); err != nil {
return errors.NewWrapErrorWithSuggestions(err, apiKeyUseFailedErrorMsg, fmt.Sprintf(apiKeyUseFailedSuggestions, args[0]))
if err := c.useAPIKey(apiKey, clusterId); err != nil {
return errors.NewWrapErrorWithSuggestions(err, apiKeyUseFailedErrorMsg, fmt.Sprintf(apiKeyUseFailedSuggestions, apiKey))
}

output.Printf(c.Config.EnableColor, useAPIKeyMsg, args[0])
output.Printf(c.Config.EnableColor, useAPIKeyMsg, apiKey)
return nil
}

Expand Down
12 changes: 10 additions & 2 deletions internal/asyncapi/command_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@
return bindings, nil
}

func (c *command) getClusterDetails(details *accountDetails, flags *flags, cmd *cobra.Command) error {

Check failure on line 465 in internal/asyncapi/command_export.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 28 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3377&issues=ef4460c6-8610-4d90-8a09-e0c2df2578ba&open=ef4460c6-8610-4d90-8a09-e0c2df2578ba
cluster, err := pkafka.GetClusterForCommand(c.V2Client, c.Context)
if err != nil {
return fmt.Errorf(`failed to find Kafka cluster: %w`, err)
Expand Down Expand Up @@ -491,12 +491,20 @@
}
clusterCreds = cluster.APIKeys[flags.kafkaApiKey]
} else {
clusterCreds = cluster.APIKeys[cluster.APIKey]
// No explicit --kafka-api-key: resolve the cluster-scoped active key, falling back to the
// active Global API key (see Context.ResolveKafkaAPIKey).
apiKey, apiSecret, err := c.Context.ResolveKafkaAPIKey(cluster)
if err != nil {
return err
}
if apiKey != "" {
clusterCreds = &config.APIKeyPair{Key: apiKey, Secret: apiSecret}
}
}
if clusterCreds == nil {
return errors.NewErrorWithSuggestions(
"API key not set for the Kafka cluster",
"Set an API key pair for the Kafka cluster using `confluent api-key create --resource <cluster-id>` and then use it with `--kafka-api-key`.",
"Set an API key pair for the Kafka cluster using `confluent api-key create --resource <cluster-id>` and then use it with `--kafka-api-key`, or set an active Global API key with `confluent api-key use`.",
)
}

Expand Down
13 changes: 9 additions & 4 deletions internal/kafka/command_clientconfig_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (c *clientConfigCommand) setKafkaCluster(cmd *cobra.Command, configFile str
return "", err
}

if err := addApiKeyToCluster(cmd, kafkaCluster); err != nil {
if err := addApiKeyToCluster(cmd, c.Context, kafkaCluster); err != nil {
return "", err
}

Expand All @@ -200,11 +200,16 @@ func (c *clientConfigCommand) setKafkaCluster(cmd *cobra.Command, configFile str
}
}

apiKey, apiSecret, err := c.Context.ResolveKafkaAPIKey(kafkaCluster)
if err != nil {
return "", err
}

// replace BROKER_ENDPOINT, CLUSTER_API_KEY, and CLUSTER_API_SECRET templates
configFile = replaceTemplates(configFile, map[string]string{
brokerEndpointTemplate: kafkaCluster.Bootstrap,
clusterApiKeyTemplate: kafkaCluster.APIKey,
clusterApiSecretTemplate: kafkaCluster.GetApiSecret(),
clusterApiKeyTemplate: apiKey,
clusterApiSecretTemplate: apiSecret,
})
return configFile, nil
}
Expand Down Expand Up @@ -283,7 +288,7 @@ func (c *clientConfigCommand) getSchemaRegistryCluster() (*srcmv3.SrcmV3Cluster,
}

func (c *clientConfigCommand) validateKafkaCredentials(kafkaCluster *config.KafkaClusterConfig) error {
configMap, err := getCommonConfig(kafkaCluster, c.clientId)
configMap, err := getCommonConfig(c.Context, kafkaCluster, c.clientId)
if err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion internal/kafka/command_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (c *command) prepareAnonymousContext(cmd *cobra.Command) error {
return nil
}

func addApiKeyToCluster(cmd *cobra.Command, cluster *config.KafkaClusterConfig) error {
func addApiKeyToCluster(cmd *cobra.Command, ctx *config.Context, cluster *config.KafkaClusterConfig) error {
apiKey, err := cmd.Flags().GetString("api-key")
if err != nil {
return err
Expand All @@ -192,6 +192,12 @@ func addApiKeyToCluster(cmd *cobra.Command, cluster *config.KafkaClusterConfig)
}
}

// If the cluster has no scoped API key, accept an active Global API key as a fallback. The Kafka
// REST/admin path will resolve credentials via Context.ResolveKafkaAPIKey() downstream.
if cluster.APIKey == "" && ctx.GetActiveGlobalAPIKey() != "" {
return nil
}

if cluster.APIKey == "" {
return &errors.UnspecifiedAPIKeyError{ClusterID: cluster.ID}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/kafka/command_topic_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
return c.consumeOnPrem(cmd, args)
}

func (c *command) consumeCloud(cmd *cobra.Command, args []string) error {

Check failure on line 114 in internal/kafka/command_topic_consume.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 47 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3377&issues=43ceda39-f71d-4459-bf99-0d7ab297f249&open=43ceda39-f71d-4459-bf99-0d7ab297f249
topic := args[0]

cluster, err := kafka.GetClusterForCommand(c.V2Client, c.Context)
Expand All @@ -127,7 +127,7 @@
cluster.Bootstrap = bootstrap
}

if err := addApiKeyToCluster(cmd, cluster); err != nil {
if err := addApiKeyToCluster(cmd, c.Context, cluster); err != nil {
return err
}

Expand Down Expand Up @@ -193,7 +193,7 @@
}
}

consumer, err := newConsumer(group, cluster, c.clientID, configFile, config)
consumer, err := newConsumer(group, c.Context, cluster, c.clientID, configFile, config)
if err != nil {
return fmt.Errorf(errors.FailedToCreateConsumerErrorMsg, err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/kafka/command_topic_produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
return c.produceOnPrem(cmd, args)
}

func (c *command) produceCloud(cmd *cobra.Command, args []string) error {

Check failure on line 130 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 17 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3377&issues=f47de021-6378-4bb9-a97f-0133130db53b&open=f47de021-6378-4bb9-a97f-0133130db53b
topic := args[0]

cluster, err := kafka.GetClusterForCommand(c.V2Client, c.Context)
Expand All @@ -143,7 +143,7 @@
cluster.Bootstrap = bootstrap
}

if err := addApiKeyToCluster(cmd, cluster); err != nil {
if err := addApiKeyToCluster(cmd, c.Context, cluster); err != nil {
return err
}

Expand Down Expand Up @@ -184,7 +184,7 @@
return err
}

producer, err := newProducer(cluster, c.clientID, configFile, config)
producer, err := newProducer(c.Context, cluster, c.clientID, configFile, config)
if err != nil {
return fmt.Errorf(errors.FailedToCreateProducerErrorMsg, err)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/kafka/confluent_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,17 @@ func (c *command) retrieveUnsecuredToken(e ckgo.OAuthBearerTokenRefresh) (ckgo.O
return oauthBearerToken, nil
}

func newProducer(kafka *config.KafkaClusterConfig, clientID, configPath string, configStrings []string) (*ckgo.Producer, error) {
configMap, err := getProducerConfigMap(kafka, clientID)
func newProducer(ctx *config.Context, kafka *config.KafkaClusterConfig, clientID, configPath string, configStrings []string) (*ckgo.Producer, error) {
configMap, err := getProducerConfigMap(ctx, kafka, clientID)
if err != nil {
return nil, fmt.Errorf(errors.FailedToGetConfigurationErrorMsg, err)
}

return newProducerWithOverwrittenConfigs(configMap, configPath, configStrings)
}

func newConsumer(group string, kafka *config.KafkaClusterConfig, clientID, configPath string, configStrings []string) (*ckgo.Consumer, error) {
configMap, err := getConsumerConfigMap(group, kafka, clientID)
func newConsumer(group string, ctx *config.Context, kafka *config.KafkaClusterConfig, clientID, configPath string, configStrings []string) (*ckgo.Consumer, error) {
configMap, err := getConsumerConfigMap(group, ctx, kafka, clientID)
if err != nil {
return nil, fmt.Errorf(errors.FailedToGetConfigurationErrorMsg, err)
}
Expand Down
19 changes: 12 additions & 7 deletions internal/kafka/confluent_kafka_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,31 @@ type PartitionFilter struct {
Index int32
}

func getCommonConfig(kafka *config.KafkaClusterConfig, clientId string) (*ckgo.ConfigMap, error) {
func getCommonConfig(ctx *config.Context, kafka *config.KafkaClusterConfig, clientId string) (*ckgo.ConfigMap, error) {
if err := kafka.DecryptAPIKeys(); err != nil {
return nil, err
}

apiKey, apiSecret, err := ctx.ResolveKafkaAPIKey(kafka)
if err != nil {
return nil, err
}

configMap := &ckgo.ConfigMap{
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"ssl.endpoint.identification.algorithm": "https",
"client.id": clientId,
"bootstrap.servers": kafka.Bootstrap,
"sasl.username": kafka.APIKey,
"sasl.password": kafka.GetApiSecret(),
"sasl.username": apiKey,
"sasl.password": apiSecret,
}

return configMap, nil
}

func getProducerConfigMap(kafka *config.KafkaClusterConfig, clientID string) (*ckgo.ConfigMap, error) {
configMap, err := getCommonConfig(kafka, clientID)
func getProducerConfigMap(ctx *config.Context, kafka *config.KafkaClusterConfig, clientID string) (*ckgo.ConfigMap, error) {
configMap, err := getCommonConfig(ctx, kafka, clientID)
if err != nil {
return nil, err
}
Expand All @@ -62,8 +67,8 @@ func getProducerConfigMap(kafka *config.KafkaClusterConfig, clientID string) (*c
return configMap, nil
}

func getConsumerConfigMap(group string, kafka *config.KafkaClusterConfig, clientID string) (*ckgo.ConfigMap, error) {
configMap, err := getCommonConfig(kafka, clientID)
func getConsumerConfigMap(group string, ctx *config.Context, kafka *config.KafkaClusterConfig, clientID string) (*ckgo.ConfigMap, error) {
configMap, err := getCommonConfig(ctx, kafka, clientID)
if err != nil {
return nil, err
}
Expand Down
Loading