Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8f92b80
APIE-844 - Confluent CLI support for kstreams
tmalikconfluent Feb 13, 2026
1603eeb
Update dependency
cqin-confluent Mar 11, 2026
649039a
Update to better match API spec
cqin-confluent Mar 11, 2026
07fca9a
Add integration tests
cqin-confluent Mar 11, 2026
e9fac8e
Fix empty struct with stream-group-member describe
cqin-confluent Mar 13, 2026
42db63f
Remove redundant flag
cqin-confluent Mar 13, 2026
06c14b8
Refactor for clearer command naming
cqin-confluent Mar 13, 2026
e9b2d66
Fix linter error
cqin-confluent Mar 13, 2026
0e942b6
Update tests
cqin-confluent Mar 26, 2026
0eaefca
Refactor to move stream-group to its own top-level command group
cqin-confluent Apr 1, 2026
4ea10ce
Remove `kind` in output for redundancy
cqin-confluent Apr 1, 2026
6b366b8
Fix trailing whitespace
cqin-confluent Apr 1, 2026
5b1d3de
Rename to streams-group
cqin-confluent Apr 1, 2026
6fc32c7
Use public SDK
cqin-confluent Apr 13, 2026
86b889a
Nit
cqin-confluent Apr 13, 2026
f4aa18b
Merge
cqin-confluent Apr 13, 2026
d88c44b
Merge
cqin-confluent Apr 13, 2026
b8f2310
Fix go.mod
cqin-confluent Apr 13, 2026
969a0bd
Fix linter
cqin-confluent Apr 13, 2026
995bb00
Update naming to StreamsGroup
cqin-confluent Apr 13, 2026
5f6d538
Update golden
cqin-confluent Apr 13, 2026
9a934c2
Fix naming and remove annotation for LIST
cqin-confluent Apr 13, 2026
1872892
Remove NewValidArgsFunction to fix incorrect autocompletin
cqin-confluent Apr 13, 2026
3cf427d
Add flag completion for assignment-type
cqin-confluent Apr 13, 2026
1b894ae
Add alias
cqin-confluent Apr 14, 2026
000b965
Update formatting
cqin-confluent Apr 14, 2026
c014baa
Update formatting
cqin-confluent Apr 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/lint/en_US.dic
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
48977
48979
0/nm
0th/pt
1/n1
Expand Down Expand Up @@ -43627,6 +43627,7 @@ subterfuge/SM
subterranean
subtext/SM
subtitle/DSMG
subtopology
subtle/TR
subtlety/SM
subtly
Expand Down Expand Up @@ -47761,6 +47762,7 @@ warmness/M
warmonger/SMG
warmongering/M
warmth/M
warmup
warn/JDGS
warning/M
warp/MDGS
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0
github.com/confluentinc/ccloud-sdk-go-v2/identity-provider v0.3.0
github.com/confluentinc/ccloud-sdk-go-v2/kafka-quotas v0.4.0
github.com/confluentinc/ccloud-sdk-go-v2/kafkarest v0.0.0-20250909043602-f80bee0eb280
github.com/confluentinc/ccloud-sdk-go-v2/kafkarest v0.25.0
github.com/confluentinc/ccloud-sdk-go-v2/ksql v0.2.0
github.com/confluentinc/ccloud-sdk-go-v2/mds v0.4.0
github.com/confluentinc/ccloud-sdk-go-v2/metrics v0.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/kafka-quotas v0.4.0 h1:T9e7lNj/VjxE89+t
github.com/confluentinc/ccloud-sdk-go-v2/kafka-quotas v0.4.0/go.mod h1:7gqwWFIyj2MAGpL/kf6SGXm/pi2Z6qpMJIjKlgEEhhg=
github.com/confluentinc/ccloud-sdk-go-v2/kafkarest v0.0.0-20250909043602-f80bee0eb280 h1:GFVI3pGckhpP66Xb05usB8txzubnnoigZHp292ax5Rg=
github.com/confluentinc/ccloud-sdk-go-v2/kafkarest v0.0.0-20250909043602-f80bee0eb280/go.mod h1:b8v8EIBtpQDx0zAxCpGxhuSWBRAwh/+PRFNtaBR5P7c=
github.com/confluentinc/ccloud-sdk-go-v2/kafkarest v0.25.0 h1:sV+PXI3jyEbqzyL8dS+v8DprMdwRD87vICegjgo1WBU=
github.com/confluentinc/ccloud-sdk-go-v2/kafkarest v0.25.0/go.mod h1:KZa40uDMwQlwKJQtj44cjTuvwLnE/wgcPfYPFq+tD9A=
github.com/confluentinc/ccloud-sdk-go-v2/ksql v0.2.0 h1:g6OHa1iW3HO3N/YiTAL9Q6Y7rdjMBAjOPYK37akTt0M=
github.com/confluentinc/ccloud-sdk-go-v2/ksql v0.2.0/go.mod h1:0LAvd4VqlaRwKU4yvDEkVCtV43yNezt56+hBe9Lmg7Q=
github.com/confluentinc/ccloud-sdk-go-v2/mds v0.4.0 h1:jIXXhGi+Xn+XYFCErnMvd035QijbYXla1Bo8W7V7lFM=
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
cmd.AddCommand(newRegionCommand(prerunner))
cmd.AddCommand(newReplicaCommand(prerunner))
cmd.AddCommand(newShareGroupCommand(prerunner))
cmd.AddCommand(newStreamsGroupCommand(prerunner))
cmd.AddCommand(newTopicCommand(cfg, prerunner))
Comment thread
cqin-confluent marked this conversation as resolved.

return cmd
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command_consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type consumerGroupOut struct {
IsSimple bool `human:"Simple" serialized:"is_simple"`
PartitionAssignor string `human:"Partition Assignor" serialized:"partition_assignor"`
State string `human:"State" serialized:"state"`
ProtocolType string `human:"Type,omitempty" serialized:"type,omitempty"`
}
Comment thread
cqin-confluent marked this conversation as resolved.

func (c *consumerCommand) newGroupCommand(cfg *config.Config) *cobra.Command {
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command_consumer_group_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (c *consumerCommand) groupDescribe(cmd *cobra.Command, args []string) error
IsSimple: group.GetIsSimple(),
PartitionAssignor: group.GetPartitionAssignor(),
State: group.GetState(),
ProtocolType: group.GetType(),
})
return table.Print()
}
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/command_consumer_group_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (c *consumerCommand) groupList(cmd *cobra.Command, _ []string) error {
IsSimple: group.GetIsSimple(),
PartitionAssignor: group.GetPartitionAssignor(),
State: group.GetState(),
ProtocolType: group.GetType(),
})
}
return list.Print()
Expand Down
57 changes: 57 additions & 0 deletions internal/kafka/command_kafka_streams_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package kafka

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

type streamsGroupCommand struct {
*pcmd.AuthenticatedCLICommand
}

type streamsGroupOut struct {
ClusterId string `human:"Cluster Id" serialized:"cluster_id"`
GroupId string `human:"Group Id" serialized:"group_id"`
State string `human:"State" serialized:"state"`
MemberCount int32 `human:"Member Count" serialized:"member_count"`
SubtopologyCount int32 `human:"Subtopology Count" serialized:"subtopology_count"`
GroupEpoch int32 `human:"Group Epoch" serialized:"group_epoch"`
TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"`
TargetAssignmentEpoch int32 `human:"Target Assignment Epoch" serialized:"target_assignment_epoch"`
Members string `human:"Members" serialized:"members"`
Subtopologies string `human:"Subtopologies" serialized:"subtopologies"`
}

func newStreamsGroupCommand(prerunner pcmd.PreRunner) *cobra.Command {
cmd := &cobra.Command{
Use: "streams-group",
Short: "Manage Kafka streams groups.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogin},
}

c := &streamsGroupCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)}

cmd.AddCommand(c.newStreamsGroupDescribeCommand())
cmd.AddCommand(c.newStreamsGroupListCommand())
cmd.AddCommand(c.newStreamsGroupMemberAssignmentCommand())
cmd.AddCommand(c.newStreamsGroupMemberCommand())
cmd.AddCommand(c.newStreamsGroupMemberTargetAssignmentCommand())
cmd.AddCommand(c.newStreamsGroupMemberTargetAssignmentTaskPartitionsCommand())
cmd.AddCommand(c.newStreamsGroupMemberTaskPartitionsCommand())
cmd.AddCommand(c.newStreamsGroupSubtopologyCommand())

return cmd
}

func (c *streamsGroupCommand) validStreamsGroupArgs(cmd *cobra.Command, args []string) []string {
if len(args) > 0 {
return nil
}

if err := c.PersistentPreRunE(cmd, args); err != nil {

Check warning on line 52 in internal/kafka/command_kafka_streams_group.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Remove this unnecessary variable declaration and use the expression directly in the condition.

[S8193] Variables in if short statements should be used beyond just the condition See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3260&issues=f27d21d7-4731-4f6c-824d-452dc195b949&open=f27d21d7-4731-4f6c-824d-452dc195b949
return nil
}

return pcmd.AutocompleteStreamsGroups(cmd, c.AuthenticatedCLICommand)
}
56 changes: 56 additions & 0 deletions internal/kafka/command_kafka_streams_group_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package kafka

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *streamsGroupCommand) newStreamsGroupDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <group>",
Short: "Describe a Kafka streams group.",
Args: cobra.ExactArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamsGroupArgs),
RunE: c.streamsGroupDescribe,
}

pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *streamsGroupCommand) streamsGroupDescribe(cmd *cobra.Command, args []string) error {
groupId := args[0]

kafkaREST, err := c.GetKafkaREST(cmd)
if err != nil {
return err
}

streamsGroup, err := kafkaREST.CloudClient.GetKafkaStreamsGroup(groupId)
if err != nil {
return err
}

table := output.NewTable(cmd)
table.Add(&streamsGroupOut{
ClusterId: streamsGroup.GetClusterId(),
GroupId: streamsGroup.GetGroupId(),
State: streamsGroup.GetState(),
MemberCount: streamsGroup.GetMemberCount(),
SubtopologyCount: streamsGroup.GetSubtopologyCount(),
GroupEpoch: streamsGroup.GetGroupEpoch(),
TopologyEpoch: streamsGroup.GetTopologyEpoch(),
TargetAssignmentEpoch: streamsGroup.GetTargetAssignmentEpoch(),
Members: streamsGroup.Members.GetRelated(),
Subtopologies: streamsGroup.Subtopologies.GetRelated(),
})

return table.Print()
}
65 changes: 65 additions & 0 deletions internal/kafka/command_kafka_streams_group_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package kafka

import (
"github.com/spf13/cobra"

kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *streamsGroupCommand) newStreamsGroupListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Kafka streams groups.",
Args: cobra.NoArgs,
RunE: c.listStreamsGroups,
}

pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *streamsGroupCommand) listStreamsGroups(cmd *cobra.Command, _ []string) error {
groups, err := c.getStreamsGroups(cmd)
if err != nil {
return err
}

list := output.NewList(cmd)
for _, streamsGroup := range groups {
list.Add(&streamsGroupOut{
ClusterId: streamsGroup.GetClusterId(),
GroupId: streamsGroup.GetGroupId(),
State: streamsGroup.GetState(),
MemberCount: streamsGroup.GetMemberCount(),
SubtopologyCount: streamsGroup.GetSubtopologyCount(),
GroupEpoch: streamsGroup.GetGroupEpoch(),
TopologyEpoch: streamsGroup.GetTopologyEpoch(),
TargetAssignmentEpoch: streamsGroup.GetTargetAssignmentEpoch(),
Members: streamsGroup.Members.GetRelated(),
Subtopologies: streamsGroup.Subtopologies.GetRelated(),
})
}
return list.Print()
}

func (c *streamsGroupCommand) getStreamsGroups(cmd *cobra.Command) ([]kafkarestv3.StreamsGroupData, error) {
kafkaREST, err := c.GetKafkaREST(cmd)
if err != nil {
return nil, err
}

groups, err := kafkaREST.CloudClient.ListKafkaStreamsGroups()
if err != nil {
return nil, err
}

return groups.Data, nil
}
32 changes: 32 additions & 0 deletions internal/kafka/command_kafka_streams_group_member.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kafka

import (
"github.com/spf13/cobra"
)

type streamsGroupMemberOut struct {
Kind string `human:"Kind" serialized:"kind"`
ClusterId string `human:"Cluster Id" serialized:"cluster_id"`
GroupId string `human:"Group Id" serialized:"group_id"`
MemberId string `human:"Member Id" serialized:"member_id"`
ProcessId string `human:"Process Id" serialized:"process_id"`
ClientId string `human:"Client Id" serialized:"client_id"`
InstanceId string `human:"Instance Id" serialized:"instance_id"`
MemberEpoch int32 `human:"Member Epoch" serialized:"member_epoch"`
TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"`
IsClassic bool `human:"Is Classic" serialized:"is_classic"`
Assignments string `human:"Assignments" serialized:"assignments"`
TargetAssign string `human:"Target Assignment" serialized:"target_assignment"`
}

func (c *streamsGroupCommand) newStreamsGroupMemberCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "member",
Short: "Manage Kafka streams group members.",
}

cmd.AddCommand(c.newStreamsGroupMemberDescribeCommand())
cmd.AddCommand(c.newStreamsGroupMemberListCommand())

return cmd
}
28 changes: 28 additions & 0 deletions internal/kafka/command_kafka_streams_group_member_assignment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package kafka

import (
"github.com/spf13/cobra"
)

type streamsGroupMemberAssignmentOut struct {
Kind string `human:"Kind" serialized:"kind"`
ClusterId string `human:"Cluster Id" serialized:"cluster_id"`
GroupId string `human:"Group Id" serialized:"group_id"`
MemberId string `human:"Member Id" serialized:"member_id"`
ActiveTasks string `human:"Active Tasks" serialized:"active_tasks"`
StandbyTasks string `human:"Standby Tasks" serialized:"standby_tasks"`
WarmupTasks string `human:"Warmup Tasks" serialized:"warmup_tasks"`
}

func (c *streamsGroupCommand) newStreamsGroupMemberAssignmentCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "member-assignment",
Aliases: []string{"ma"},
Short: "Manage Kafka streams group member assignments.",
}

cmd.AddCommand(c.newStreamsGroupMemberAssignmentDescribeCommand())
cmd.AddCommand(c.newStreamsGroupMemberAssignmentListCommand())

return cmd
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package kafka

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *streamsGroupCommand) newStreamsGroupMemberAssignmentDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <member>",
Short: "Describe a Kafka streams group member assignment.",
Args: cobra.ExactArgs(1),
RunE: c.streamsGroupMemberAssignmentDescribe,
}

cmd.Flags().String("group", "", "Group Id.")

pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("group"))

return cmd
}

func (c *streamsGroupCommand) streamsGroupMemberAssignmentDescribe(cmd *cobra.Command, args []string) error {
groupId, err := cmd.Flags().GetString("group")
if err != nil {
return err
}

memberId := args[0]

kafkaREST, err := c.GetKafkaREST(cmd)
if err != nil {
return err
}

assignment, err := kafkaREST.CloudClient.GetKafkaStreamsGroupMemberAssignment(groupId, memberId)
if err != nil {
return err
}

table := output.NewTable(cmd)
table.Add(&streamsGroupMemberAssignmentOut{
Kind: assignment.GetKind(),
ClusterId: assignment.GetClusterId(),
GroupId: assignment.GetGroupId(),
MemberId: assignment.GetMemberId(),
ActiveTasks: assignment.ActiveTasks.GetRelated(),
StandbyTasks: assignment.StandbyTasks.GetRelated(),
WarmupTasks: assignment.WarmupTasks.GetRelated(),
})

return table.Print()
}
Loading