-
Notifications
You must be signed in to change notification settings - Fork 26
APIE-844 - Confluent CLI support for kstreams #3260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
tmalik (tmalikconfluent)
wants to merge
27
commits into
main
Choose a base branch
from
APIE-844
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
8f92b80
APIE-844 - Confluent CLI support for kstreams
tmalikconfluent 1603eeb
Update dependency
cqin-confluent 649039a
Update to better match API spec
cqin-confluent 07fca9a
Add integration tests
cqin-confluent e9fac8e
Fix empty struct with stream-group-member describe
cqin-confluent 42db63f
Remove redundant flag
cqin-confluent 06c14b8
Refactor for clearer command naming
cqin-confluent e9b2d66
Fix linter error
cqin-confluent 0e942b6
Update tests
cqin-confluent 0eaefca
Refactor to move stream-group to its own top-level command group
cqin-confluent 4ea10ce
Remove `kind` in output for redundancy
cqin-confluent 6b366b8
Fix trailing whitespace
cqin-confluent 5b1d3de
Rename to streams-group
cqin-confluent 6fc32c7
Use public SDK
cqin-confluent 86b889a
Nit
cqin-confluent f4aa18b
Merge
cqin-confluent d88c44b
Merge
cqin-confluent b8f2310
Fix go.mod
cqin-confluent 969a0bd
Fix linter
cqin-confluent 995bb00
Update naming to StreamsGroup
cqin-confluent 5f6d538
Update golden
cqin-confluent 9a934c2
Fix naming and remove annotation for LIST
cqin-confluent 1872892
Remove NewValidArgsFunction to fix incorrect autocompletin
cqin-confluent 3cf427d
Add flag completion for assignment-type
cqin-confluent 1b894ae
Add alias
cqin-confluent 000b965
Update formatting
cqin-confluent c014baa
Update formatting
cqin-confluent File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| package kafka | ||
|
|
||
| import ( | ||
| "github.com/spf13/cobra" | ||
|
|
||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
| ) | ||
|
|
||
| type streamsGroupCommand struct { | ||
| *pcmd.AuthenticatedCLICommand | ||
| } | ||
|
|
||
| type streamsGroupOut struct { | ||
| ClusterId string `human:"Cluster Id" serialized:"cluster_id"` | ||
| GroupId string `human:"Group Id" serialized:"group_id"` | ||
| State string `human:"State" serialized:"state"` | ||
| MemberCount int32 `human:"Member Count" serialized:"member_count"` | ||
| SubtopologyCount int32 `human:"Subtopology Count" serialized:"subtopology_count"` | ||
| GroupEpoch int32 `human:"Group Epoch" serialized:"group_epoch"` | ||
| TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"` | ||
| TargetAssignmentEpoch int32 `human:"Target Assignment Epoch" serialized:"target_assignment_epoch"` | ||
| Members string `human:"Members" serialized:"members"` | ||
| Subtopologies string `human:"Subtopologies" serialized:"subtopologies"` | ||
| } | ||
|
|
||
| func newStreamsGroupCommand(prerunner pcmd.PreRunner) *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "streams-group", | ||
| Short: "Manage Kafka streams groups.", | ||
| Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogin}, | ||
| } | ||
|
|
||
| c := &streamsGroupCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)} | ||
|
|
||
| cmd.AddCommand(c.newStreamsGroupDescribeCommand()) | ||
| cmd.AddCommand(c.newStreamsGroupListCommand()) | ||
| cmd.AddCommand(c.newStreamsGroupMemberAssignmentCommand()) | ||
| cmd.AddCommand(c.newStreamsGroupMemberCommand()) | ||
| cmd.AddCommand(c.newStreamsGroupMemberTargetAssignmentCommand()) | ||
| cmd.AddCommand(c.newStreamsGroupMemberTargetAssignmentTaskPartitionsCommand()) | ||
| cmd.AddCommand(c.newStreamsGroupMemberTaskPartitionsCommand()) | ||
| cmd.AddCommand(c.newStreamsGroupSubtopologyCommand()) | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func (c *streamsGroupCommand) validStreamsGroupArgs(cmd *cobra.Command, args []string) []string { | ||
| if len(args) > 0 { | ||
| return nil | ||
| } | ||
|
|
||
| if err := c.PersistentPreRunE(cmd, args); err != nil { | ||
|
Check warning on line 52 in internal/kafka/command_kafka_streams_group.go
|
||
| return nil | ||
| } | ||
|
|
||
| return pcmd.AutocompleteStreamsGroups(cmd, c.AuthenticatedCLICommand) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| package kafka | ||
|
|
||
| import ( | ||
| "github.com/spf13/cobra" | ||
|
|
||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
| "github.com/confluentinc/cli/v4/pkg/output" | ||
| ) | ||
|
|
||
| func (c *streamsGroupCommand) newStreamsGroupDescribeCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "describe <group>", | ||
| Short: "Describe a Kafka streams group.", | ||
| Args: cobra.ExactArgs(1), | ||
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamsGroupArgs), | ||
| RunE: c.streamsGroupDescribe, | ||
| } | ||
|
|
||
| pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddContextFlag(cmd, c.CLICommand) | ||
| pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddOutputFlag(cmd) | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func (c *streamsGroupCommand) streamsGroupDescribe(cmd *cobra.Command, args []string) error { | ||
| groupId := args[0] | ||
|
|
||
| kafkaREST, err := c.GetKafkaREST(cmd) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| streamsGroup, err := kafkaREST.CloudClient.GetKafkaStreamsGroup(groupId) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| table := output.NewTable(cmd) | ||
| table.Add(&streamsGroupOut{ | ||
| ClusterId: streamsGroup.GetClusterId(), | ||
| GroupId: streamsGroup.GetGroupId(), | ||
| State: streamsGroup.GetState(), | ||
| MemberCount: streamsGroup.GetMemberCount(), | ||
| SubtopologyCount: streamsGroup.GetSubtopologyCount(), | ||
| GroupEpoch: streamsGroup.GetGroupEpoch(), | ||
| TopologyEpoch: streamsGroup.GetTopologyEpoch(), | ||
| TargetAssignmentEpoch: streamsGroup.GetTargetAssignmentEpoch(), | ||
| Members: streamsGroup.Members.GetRelated(), | ||
| Subtopologies: streamsGroup.Subtopologies.GetRelated(), | ||
| }) | ||
|
|
||
| return table.Print() | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| package kafka | ||
|
|
||
| import ( | ||
| "github.com/spf13/cobra" | ||
|
|
||
| kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" | ||
|
|
||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
| "github.com/confluentinc/cli/v4/pkg/output" | ||
| ) | ||
|
|
||
| func (c *streamsGroupCommand) newStreamsGroupListCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "list", | ||
| Short: "List Kafka streams groups.", | ||
| Args: cobra.NoArgs, | ||
| RunE: c.listStreamsGroups, | ||
| } | ||
|
|
||
| pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddContextFlag(cmd, c.CLICommand) | ||
| pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddOutputFlag(cmd) | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func (c *streamsGroupCommand) listStreamsGroups(cmd *cobra.Command, _ []string) error { | ||
| groups, err := c.getStreamsGroups(cmd) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| list := output.NewList(cmd) | ||
| for _, streamsGroup := range groups { | ||
| list.Add(&streamsGroupOut{ | ||
| ClusterId: streamsGroup.GetClusterId(), | ||
| GroupId: streamsGroup.GetGroupId(), | ||
| State: streamsGroup.GetState(), | ||
| MemberCount: streamsGroup.GetMemberCount(), | ||
| SubtopologyCount: streamsGroup.GetSubtopologyCount(), | ||
| GroupEpoch: streamsGroup.GetGroupEpoch(), | ||
| TopologyEpoch: streamsGroup.GetTopologyEpoch(), | ||
| TargetAssignmentEpoch: streamsGroup.GetTargetAssignmentEpoch(), | ||
| Members: streamsGroup.Members.GetRelated(), | ||
| Subtopologies: streamsGroup.Subtopologies.GetRelated(), | ||
| }) | ||
| } | ||
| return list.Print() | ||
| } | ||
|
|
||
| func (c *streamsGroupCommand) getStreamsGroups(cmd *cobra.Command) ([]kafkarestv3.StreamsGroupData, error) { | ||
| kafkaREST, err := c.GetKafkaREST(cmd) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| groups, err := kafkaREST.CloudClient.ListKafkaStreamsGroups() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return groups.Data, nil | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| package kafka | ||
|
|
||
| import ( | ||
| "github.com/spf13/cobra" | ||
| ) | ||
|
|
||
| type streamsGroupMemberOut struct { | ||
| Kind string `human:"Kind" serialized:"kind"` | ||
| ClusterId string `human:"Cluster Id" serialized:"cluster_id"` | ||
| GroupId string `human:"Group Id" serialized:"group_id"` | ||
| MemberId string `human:"Member Id" serialized:"member_id"` | ||
| ProcessId string `human:"Process Id" serialized:"process_id"` | ||
| ClientId string `human:"Client Id" serialized:"client_id"` | ||
| InstanceId string `human:"Instance Id" serialized:"instance_id"` | ||
| MemberEpoch int32 `human:"Member Epoch" serialized:"member_epoch"` | ||
| TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"` | ||
| IsClassic bool `human:"Is Classic" serialized:"is_classic"` | ||
| Assignments string `human:"Assignments" serialized:"assignments"` | ||
| TargetAssign string `human:"Target Assignment" serialized:"target_assignment"` | ||
| } | ||
|
|
||
| func (c *streamsGroupCommand) newStreamsGroupMemberCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "member", | ||
| Short: "Manage Kafka streams group members.", | ||
| } | ||
|
|
||
| cmd.AddCommand(c.newStreamsGroupMemberDescribeCommand()) | ||
| cmd.AddCommand(c.newStreamsGroupMemberListCommand()) | ||
|
|
||
| return cmd | ||
| } |
28 changes: 28 additions & 0 deletions
28
internal/kafka/command_kafka_streams_group_member_assignment.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| package kafka | ||
|
|
||
| import ( | ||
| "github.com/spf13/cobra" | ||
| ) | ||
|
|
||
| type streamsGroupMemberAssignmentOut struct { | ||
| Kind string `human:"Kind" serialized:"kind"` | ||
| ClusterId string `human:"Cluster Id" serialized:"cluster_id"` | ||
| GroupId string `human:"Group Id" serialized:"group_id"` | ||
| MemberId string `human:"Member Id" serialized:"member_id"` | ||
| ActiveTasks string `human:"Active Tasks" serialized:"active_tasks"` | ||
| StandbyTasks string `human:"Standby Tasks" serialized:"standby_tasks"` | ||
| WarmupTasks string `human:"Warmup Tasks" serialized:"warmup_tasks"` | ||
| } | ||
|
|
||
| func (c *streamsGroupCommand) newStreamsGroupMemberAssignmentCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "member-assignment", | ||
| Aliases: []string{"ma"}, | ||
| Short: "Manage Kafka streams group member assignments.", | ||
| } | ||
|
|
||
| cmd.AddCommand(c.newStreamsGroupMemberAssignmentDescribeCommand()) | ||
| cmd.AddCommand(c.newStreamsGroupMemberAssignmentListCommand()) | ||
|
|
||
| return cmd | ||
| } |
61 changes: 61 additions & 0 deletions
61
internal/kafka/command_kafka_streams_group_member_assignment_describe.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| package kafka | ||
|
|
||
| import ( | ||
| "github.com/spf13/cobra" | ||
|
|
||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
| "github.com/confluentinc/cli/v4/pkg/output" | ||
| ) | ||
|
|
||
| func (c *streamsGroupCommand) newStreamsGroupMemberAssignmentDescribeCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "describe <member>", | ||
| Short: "Describe a Kafka streams group member assignment.", | ||
| Args: cobra.ExactArgs(1), | ||
| RunE: c.streamsGroupMemberAssignmentDescribe, | ||
| } | ||
|
|
||
| cmd.Flags().String("group", "", "Group Id.") | ||
|
|
||
| pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddContextFlag(cmd, c.CLICommand) | ||
| pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddOutputFlag(cmd) | ||
|
|
||
| cobra.CheckErr(cmd.MarkFlagRequired("group")) | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func (c *streamsGroupCommand) streamsGroupMemberAssignmentDescribe(cmd *cobra.Command, args []string) error { | ||
| groupId, err := cmd.Flags().GetString("group") | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| memberId := args[0] | ||
|
|
||
| kafkaREST, err := c.GetKafkaREST(cmd) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| assignment, err := kafkaREST.CloudClient.GetKafkaStreamsGroupMemberAssignment(groupId, memberId) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| table := output.NewTable(cmd) | ||
| table.Add(&streamsGroupMemberAssignmentOut{ | ||
| Kind: assignment.GetKind(), | ||
| ClusterId: assignment.GetClusterId(), | ||
| GroupId: assignment.GetGroupId(), | ||
| MemberId: assignment.GetMemberId(), | ||
| ActiveTasks: assignment.ActiveTasks.GetRelated(), | ||
| StandbyTasks: assignment.StandbyTasks.GetRelated(), | ||
| WarmupTasks: assignment.WarmupTasks.GetRelated(), | ||
| }) | ||
|
|
||
| return table.Print() | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.