Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d5c2c22
Add collection names struct
torcolvin Feb 19, 2026
a1285a7
Merge remote-tracking branch 'origin/main' into dcp-common-collection…
torcolvin Feb 23, 2026
1d00cad
merge upstream
torcolvin Feb 23, 2026
d97c3fc
Merge remote-tracking branch 'origin/main' into dcp-common-collection…
torcolvin Feb 23, 2026
db02cd3
fix lint error
torcolvin Feb 23, 2026
be47f65
Allow collections to work
torcolvin Feb 24, 2026
2334276
temporary debugging
torcolvin Feb 26, 2026
3e79f96
Merge remote-tracking branch 'origin/main' into dcp-common-collection…
torcolvin Feb 27, 2026
beec91f
Remove duplicate code
torcolvin Feb 27, 2026
a361250
use newcollection
torcolvin Mar 4, 2026
cb0796b
Add debug code
torcolvin Mar 4, 2026
9ae6a07
Merge remote-tracking branch 'origin/main' into dcp-common-collection…
torcolvin Mar 5, 2026
2883196
remove debugging
torcolvin Mar 5, 2026
42e0007
Remove unused test
torcolvin Mar 5, 2026
347e8ef
Merge remote-tracking branch 'origin/main' into dcp-common-collection…
torcolvin Mar 9, 2026
d3bb65c
CBG-4249 implement a generic DCPclient using rosmar
torcolvin Mar 10, 2026
58b70c9
Merge remote-tracking branch 'origin/main' into dcp-abstract
torcolvin Mar 10, 2026
b120080
final comments
torcolvin Mar 11, 2026
c9b6088
fix flaky test
torcolvin Mar 11, 2026
bc094c8
don't warn over done chan
torcolvin Mar 11, 2026
85d6267
copilot comments
torcolvin Mar 11, 2026
f10777d
Apply suggestions from code review
torcolvin Mar 11, 2026
919d3d6
Remove unused arg
torcolvin Mar 11, 2026
1aa585e
fix data races
torcolvin Mar 11, 2026
9480c4f
Remove flaky assertion
torcolvin Mar 11, 2026
5197b97
Small fixes
torcolvin Mar 19, 2026
5abe6f8
Speed up slow test, allow for longer wait time for CI and -race
torcolvin Mar 19, 2026
c8444d7
move purge checkpoint func into DCPClient
torcolvin Mar 19, 2026
a738d0a
fix up
torcolvin Mar 19, 2026
d0a2784
Merge remote-tracking branch 'origin/main' into dcp-abstract
torcolvin Mar 24, 2026
d97dce8
Add back code for data race
torcolvin Mar 24, 2026
3c8d755
Remove extra constructors
torcolvin Mar 26, 2026
d542b1a
Merge remote-tracking branch 'origin/main' into dcp-abstract
torcolvin Apr 7, 2026
22f13ec
switch to use new DCPClient API
torcolvin Apr 7, 2026
d4280c7
Add error checks
torcolvin Apr 7, 2026
4077fdc
Protect from panic
torcolvin Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ func (b *GocbV2Bucket) IsMinimumVersion(requiredMajor, requiredMinor uint64) boo
return IsMinimumVersion(b.clusterCompatMajorVersion, b.clusterCompatMinorVersion, requiredMajor, requiredMinor)
}

// StartDCPFeed is not supported anymore and only exists to satisfy the sgbucket.Bucket interface. Use NewDCPClient
// instead.
//
// NOTE(review): the two return statements below look like the removed and added sides of a diff shown together. As
// written only the first return is reachable — confirm which one is intended.
func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
	groupID := ""
	return StartGocbDCPFeed(ctx, b, b.Spec.BucketName, args, callback, dbStats, DCPMetadataStoreInMemory, groupID)
	return errors.New("GocbV2Bucket does not support StartDCPFeed; use NewDCPClient instead")
}

func (b *GocbV2Bucket) GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error) {
Expand Down
49 changes: 49 additions & 0 deletions base/collection_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package base

import (
"errors"
"maps"
"slices"

sgbucket "github.com/couchbase/sg-bucket"
Expand All @@ -24,6 +25,9 @@ type ScopeAndCollectionName = sgbucket.DataStoreNameImpl
// CollectionNames maps each scope name to the list of collection names in that scope.
type CollectionNames map[string][]string

// CollectionNameSet is a set of collection names grouped by scope: the outer key is the scope name and the keys of
// the inner map are the collection names in that scope.
type CollectionNameSet map[string]map[string]struct{}

// DefaultScopeAndCollectionName returns the name built from DefaultScope and DefaultCollection.
func DefaultScopeAndCollectionName() ScopeAndCollectionName {
	var name ScopeAndCollectionName
	name.Scope = DefaultScope
	name.Collection = DefaultCollection
	return name
}
Expand Down Expand Up @@ -61,6 +65,20 @@ func (c CollectionNames) Add(ds ...sgbucket.DataStoreName) {
}
}

// ToCollectionNameSet converts CollectionNames to a CollectionNameSet. Duplicate collection names within a scope
// collapse to a single entry, and a scope with no collections is kept as an empty set.
func (c CollectionNames) ToCollectionNameSet() CollectionNameSet {
	collectionNameSet := make(CollectionNameSet, len(c))
	for scopeName, collections := range c {
		// Scope names are unique map keys, so each scope is visited exactly once and always needs a fresh set.
		names := make(map[string]struct{}, len(collections))
		for _, collection := range collections {
			names[collection] = struct{}{}
		}
		collectionNameSet[scopeName] = names
	}
	return collectionNameSet
}

// NewCollectionNames creates a new CollectionNames from specified collections. Does not deduplicate collections.
func NewCollectionNames(ds ...sgbucket.DataStoreName) CollectionNames {
c := make(CollectionNames, 1)
Expand All @@ -70,3 +88,34 @@ func NewCollectionNames(ds ...sgbucket.DataStoreName) CollectionNames {
}
return c
}

// Add inserts each provided data store into the set, creating the per-scope set the first time a scope is seen.
func (c CollectionNameSet) Add(ds ...sgbucket.DataStoreName) {
	for _, dataStore := range ds {
		scope := dataStore.ScopeName()
		names, exists := c[scope]
		if !exists {
			names = make(map[string]struct{})
			c[scope] = names
		}
		names[dataStore.CollectionName()] = struct{}{}
	}
}

// ToCollectionNames converts to CollectionNames, sorting the list of collection names for each scope. A scope with
// no collections is kept, with a nil list.
func (c CollectionNameSet) ToCollectionNames() CollectionNames {
	collectionNames := make(CollectionNames, len(c))
	for scopeName, collections := range c {
		// Map iteration order is random, so sort to give callers a deterministic list.
		collectionNames[scopeName] = slices.Sorted(maps.Keys(collections))
	}
	return collectionNames
}

// NewCollectionNameSet builds a CollectionNameSet holding the given data stores. Repeated data stores collapse to a
// single entry.
func NewCollectionNameSet(ds ...sgbucket.DataStoreName) CollectionNameSet {
	nameSet := make(CollectionNameSet, 1)
	nameSet.Add(ds...)
	return nameSet
}
183 changes: 183 additions & 0 deletions base/dcp_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2025-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package base

import (
"context"
"expvar"
"fmt"

"github.com/couchbase/gocbcore/v10"
sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbaselabs/rosmar"
)

// DCPClient is an interface for all DCP implementations. Instances are created with NewDCPClient.
type DCPClient interface {
	// Start will start the DCP feed. It returns a channel marking the end of the feed, and a non-nil error if the
	// feed could not be started.
	Start() (chan error, error)
	// Close will shut down the DCP feed.
	Close() error
	// GetMetadata returns the current DCP metadata.
	GetMetadata() []DCPMetadata
	// GetMetadataKeyPrefix returns the key prefix used for storing any persistent data.
	GetMetadataKeyPrefix() string

	// PurgeCheckpoints deletes the checkpoint document for the feed. Calling this function while the feed is running
	// will not alter the feed nor remove the checkpoint for the future.
	PurgeCheckpoints() error
}

// DCPClientOptions are options for creating a DCPClient. NewDCPClient validates them and returns an error when a
// required option is missing or two options conflict.
type DCPClientOptions struct {
	// FeedID is an optional feed ID to distinguish the feed from others in logging.
	FeedID string
	// Callback is the callback function for DCP events. Required.
	Callback sgbucket.FeedEventCallbackFunc
	// DBStats is used only by the gocbcore implementation. These stats are not shared by prometheus stats.
	DBStats *expvar.Map
	// CheckpointPrefix is the start of the checkpoint documents. It must be empty when MetadataStoreType is
	// DCPMetadataStoreInMemory and must be set when MetadataStoreType is DCPMetadataStoreCS.
	CheckpointPrefix string
	// CollectionNames are the scopes and collections to monitor. Required; must not be empty.
	CollectionNames CollectionNameSet
	// InitialMetadata is the initial metadata to seed the DCP client with. Cannot be combined with
	// FromLatestSequence.
	InitialMetadata []DCPMetadata
	// MetadataStoreType selects persistent or in memory storage.
	MetadataStoreType DCPMetadataStoreType
	// OneShot, if true, runs the feed to the latest document found when the client is started.
	OneShot bool
	// FailOnRollback, if true, fails Start if the current DCP checkpoints encounter a rollback condition.
	FailOnRollback bool
	// Terminator is an optional channel that can be closed to terminate the DCP feed. StartDCPFeed waits on it;
	// this will be replaced with a context option.
	Terminator chan bool
	// FromLatestSequence, if true, starts at the latest sequence.
	FromLatestSequence bool
	// FeedContent specifies whether the DCP feed should include values, xattrs, or both.
	FeedContent sgbucket.FeedContent
}

// NewDCPClient creates a new DCPClient to receive events from a bucket.
func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DCPClient, error) {
if bucket == nil {
return nil, fmt.Errorf("bucket must be provided")
} else if opts.Callback == nil {
return nil, fmt.Errorf("DCPClientOptions.Callback must be provided")
} else if len(opts.CollectionNames) == 0 {
return nil, fmt.Errorf("DCPClientOptions.CollectionNames must be provided")
} else if opts.FromLatestSequence && len(opts.InitialMetadata) > 0 {
return nil, fmt.Errorf("DCPClientOptions.InitialMetadata cannot be provided when FromLatestSequence is true")
} else if opts.MetadataStoreType == DCPMetadataStoreInMemory && opts.CheckpointPrefix != "" {
return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix cannot be provided when MetadataStoreType is InMemory")
Comment thread
torcolvin marked this conversation as resolved.
} else if opts.MetadataStoreType == DCPMetadataStoreCS && opts.CheckpointPrefix == "" {
return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix must be provided when MetadataStoreType is persistent")
}
underlyingBucket := GetBaseBucket(bucket)
if _, ok := underlyingBucket.(*rosmar.Bucket); ok {
return NewRosmarDCPClient(ctx, bucket, opts)
}
gocbBucket, ok := underlyingBucket.(*GocbV2Bucket)
if !ok {
return nil, fmt.Errorf("bucket type %T does not have a DCPClient implementation", underlyingBucket)
}
var collectionIDs []uint32

cm, err := gocbBucket.GetCollectionManifest()
if err != nil {
return nil, err
}

for scopeName, collections := range opts.CollectionNames {
// should only be one args.Scope so cheaper to iterate this way around
var manifestScope *gocbcore.ManifestScope
for _, ms := range cm.Scopes {
if scopeName == ms.Name {
manifestScope = &ms
break
}
}
if manifestScope == nil {
return nil, RedactErrorf("scope %s not found", MD(scopeName))
}
collectionsFound := make(map[string]struct{})
// should be less than or equal number of args.collections than cm.scope.collections, so iterate this way so that the inner loop completes quicker on average
for _, manifestCollection := range manifestScope.Collections {
for collectionName := range collections {
if collectionName != manifestCollection.Name {
continue
}
collectionIDs = append(collectionIDs, manifestCollection.UID)
collectionsFound[collectionName] = struct{}{}
}
}
if len(collectionsFound) != len(collections) {
for collectionName := range collections {
if _, ok := collectionsFound[collectionName]; !ok {
return nil, RedactErrorf("collection %s not found in scope %s %+v", MD(collectionName), MD(manifestScope.Name), manifestScope.Collections)
}
}
}
}
options := GoCBDCPClientOptions{
FeedID: opts.FeedID,
MetadataStoreType: opts.MetadataStoreType,
DbStats: opts.DBStats,
CollectionIDs: collectionIDs,
AgentPriority: gocbcore.DcpAgentPriorityMed,
CheckpointPrefix: opts.CheckpointPrefix,
OneShot: opts.OneShot,
FailOnRollback: opts.FailOnRollback,
InitialMetadata: opts.InitialMetadata,
FeedContent: opts.FeedContent,
}

if opts.FromLatestSequence {
metadata, err := getHighSeqMetadata(gocbBucket)
if err != nil {
return nil, err
}
options.InitialMetadata = metadata
}

return NewGocbDCPClient(
ctx,
opts.Callback,
options,
gocbBucket)
}

// StartDCPFeed creates and starts a DCP feed. This function will return as soon as the feed is started. doneChan is
// sent a single error value when the feed terminates.
//
// If the client cannot be created or started, a nil channel and the error are returned. Once the feed is running, a
// background goroutine waits for either the feed to end or opts.Terminator to fire, and closes the client in the
// latter case.
//
// NOTE(review): the background goroutine receives from the same doneChan that is returned to the caller, so the
// caller may not observe the termination error itself — confirm whether callers rely on that value.
func StartDCPFeed(ctx context.Context, bucket Bucket, opts DCPClientOptions) (doneChan <-chan error, err error) {
client, err := NewDCPClient(ctx, bucket, opts)
if err != nil {
return nil, err
}
bucketName := bucket.GetName()
feedName := opts.FeedID

doneChan, err = client.Start()
if err != nil {
ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
// Best-effort cleanup: the Close error is dropped because the start error is what gets returned.
_ = client.Close()
ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName))
// Wait for the feed to report completion before returning, when Start handed back a channel.
if doneChan != nil {
<-doneChan
}
return nil, err
}
InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
// Watch the feed in the background so this function can return immediately.
go func() {
// A nil opts.Terminator never becomes ready, so in that case only doneChan can end this select.
select {
case err := <-doneChan:
Comment thread
torcolvin marked this conversation as resolved.
if err != nil {
WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err)
// FIXME: close dbContext here
}
break
case <-opts.Terminator:
InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
// client.Close will always show error over doneChan
_ = client.Close()
// Block until the feed reports that it has finished shutting down.
dcpCloseErr := <-doneChan
if dcpCloseErr != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
Comment thread
torcolvin marked this conversation as resolved.
}
break
}
}()
return doneChan, err
Comment thread
torcolvin marked this conversation as resolved.
Comment on lines +152 to +182
Copy link

Copilot AI Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StartDCPFeed returns doneChan to callers and documents that it will deliver the feed termination error, but the goroutine inside StartDCPFeed also receives from doneChan (both the normal and terminator paths). That consumption means callers may never receive the actual error value (they’ll often just see a closed channel and a nil). Consider not reading from doneChan here, or teeing/forwarding the value into a separate returned channel so the termination error is preserved for the caller.

Suggested change
doneChan, err = client.Start()
if err != nil {
ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
_ = client.Close()
ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName))
if doneChan != nil {
<-doneChan
}
return nil, err
}
InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
go func() {
select {
case err := <-doneChan:
if err != nil {
WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err)
// FIXME: close dbContext here
}
break
case <-opts.Terminator:
InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
// client.Close will always show error over doneChan
_ = client.Close()
dcpCloseErr := <-doneChan
if dcpCloseErr != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
}
break
}
}()
return doneChan, err
internalDoneChan, startErr := client.Start()
if startErr != nil {
ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), startErr)
_ = client.Close()
ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName))
return nil, startErr
}
userDoneChan := make(chan error, 1)
InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
go func() {
defer close(userDoneChan)
select {
case err := <-internalDoneChan:
if err != nil {
WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err)
// FIXME: close dbContext here
}
// Forward the termination error to the caller-facing channel.
userDoneChan <- err
case <-opts.Terminator:
InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
// client.Close will always show error over internalDoneChan
_ = client.Close()
dcpCloseErr := <-internalDoneChan
if dcpCloseErr != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
}
// Forward the close error (if any) to the caller-facing channel.
userDoneChan <- dcpCloseErr
}
}()
return userDoneChan, nil

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is not quite right but I think it is out of scope to fix.

}
Loading
Loading