CBG-4249: Create an abstract DCPClient to be able to work with rosmar#7879
CBG-4249: Create an abstract DCPClient to be able to work with rosmar#7879
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR introduces an abstract DCPClient interface to support both Couchbase Server (via gocbcore) and Rosmar backends. The key change is creating a unified DCP client abstraction that dispatches to implementation-specific clients based on the underlying bucket type.
Key changes:
- Created
DCPClientinterface with implementationsGoCBDCPClient(renamed fromDCPClient) andRosmarDCPClient - Unified
DCPClientOptionsstruct replacing separate options for each implementation - Removed test skips for Rosmar/Walrus, enabling DCP-based tests to run against all bucket types
Reviewed Changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| base/abstract_dcp_client.go | New abstraction layer with DCPClient interface and factory function NewDCPClient |
| base/rosmar_dcp_client.go | New Rosmar-specific DCP client implementation |
| base/dcp_client.go | Renamed DCPClient to GoCBDCPClient and DCPClientOptions to GoCBDCPClientOptions |
| base/dcp_client_stream_observer.go | Updated receiver types from DCPClient to GoCBDCPClient |
| base/gocb_dcp_feed.go | Refactored to use new client creation pattern |
| db/background_mgr_resync_dcp.go | Updated to use new abstract client with scope-based collection specification |
| db/background_mgr_attachment_migration.go | Updated to use new abstract client with scope-based collection specification |
| db/attachment_compaction.go | Updated to use new abstract client with scope-based collection specification |
| db/util_testing.go | Updated to use new abstract client with scope-based collection specification |
| db/background_mgr_resync_dcp_test.go | Removed Walrus test skips |
| db/background_mgr_attachment_migration_test.go | Removed Walrus test skips |
| db/attachment_compaction_test.go | Removed Walrus test skips |
| base/dcp_client_test.go | Removed Walrus test skips and commented out unported tests |
| tools/cache_perf_tool/dcpDataGeneration.go | Updated type references to GoCBDCPClient |
ec497ab to
8aa5326
Compare
8aa5326 to
dc06fb0
Compare
adamcfraser
left a comment
There was a problem hiding this comment.
A few initial comments, haven't done a full review.
bbrks
left a comment
There was a problem hiding this comment.
Dropping the few comments I had from yesterday's review
|
I'm working through some flaky tests that were probably flaky before but now have faster timing issues. I don't think this will affect the bulk of this review. |
bbrks
left a comment
There was a problem hiding this comment.
A few nits, question about Close() error value and import stats
xdcr/rosmar_xdcr.go
Outdated
There was a problem hiding this comment.
Should this channel read be returned now it returns error?
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 28 out of 28 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
db/attachment_compaction.go:402
- attachmentCompactSweepPhase still returns a string checkpoint prefix (and returns dcpClient.GetMetadataKeyPrefix()), but call sites/tests in this PR were updated to treat the second return value as a purgeCheckpoints func. This is a compile-time mismatch and also inconsistent with mark/cleanup phases now returning purgeCheckpointsFunc. Update sweep phase signature/returns accordingly (or revert the call-site changes).
clientOptions := getCompactionDCPClientOptions(
db,
compactionID,
base.NewCollectionNameSet(dataStore),
SweepPhase,
callback,
)
clientOptions.InitialMetadata = base.BuildDCPMetadataSliceFromVBUUIDs(vbUUIDs)
base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for sweep phase of attachment compaction", compactionLoggingID)
dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions)
if err != nil {
base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err)
return 0, "", err
}
doneChan, err := dcpClient.Start()
if err != nil {
base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err)
_ = dcpClient.Close()
return 0, dcpClient.GetMetadataKeyPrefix(), err
}
| doneChan, err = client.Start() | ||
| if err != nil { | ||
| ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err) | ||
| _ = client.Close() | ||
| ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName)) | ||
| if doneChan != nil { | ||
| <-doneChan | ||
| } | ||
| return nil, err | ||
| } | ||
| InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName)) | ||
| go func() { | ||
| select { | ||
| case err := <-doneChan: | ||
| if err != nil { | ||
| WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err) | ||
| // FIXME: close dbContext here | ||
| } | ||
| break | ||
| case <-opts.Terminator: | ||
| InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName)) | ||
| // client.Close will always show error over doneChan | ||
| _ = client.Close() | ||
| dcpCloseErr := <-doneChan | ||
| if dcpCloseErr != nil { | ||
| WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr) | ||
| } | ||
| break | ||
| } | ||
| }() | ||
| return doneChan, err |
There was a problem hiding this comment.
StartDCPFeed returns doneChan to callers and documents that it will deliver the feed termination error, but the goroutine inside StartDCPFeed also receives from doneChan (both the normal and terminator paths). That consumption means callers may never receive the actual error value (they’ll often just see a closed channel and a nil). Consider not reading from doneChan here, or teeing/forwarding the value into a separate returned channel so the termination error is preserved for the caller.
| doneChan, err = client.Start() | |
| if err != nil { | |
| ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err) | |
| _ = client.Close() | |
| ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName)) | |
| if doneChan != nil { | |
| <-doneChan | |
| } | |
| return nil, err | |
| } | |
| InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName)) | |
| go func() { | |
| select { | |
| case err := <-doneChan: | |
| if err != nil { | |
| WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err) | |
| // FIXME: close dbContext here | |
| } | |
| break | |
| case <-opts.Terminator: | |
| InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName)) | |
| // client.Close will always show error over doneChan | |
| _ = client.Close() | |
| dcpCloseErr := <-doneChan | |
| if dcpCloseErr != nil { | |
| WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr) | |
| } | |
| break | |
| } | |
| }() | |
| return doneChan, err | |
| internalDoneChan, startErr := client.Start() | |
| if startErr != nil { | |
| ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), startErr) | |
| _ = client.Close() | |
| ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName)) | |
| return nil, startErr | |
| } | |
| userDoneChan := make(chan error, 1) | |
| InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName)) | |
| go func() { | |
| defer close(userDoneChan) | |
| select { | |
| case err := <-internalDoneChan: | |
| if err != nil { | |
| WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err) | |
| // FIXME: close dbContext here | |
| } | |
| // Forward the termination error to the caller-facing channel. | |
| userDoneChan <- err | |
| case <-opts.Terminator: | |
| InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName)) | |
| // client.Close will always show error over internalDoneChan | |
| _ = client.Close() | |
| dcpCloseErr := <-internalDoneChan | |
| if dcpCloseErr != nil { | |
| WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr) | |
| } | |
| // Forward the close error (if any) to the caller-facing channel. | |
| userDoneChan <- dcpCloseErr | |
| } | |
| }() | |
| return userDoneChan, nil |
There was a problem hiding this comment.
I agree this is not quite right but I think it is out of scope to fix.
Create an abstract DCPClient to be able to work with rosmar
GocbV2Bucket.StartDCPFeedusage sinceDCPClientOptionshas more features thansgbucket.FeedArguments.sgbucket.Bucket.StartDCPFeedis needed to call into rosmar. A future ticket could try to merge these two options. For use in Sync Gateway, usebase.StartDCPFeedorbase.DCPClientStartDCPFeedtoStartGocbDCPFeedDatabaseContext.RequireAttachmentMigrationandResyncManagerDCP.VBUUIDs)assert.Len(t, dbCtx.RequireAttachmentMigration, 1)assertions when attachment migration runs quickly. Exposes an existing bugCBG-5038where Attachment Migration changes syncinfo. Skip those tests under rosmarPre-review checklist
fmt.Print,log.Print, ...)base.UD(docID),base.MD(dbName))docs/apiIntegration Tests
GSI=true,xattrs=truehttps://jenkins.sgwdev.com/job/SyncGatewayIntegration/451/ (failure totally unrelated and exists on main right now)