Skip to content

CBG-4249: Create an abstract DCPClient to be able to work with rosmar#7879

Open
torcolvin wants to merge 35 commits intomainfrom
dcp-abstract
Open

CBG-4249: Create an abstract DCPClient to be able to work with rosmar#7879
torcolvin wants to merge 35 commits intomainfrom
dcp-abstract

Conversation

@torcolvin
Copy link
Copy Markdown
Collaborator

@torcolvin torcolvin commented Nov 13, 2025

Create an abstract DCPClient to be able to work with rosmar

  • Enable DCP to work with attachment migration and resync. attachment compaction since this uses hierarchical paths in rosmar for xattr subdoc operations https://jira.issues.couchbase.com/browse/CBG-4232
  • Remove GocbV2Bucket.StartDCPFeed usage since DCPClientOptions has more features than sgbucket.FeedArguments. sgbucket.Bucket.StartDCPFeed is needed to call into rosmar. A future ticket could try to merge these two options. For use in Sync Gateway, use base.StartDCPFeed or base.DCPClient
  • Moves guts of StartDCPFeed to StartGocbDCPFeed
  • Fixes a few data race conditions due to fast DCP (DatabaseContext.RequireAttachmentMigration and ResyncManagerDCP.VBUUIDs)
  • remove assert.Len(t, dbCtx.RequireAttachmentMigration, 1) assertions when attachment migration runs quickly. Exposes an existing bug CBG-5038 where Attachment Migration changes syncinfo. Skip those tests under rosmar

Pre-review checklist

  • Removed debug logging (fmt.Print, log.Print, ...)
  • Logging sensitive data? Make sure it's tagged (e.g. base.UD(docID), base.MD(dbName))
  • Updated relevant information in the API specifications (such as endpoint descriptions, schemas, ...) in docs/api

Integration Tests

Copilot AI review requested due to automatic review settings November 13, 2025 23:06
@torcolvin torcolvin marked this pull request as draft November 13, 2025 23:06
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces an abstract DCPClient interface to support both Couchbase Server (via gocbcore) and Rosmar backends. The key change is creating a unified DCP client abstraction that dispatches to implementation-specific clients based on the underlying bucket type.

Key changes:

  • Created DCPClient interface with implementations GoCBDCPClient (renamed from DCPClient) and RosmarDCPClient
  • Unified DCPClientOptions struct replacing separate options for each implementation
  • Removed test skips for Rosmar/Walrus, enabling DCP-based tests to run against all bucket types

Reviewed Changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
base/abstract_dcp_client.go New abstraction layer with DCPClient interface and factory function NewDCPClient
base/rosmar_dcp_client.go New Rosmar-specific DCP client implementation
base/dcp_client.go Renamed DCPClient to GoCBDCPClient and DCPClientOptions to GoCBDCPClientOptions
base/dcp_client_stream_observer.go Updated receiver types from DCPClient to GoCBDCPClient
base/gocb_dcp_feed.go Refactored to use new client creation pattern
db/background_mgr_resync_dcp.go Updated to use new abstract client with scope-based collection specification
db/background_mgr_attachment_migration.go Updated to use new abstract client with scope-based collection specification
db/attachment_compaction.go Updated to use new abstract client with scope-based collection specification
db/util_testing.go Updated to use new abstract client with scope-based collection specification
db/background_mgr_resync_dcp_test.go Removed Walrus test skips
db/background_mgr_attachment_migration_test.go Removed Walrus test skips
db/attachment_compaction_test.go Removed Walrus test skips
base/dcp_client_test.go Removed Walrus test skips and commented out unported tests
tools/cache_perf_tool/dcpDataGeneration.go Updated type references to GoCBDCPClient

@torcolvin torcolvin requested a review from Copilot December 2, 2025 02:06
@torcolvin torcolvin changed the title Prototype: Create an abstract DCPClient to be able to work with rosmar CBG-4249: Create an abstract DCPClient to be able to work with rosmar Dec 2, 2025
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 33 out of 33 changed files in this pull request and generated 9 comments.

Copy link
Copy Markdown
Collaborator

@adamcfraser adamcfraser left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few initial comments, haven't done a full review.

Copy link
Copy Markdown
Member

@bbrks bbrks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping the few comments I had from yesterday's review

@torcolvin torcolvin requested a review from bbrks March 11, 2026 16:50
@torcolvin torcolvin assigned bbrks and unassigned torcolvin Mar 11, 2026
@torcolvin torcolvin marked this pull request as ready for review March 12, 2026 13:28
@torcolvin
Copy link
Copy Markdown
Collaborator Author

I'm working through some flaky tests that were probably flaky before but now have faster timing issues. I don't think this will affect the bulk of this review.

Copy link
Copy Markdown
Member

@bbrks bbrks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few nits, question about Close() error value and import stats

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this channel read be returned now it returns error?

@bbrks bbrks assigned torcolvin and unassigned bbrks Mar 17, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 28 out of 28 changed files in this pull request and generated 4 comments.

Comments suppressed due to low confidence (1)

db/attachment_compaction.go:402

  • attachmentCompactSweepPhase still returns a string checkpoint prefix (and returns dcpClient.GetMetadataKeyPrefix()), but call sites/tests in this PR were updated to treat the second return value as a purgeCheckpoints func. This is a compile-time mismatch and also inconsistent with mark/cleanup phases now returning purgeCheckpointsFunc. Update sweep phase signature/returns accordingly (or revert the call-site changes).
	clientOptions := getCompactionDCPClientOptions(
		db,
		compactionID,
		base.NewCollectionNameSet(dataStore),
		SweepPhase,
		callback,
	)
	clientOptions.InitialMetadata = base.BuildDCPMetadataSliceFromVBUUIDs(vbUUIDs)

	base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for sweep phase of attachment compaction", compactionLoggingID)
	dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions)
	if err != nil {
		base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err)
		return 0, "", err
	}

	doneChan, err := dcpClient.Start()
	if err != nil {
		base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err)
		_ = dcpClient.Close()
		return 0, dcpClient.GetMetadataKeyPrefix(), err
	}

Comment on lines +85 to +115
doneChan, err = client.Start()
if err != nil {
ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
_ = client.Close()
ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName))
if doneChan != nil {
<-doneChan
}
return nil, err
}
InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
go func() {
select {
case err := <-doneChan:
if err != nil {
WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err)
// FIXME: close dbContext here
}
break
case <-opts.Terminator:
InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
// client.Close will always show error over doneChan
_ = client.Close()
dcpCloseErr := <-doneChan
if dcpCloseErr != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
}
break
}
}()
return doneChan, err
Copy link

Copilot AI Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StartDCPFeed returns doneChan to callers and documents that it will deliver the feed termination error, but the goroutine inside StartDCPFeed also receives from doneChan (both the normal and terminator paths). That consumption means callers may never receive the actual error value (they’ll often just see a closed channel and a nil). Consider not reading from doneChan here, or teeing/forwarding the value into a separate returned channel so the termination error is preserved for the caller.

Suggested change
doneChan, err = client.Start()
if err != nil {
ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
_ = client.Close()
ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName))
if doneChan != nil {
<-doneChan
}
return nil, err
}
InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
go func() {
select {
case err := <-doneChan:
if err != nil {
WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err)
// FIXME: close dbContext here
}
break
case <-opts.Terminator:
InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
// client.Close will always show error over doneChan
_ = client.Close()
dcpCloseErr := <-doneChan
if dcpCloseErr != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
}
break
}
}()
return doneChan, err
internalDoneChan, startErr := client.Start()
if startErr != nil {
ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), startErr)
_ = client.Close()
ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName))
return nil, startErr
}
userDoneChan := make(chan error, 1)
InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
go func() {
defer close(userDoneChan)
select {
case err := <-internalDoneChan:
if err != nil {
WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err)
// FIXME: close dbContext here
}
// Forward the termination error to the caller-facing channel.
userDoneChan <- err
case <-opts.Terminator:
InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
// client.Close will always show error over internalDoneChan
_ = client.Close()
dcpCloseErr := <-internalDoneChan
if dcpCloseErr != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
}
// Forward the close error (if any) to the caller-facing channel.
userDoneChan <- dcpCloseErr
}
}()
return userDoneChan, nil

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is not quite right but I think it is out of scope to fix.

@torcolvin torcolvin assigned bbrks and unassigned torcolvin Mar 24, 2026
@bbrks bbrks assigned torcolvin and unassigned bbrks Mar 26, 2026
@torcolvin torcolvin removed their assignment Apr 7, 2026
@torcolvin torcolvin requested a review from bbrks April 7, 2026 13:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants