CBG-5190: Add node poller for shardedDCPFeedListener#8141
CBG-5190: Add node poller for shardedDCPFeedListener#8141RIT3shSapata wants to merge 11 commits intomainfrom
Conversation
torcolvin
left a comment
There was a problem hiding this comment.
Think about how to write tests for this code, we want to make sure this is robust for a variety of errors. Doc not found errors are going to look different for rosmar and CBS. You can make the polling window very fast for the sake of doing a unit test.
| nodeDefsKnownKey string | ||
| nodeDefsWantedKey string | ||
| nodeDefsKnownCAS uint64 // to store the last known cas of nodeDefsKnown | ||
| nodeDefsWantedCAS uint64 // to store the last known cas of nodeDefsWanted |
There was a problem hiding this comment.
We want to construct this as map[string]uint64 of doc name -> cas so we can extend this to multiple documents that we want to track.
From the perspective of heartbeat we only care about nodeDefsKnown and nodeDefsWanted but for cbgt and cfg we care about all files that cbgt.Cfg.Subscribe has been called on. This is should be all the files in https://github.com/couchbase/cbgt/blob/99fce75d027f0e89f8f8f5a70a686e831c594af7/manager.go#L1446
I think we can do something where this struct links into NewCfgSG.
You can add an optional cfgNodePoller to the the NewCfgSG constructor. If this is set, you know we always want to pay attention to these document keys but we also want to pay attention to anything that is called by CfgSG.Subscribe and register that document name in this map.
| nodeDefsKnownKey := cbgt.CfgNodeDefsKey(cbgt.NODE_DEFS_KNOWN) | ||
| _, nodeDefsKnownCas, err := datastore.GetRaw(nodeDefsKnownKey) | ||
| if err != nil { | ||
| return nil, err | ||
| } |
There was a problem hiding this comment.
We shouldn't need to necessarily do this from the constructor, we can do this when we start polling.
Note that you want to test scenarios including but not limited to these circumstances:
- start with document not present
- start with document present
- turn document into a tombstone
- resurrect document
- purge documents
DocNotFound is not an error condition for getting a cas value. We can log at WRN if there's any other type of error, I'd expect those to be things like circuitbreaker tripped and only happen when CBS goes down.
There was a problem hiding this comment.
Pull request overview
This draft PR introduces a KV-CAS polling mechanism (“node poller”) to detect cbgt cfg document changes for sharded DCP feeds (notably distributed resync), and wires additional context into sharded DCP feed startup to support that behavior.
Changes:
- Extend
base.NewCfgSGto optionally start acfgNodePoller, and register watched cfg keys onSubscribe. - Extend
base.ShardedDCPOptionswithDatastoreandFeedType, and pass these from import/resync feed startup. - Add extensive unit tests covering
cfgNodePollerregister/poll/polling lifecycle.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| db/sg_replicate_cfg_test.go | Updates NewCfgSG callsites to include the new useNodePoller parameter. |
| db/import_listener.go | Passes Datastore + FeedType into sharded DCP feed startup options. |
| db/database.go | Updates enterprise NewCfgSG construction for new signature. |
| db/background_mgr_resync_dcp.go | Uses cancelable context + enables node poller for resync cfg; passes Datastore + FeedType into sharded DCP options. |
| base/sg_cluster_cfg.go | Adds optional node poller to CfgSG and registers watched keys on subscription. |
| base/dcp_test.go | Updates tests to use new NewCfgSG signature (resync case enables node poller). |
| base/dcp_sharded.go | Adds Datastore/FeedType validation, updates heartbeat listener registration signature, and implements cfgNodePoller. |
| base/dcp_sharded_test.go | Adds unit tests for cfgNodePoller behavior and polling shutdown semantics. |
Comments suppressed due to low confidence (1)
db/background_mgr_resync_dcp.go:277
- The cancel function from context.WithCancelCause is only called in the deferred cleanup after StartShardedDCPFeed succeeds. If NewCfgSG starts the node poller and a later step returns early (e.g., StartShardedDCPFeed error), the poller goroutine will continue running because cancel is never invoked. Defer cancel immediately after creation (and call it on error paths) to guarantee cleanup.
// CFG creation:
ctx, cancel := context.WithCancelCause(ctx)
resyncCfg, err := base.NewCfgSG(ctx, db.MetadataStore, db.MetadataKeys.ResyncCfgPrefix(), true)
if err != nil {
return fmt.Errorf("Error creating resync cfg: %v", err)
}
indexName, err := base.GenerateCBGTIndexName(db.Name, base.ShardedDCPFeedTypeResync)
if err != nil {
return fmt.Errorf("Error generating CBGT index name: %v", err)
}
opts := base.ShardedDCPOptions{
DBName: db.Name,
UUID: db.UUID,
NumPartitions: db.Options.ImportOptions.ImportPartitions,
Collections: collectionNamesByScope,
Cfg: resyncCfg,
Heartbeater: resyncHB,
Bucket: bucket,
IndexType: base.CBGTIndexTypeSyncGatewayResync,
DestKey: resyncDestKey,
IndexName: indexName,
Datastore: db.MetadataStore,
FeedType: base.ShardedDCPFeedTypeResync,
}
resyncCbgtContext, err := base.StartShardedDCPFeed(loggingCtx, opts)
if err != nil {
return fmt.Errorf("Error starting resync sharded dcp feed: %v", err)
}
Redocly previews |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (1)
db/background_mgr_resync_dcp.go:286
- This passes the (potentially cancelled) ctx into Stop calls. If the derived ctx is cancelled to stop the cfg node poller, using it for shutdown can cause cleanup to abort early. Prefer using the original parent ctx (or a non-cancellable context) for Stop/cleanup, and reserve the derived cancelable context just for the poller lifetime.
defer func() {
resyncCbgtContext.Stop(ctx)
resyncHB.Stop(ctx)
}()
| ctx, cancel := context.WithCancelCause(ctx) | ||
| resyncCfg, err := base.NewCfgSG(ctx, db.MetadataStore, db.MetadataKeys.ResyncCfgPrefix(), true) |
There was a problem hiding this comment.
This shadows the incoming ctx with a derived cancelable context. Consider using a separate name (e.g. resyncCtx) for the context you intend to cancel for stopping the cfg node poller, so the parent ctx remains available for other operations and cleanup.
torcolvin
left a comment
There was a problem hiding this comment.
LGTM mostly, I think waiting on this PR to write some more tests is OK in case it changes the code.
Some nits which are mostly go idioms but the Lock should be addressed.
| watcher := make(map[string]uint64, len(p.keyWatcher)) | ||
| for key, cas := range p.keyWatcher { | ||
| watcher[key] = cas | ||
| } | ||
| return watcher |
There was a problem hiding this comment.
nit:
You can use maps.Clone here.
| for key, oldCas := range watcher { | ||
| newCas, err := p.datastore.Get(key, nil) | ||
| if err != nil && !IsDocNotFoundError(err) { | ||
| WarnfCtx(p.ctx, "error reading doc: %s %v", UD(key), err) |
There was a problem hiding this comment.
| WarnfCtx(p.ctx, "error reading doc: %s %v", UD(key), err) | |
| WarnfCtx(p.ctx, "cfgNodePoller: error polling doc: %s %v, skipping polling", UD(key), err) |
| p.lock.Lock() | ||
| currentCas, ok := p.keyWatcher[key] | ||
| if !ok || currentCas != oldCas { | ||
| p.lock.Unlock() | ||
| continue | ||
| } | ||
| p.keyWatcher[key] = newCas | ||
| p.lock.Unlock() |
There was a problem hiding this comment.
It's an anti pattern to not call Unlock() in a defer in case this code crashes.
The way around this is to create a function like:
func(p *cfgNodePoller) updateCas(key string, newCas) {
p.Lock()
defer p.lock.Unlock()
p.keyWatcher[key] = newCas
}
to mark whether the function is updated.
I don't see how we need to check the cas again here, I think you can just have a function to update the cas.
| if c.nodePoller != nil { | ||
| err := c.nodePoller.Register(c.sgCfgBucketKey(cfgKey)) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } |
There was a problem hiding this comment.
very small nit, prefer non nested code when possible.
| if c.nodePoller != nil { | |
| err := c.nodePoller.Register(c.sgCfgBucketKey(cfgKey)) | |
| if err != nil { | |
| return err | |
| } | |
| } | |
| if c.nodePoller == nil { | |
| return nil | |
| } | |
| return c.nodePoller.Register(c.sgCfgBucketKey(cfgKey)) |
CBG-5190
Describe your PR here...
Pre-review checklist
fmt.Print,log.Print, ...)base.UD(docID),base.MD(dbName))docs/apiDependencies (if applicable)
Integration Tests