CBG-5190: Add node poller for shardedDCPFeedListener#8141

Open
RIT3shSapata wants to merge 11 commits into main from CBG-5190
Conversation

@RIT3shSapata
Contributor

@RIT3shSapata RIT3shSapata commented Mar 30, 2026

CBG-5190

Describe your PR here...

  • Added a node poller that polls for all the CBGT subscribed documents
  • This node poller is used only for distributed resync
  • Whenever there is a change in any of the documents used by CBGT, an event is fired to reload the nodes
  • Added test coverage for the node poller

Pre-review checklist

  • Removed debug logging (fmt.Print, log.Print, ...)
  • Logging sensitive data? Make sure it's tagged (e.g. base.UD(docID), base.MD(dbName))
  • Updated relevant information in the API specifications (such as endpoint descriptions, schemas, ...) in docs/api

Dependencies (if applicable)

  • Link upstream PRs
  • Update Go module dependencies when merged

Integration Tests

Collaborator

@torcolvin torcolvin left a comment

Think about how to write tests for this code, we want to make sure this is robust for a variety of errors. Doc not found errors are going to look different for rosmar and CBS. You can make the polling window very fast for the sake of doing a unit test.

Comment thread base/dcp_sharded.go Outdated
nodeDefsKnownKey string
nodeDefsWantedKey string
nodeDefsKnownCAS uint64 // to store the last known cas of nodeDefsKnown
nodeDefsWantedCAS uint64 // to store the last known cas of nodeDefsWanted
Collaborator

We want to construct this as map[string]uint64 of doc name -> cas so we can extend this to multiple documents that we want to track.

From the perspective of heartbeat we only care about nodeDefsKnown and nodeDefsWanted, but for cbgt and cfg we care about all files that cbgt.Cfg.Subscribe has been called on. This should be all the files in https://github.com/couchbase/cbgt/blob/99fce75d027f0e89f8f8f5a70a686e831c594af7/manager.go#L1446

I think we can do something where this struct links into NewCfgSG.

You can add an optional cfgNodePoller to the NewCfgSG constructor. If this is set, you know we always want to pay attention to these document keys, but we also want to pay attention to anything that is called by CfgSG.Subscribe and register that document name in this map.
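The map-based shape suggested here could look roughly like the sketch below. It is an illustration under assumptions: `keyWatcher`, `newKeyWatcher`, and the key strings are hypothetical, and a CAS of 0 is used to mean "nothing observed yet", which the PR may or may not do.

```go
package main

import (
	"fmt"
	"sync"
)

// keyWatcher (hypothetical type) maps a cfg document key to the last CAS
// value observed for it, so any number of documents can be tracked.
type keyWatcher struct {
	lock sync.Mutex
	cas  map[string]uint64
}

// newKeyWatcher seeds the watch set with keys that should always be tracked.
func newKeyWatcher(initialKeys ...string) *keyWatcher {
	w := &keyWatcher{cas: make(map[string]uint64, len(initialKeys))}
	for _, k := range initialKeys {
		w.cas[k] = 0 // assumption: 0 means "no CAS observed yet"
	}
	return w
}

// Register adds key to the watch set. Re-registering an existing key keeps
// its known CAS, so repeated Subscribe calls do not reset state.
func (w *keyWatcher) Register(key string) {
	w.lock.Lock()
	defer w.lock.Unlock()
	if _, ok := w.cas[key]; !ok {
		w.cas[key] = 0
	}
}

// Len reports how many keys are currently watched.
func (w *keyWatcher) Len() int {
	w.lock.Lock()
	defer w.lock.Unlock()
	return len(w.cas)
}

func main() {
	// Placeholder key names; the real keys come from the cfg implementation.
	w := newKeyWatcher("cfg-key-known", "cfg-key-wanted")
	w.Register("cfg-key-subscribed") // e.g. called from a Subscribe hook
	w.Register("cfg-key-known")      // duplicate registration is a no-op
	fmt.Println("watched keys:", w.Len())
}
```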

Comment thread base/dcp_sharded.go Outdated
Comment on lines +609 to +613
nodeDefsKnownKey := cbgt.CfgNodeDefsKey(cbgt.NODE_DEFS_KNOWN)
_, nodeDefsKnownCas, err := datastore.GetRaw(nodeDefsKnownKey)
if err != nil {
return nil, err
}
Collaborator

We shouldn't need to necessarily do this from the constructor, we can do this when we start polling.

Note that you want to test scenarios including but not limited to these circumstances:

  1. start with document not present
  2. start with document present
  3. turn document into a tombstone
  4. resurrect document
  5. purge documents

DocNotFound is not an error condition for getting a cas value. We can log at WRN if there's any other type of error, I'd expect those to be things like circuitbreaker tripped and only happen when CBS goes down.
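The scenarios above can be walked through against a fake store. This is a hedged sketch rather than the PR's code: `casGetter`, `fakeStore`, `errNotFound`, and `pollOnce` are all hypothetical, and the fake cannot distinguish a tombstone from a purge (both simply look like "not found"), which real rosmar and CBS backends may report differently.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel standing in for a doc-not-found error.
var errNotFound = errors.New("doc not found")

// casGetter is a hypothetical minimal interface for reading a document's CAS.
type casGetter interface {
	GetCAS(key string) (uint64, error)
}

// fakeStore is an in-memory stand-in for a datastore.
type fakeStore struct {
	docs map[string]uint64
	down bool // simulates the server being unavailable
}

func (s *fakeStore) GetCAS(key string) (uint64, error) {
	if s.down {
		return 0, errors.New("store unavailable")
	}
	cas, ok := s.docs[key]
	if !ok {
		return 0, errNotFound
	}
	return cas, nil
}

// pollOnce compares each known CAS with the store. A missing document is
// treated as CAS 0, not as an error; other errors are collected and the key
// is skipped so its last known CAS is preserved.
func pollOnce(store casGetter, known map[string]uint64) (changed []string, errs []error) {
	for key, oldCas := range known {
		newCas, err := store.GetCAS(key)
		switch {
		case errors.Is(err, errNotFound):
			newCas = 0
		case err != nil:
			errs = append(errs, fmt.Errorf("polling %q: %w", key, err))
			continue
		}
		if newCas != oldCas {
			known[key] = newCas
			changed = append(changed, key)
		}
	}
	return changed, errs
}

func main() {
	store := &fakeStore{docs: map[string]uint64{}}
	known := map[string]uint64{"k": 0}

	changed, _ := pollOnce(store, known)
	fmt.Println("absent at start, changed:", len(changed))

	store.docs["k"] = 5 // document created
	changed, _ = pollOnce(store, known)
	fmt.Println("created, changed:", len(changed))

	delete(store.docs, "k") // document removed
	changed, _ = pollOnce(store, known)
	fmt.Println("removed, changed:", len(changed))

	store.docs["k"] = 9 // document resurrected
	changed, _ = pollOnce(store, known)
	fmt.Println("resurrected, changed:", len(changed))
}
```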

@torcolvin torcolvin assigned RIT3shSapata and unassigned torcolvin Apr 2, 2026
Base automatically changed from heartbeater-refactor to main April 2, 2026 14:49
@RIT3shSapata RIT3shSapata marked this pull request as ready for review April 6, 2026 20:39
Copilot AI review requested due to automatic review settings April 6, 2026 20:39
Contributor

Copilot AI left a comment

Pull request overview

This draft PR introduces a KV-CAS polling mechanism (“node poller”) to detect cbgt cfg document changes for sharded DCP feeds (notably distributed resync), and wires additional context into sharded DCP feed startup to support that behavior.

Changes:

  • Extend base.NewCfgSG to optionally start a cfgNodePoller, and register watched cfg keys on Subscribe.
  • Extend base.ShardedDCPOptions with Datastore and FeedType, and pass these from import/resync feed startup.
  • Add extensive unit tests covering cfgNodePoller register/poll/polling lifecycle.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| db/sg_replicate_cfg_test.go | Updates NewCfgSG callsites to include the new useNodePoller parameter. |
| db/import_listener.go | Passes Datastore + FeedType into sharded DCP feed startup options. |
| db/database.go | Updates enterprise NewCfgSG construction for new signature. |
| db/background_mgr_resync_dcp.go | Uses cancelable context + enables node poller for resync cfg; passes Datastore + FeedType into sharded DCP options. |
| base/sg_cluster_cfg.go | Adds optional node poller to CfgSG and registers watched keys on subscription. |
| base/dcp_test.go | Updates tests to use new NewCfgSG signature (resync case enables node poller). |
| base/dcp_sharded.go | Adds Datastore/FeedType validation, updates heartbeat listener registration signature, and implements cfgNodePoller. |
| base/dcp_sharded_test.go | Adds unit tests for cfgNodePoller behavior and polling shutdown semantics. |
Comments suppressed due to low confidence (1)

db/background_mgr_resync_dcp.go:277

  • The cancel function from context.WithCancelCause is only called in the deferred cleanup after StartShardedDCPFeed succeeds. If NewCfgSG starts the node poller and a later step returns early (e.g., StartShardedDCPFeed error), the poller goroutine will continue running because cancel is never invoked. Defer cancel immediately after creation (and call it on error paths) to guarantee cleanup.
		// CFG creation:
		ctx, cancel := context.WithCancelCause(ctx)
		resyncCfg, err := base.NewCfgSG(ctx, db.MetadataStore, db.MetadataKeys.ResyncCfgPrefix(), true)
		if err != nil {
			return fmt.Errorf("Error creating resync cfg: %v", err)
		}

		indexName, err := base.GenerateCBGTIndexName(db.Name, base.ShardedDCPFeedTypeResync)
		if err != nil {
			return fmt.Errorf("Error generating CBGT index name: %v", err)
		}
		opts := base.ShardedDCPOptions{
			DBName:        db.Name,
			UUID:          db.UUID,
			NumPartitions: db.Options.ImportOptions.ImportPartitions,
			Collections:   collectionNamesByScope,
			Cfg:           resyncCfg,
			Heartbeater:   resyncHB,
			Bucket:        bucket,
			IndexType:     base.CBGTIndexTypeSyncGatewayResync,
			DestKey:       resyncDestKey,
			IndexName:     indexName,
			Datastore:     db.MetadataStore,
			FeedType:      base.ShardedDCPFeedTypeResync,
		}
		resyncCbgtContext, err := base.StartShardedDCPFeed(loggingCtx, opts)

		if err != nil {
			return fmt.Errorf("Error starting resync sharded dcp feed: %v", err)
		}

@RIT3shSapata RIT3shSapata requested a review from torcolvin April 6, 2026 20:49
@RIT3shSapata RIT3shSapata changed the title add node poller for shardedDCPFeedListener CBG-5190: Add node poller for shardedDCPFeedListener Apr 6, 2026
@torcolvin torcolvin assigned RIT3shSapata and unassigned torcolvin Apr 7, 2026
@github-actions

Redocly previews

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.

Comments suppressed due to low confidence (1)

db/background_mgr_resync_dcp.go:286

  • This passes the (potentially cancelled) ctx into Stop calls. If the derived ctx is cancelled to stop the cfg node poller, using it for shutdown can cause cleanup to abort early. Prefer using the original parent ctx (or a non-cancellable context) for Stop/cleanup, and reserve the derived cancelable context just for the poller lifetime.
		defer func() {
			resyncCbgtContext.Stop(ctx)
			resyncHB.Stop(ctx)
		}()

Comment thread db/background_mgr_resync_dcp.go Outdated
Comment on lines +249 to +250
ctx, cancel := context.WithCancelCause(ctx)
resyncCfg, err := base.NewCfgSG(ctx, db.MetadataStore, db.MetadataKeys.ResyncCfgPrefix(), true)

Copilot AI Apr 16, 2026

This shadows the incoming ctx with a derived cancelable context. Consider using a separate name (e.g. resyncCtx) for the context you intend to cancel for stopping the cfg node poller, so the parent ctx remains available for other operations and cleanup.
Collaborator

@torcolvin torcolvin left a comment

LGTM mostly, I think waiting on this PR to write some more tests is OK in case it changes the code.

Some nits, which are mostly Go idioms, but the Lock should be addressed.

Comment thread base/dcp_sharded.go Outdated
Comment on lines +631 to +635
watcher := make(map[string]uint64, len(p.keyWatcher))
for key, cas := range p.keyWatcher {
watcher[key] = cas
}
return watcher
Collaborator

nit:

You can use maps.Clone here.
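A minimal sketch of that substitution, assuming Go 1.21 or later where `maps.Clone` is in the standard library. The `poller` type and `snapshot` method are hypothetical stand-ins for the code under review.

```go
package main

import (
	"fmt"
	"maps"
	"sync"
)

// poller (hypothetical) holds the watched keys and their last known CAS.
type poller struct {
	lock       sync.Mutex
	keyWatcher map[string]uint64
}

// snapshot returns a shallow copy of the watch map so callers can iterate
// over it without holding the lock.
func (p *poller) snapshot() map[string]uint64 {
	p.lock.Lock()
	defer p.lock.Unlock()
	return maps.Clone(p.keyWatcher)
}

func main() {
	p := &poller{keyWatcher: map[string]uint64{"a": 1}}
	s := p.snapshot()
	s["a"] = 2 // mutating the copy leaves the original untouched
	fmt.Println(p.keyWatcher["a"], s["a"])
}
```

One behavioral difference from the hand-written loop: `maps.Clone` returns nil for a nil map, whereas `make` plus a copy loop returns an empty non-nil map.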

Comment thread base/dcp_sharded.go Outdated
for key, oldCas := range watcher {
newCas, err := p.datastore.Get(key, nil)
if err != nil && !IsDocNotFoundError(err) {
WarnfCtx(p.ctx, "error reading doc: %s %v", UD(key), err)
Collaborator

Suggested change
- WarnfCtx(p.ctx, "error reading doc: %s %v", UD(key), err)
+ WarnfCtx(p.ctx, "cfgNodePoller: error polling doc: %s %v, skipping polling", UD(key), err)

Comment thread base/dcp_sharded.go Outdated
Comment on lines +668 to +675
p.lock.Lock()
currentCas, ok := p.keyWatcher[key]
if !ok || currentCas != oldCas {
p.lock.Unlock()
continue
}
p.keyWatcher[key] = newCas
p.lock.Unlock()
Collaborator

It's an anti-pattern to call Unlock() without a defer, because the lock stays held if this code panics.

The way around this is to create a function like:

func (p *cfgNodePoller) updateCas(key string, newCas uint64) {
	// Hold the lock only for the map write; defer releases it on every exit path.
	p.lock.Lock()
	defer p.lock.Unlock()
	p.keyWatcher[key] = newCas
}

to mark whether the function is updated.

I don't see how we need to check the cas again here, I think you can just have a function to update the cas.

Comment thread base/sg_cluster_cfg.go Outdated
Comment on lines +141 to +146
if c.nodePoller != nil {
err := c.nodePoller.Register(c.sgCfgBucketKey(cfgKey))
if err != nil {
return err
}
}
Collaborator

Very small nit: prefer non-nested code when possible.

Suggested change
- if c.nodePoller != nil {
- 	err := c.nodePoller.Register(c.sgCfgBucketKey(cfgKey))
- 	if err != nil {
- 		return err
- 	}
- }
+ if c.nodePoller == nil {
+ 	return nil
+ }
+ return c.nodePoller.Register(c.sgCfgBucketKey(cfgKey))

3 participants