Skip to content

Commit 60dbee2

Browse files
committed
logpuller: fix region leak after removing subscription (#4267)
Closes #4217
1 parent 3d1c997 commit 60dbee2

6 files changed

Lines changed: 221 additions & 62 deletions

File tree

logservice/logpuller/region_req_cache.go

Lines changed: 62 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,11 @@ type requestCache struct {
6060
regionReqs map[SubscriptionID]map[uint64]regionReq
6161
}
6262

63-
// counter for sent but not initialized requests
63+
// pendingCount is a flow control slot counter.
64+
// A slot is acquired when a request is successfully enqueued into pendingQueue (see add),
65+
// and is released when the request is finished/removed (resolve/markStopped/markDone/clear).
66+
// pop and markSent don't change it. If markSent overwrites an existing request for the same region,
67+
// it will release a slot for the replaced request to avoid leaking pendingCount.
6468
pendingCount atomic.Int64
6569
// maximum number of pending requests allowed
6670
maxPendingCount int64
@@ -104,12 +108,10 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
104108
case <-ctx.Done():
105109
return false, ctx.Err()
106110
case c.pendingQueue <- req:
107-
c.incPendingCount()
111+
c.pendingCount.Inc()
108112
cost := time.Since(start)
109113
metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds())
110114
return true, nil
111-
case <-c.spaceAvailable:
112-
continue
113115
case <-ticker.C:
114116
addReqRetryLimit--
115117
if addReqRetryLimit <= 0 {
@@ -135,7 +137,9 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
135137
}
136138
}
137139

138-
// pop gets the next pending request, returns nil if queue is empty
140+
// pop gets the next pending request.
141+
// Note: it doesn't change pendingCount. The slot acquired in add() should be released later
142+
// (e.g. resolve/markStopped/markDone).
139143
func (c *requestCache) pop(ctx context.Context) (regionReq, error) {
140144
select {
141145
case req := <-c.pendingQueue:
@@ -145,7 +149,8 @@ func (c *requestCache) pop(ctx context.Context) (regionReq, error) {
145149
}
146150
}
147151

148-
// markSent marks a request as sent and adds it to sent requests
152+
// markSent marks a request as sent and adds it to sent requests.
153+
// It doesn't change pendingCount: the slot is released when the request is finished/removed.
149154
func (c *requestCache) markSent(req regionReq) {
150155
c.sentRequests.Lock()
151156
defer c.sentRequests.Unlock()
@@ -157,10 +162,20 @@ func (c *requestCache) markSent(req regionReq) {
157162
c.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] = m
158163
}
159164

165+
if oldReq, exists := m[req.regionInfo.verID.GetID()]; exists {
166+
log.Warn("region request overwritten",
167+
zap.Uint64("subID", uint64(req.regionInfo.subscribedSpan.subID)),
168+
zap.Uint64("regionID", req.regionInfo.verID.GetID()),
169+
zap.Float64("oldAgeSec", time.Since(oldReq.createTime).Seconds()),
170+
zap.Float64("newAgeSec", time.Since(req.createTime).Seconds()),
171+
zap.Int("pendingCount", int(c.pendingCount.Load())),
172+
zap.Int("pendingQueueLen", len(c.pendingQueue)))
173+
c.markDone()
174+
}
160175
m[req.regionInfo.verID.GetID()] = req
161176
}
162177

163-
// markStopped removes a sent request without changing pending count (for stopped regions)
178+
// markStopped removes a sent request and releases a slot.
164179
func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
165180
c.sentRequests.Lock()
166181
defer c.sentRequests.Unlock()
@@ -176,12 +191,7 @@ func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
176191
}
177192

178193
delete(regionReqs, regionID)
179-
c.decPendingCount()
180-
// Notify waiting add operations that there's space available
181-
select {
182-
case c.spaceAvailable <- struct{}{}:
183-
default: // If channel is full, skip notification
184-
}
194+
c.markDone()
185195
}
186196

187197
// resolve marks a region as initialized and removes it from sent requests
@@ -201,19 +211,14 @@ func (c *requestCache) resolve(subscriptionID SubscriptionID, regionID uint64) b
201211
// Check if the subscription ID matches
202212
if req.regionInfo.subscribedSpan.subID == subscriptionID {
203213
delete(regionReqs, regionID)
204-
c.decPendingCount()
214+
c.markDone()
205215
cost := time.Since(req.createTime).Seconds()
206216
if cost > 0 && cost < abnormalRequestDurationInSec {
207217
log.Debug("cdc resolve region request", zap.Uint64("subID", uint64(subscriptionID)), zap.Uint64("regionID", regionID), zap.Float64("cost", cost), zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("pendingQueueLen", len(c.pendingQueue)))
208218
metrics.RegionRequestFinishScanDuration.Observe(cost)
209219
} else {
210220
log.Info("region request duration abnormal, skip metric", zap.Float64("cost", cost), zap.Uint64("regionID", regionID))
211221
}
212-
// Notify waiting add operations that there's space available
213-
select {
214-
case c.spaceAvailable <- struct{}{}:
215-
default: // If channel is full, skip notification
216-
}
217222
return true
218223
}
219224

@@ -235,8 +240,8 @@ func (c *requestCache) clearStaleRequest() {
235240
regionReq.regionInfo.subscribedSpan.stopped.Load() ||
236241
regionReq.regionInfo.lockedRangeState.Initialized.Load() ||
237242
regionReq.isStale() {
238-
c.decPendingCount()
239-
log.Info("region worker delete stale region request",
243+
c.markDone()
244+
log.Warn("region worker delete stale region request",
240245
zap.Uint64("subID", uint64(subID)),
241246
zap.Uint64("regionID", regionID),
242247
zap.Int("pendingCount", int(c.pendingCount.Load())),
@@ -247,17 +252,27 @@ func (c *requestCache) clearStaleRequest() {
247252
zap.Time("createTime", regionReq.createTime))
248253
delete(regionReqs, regionID)
249254
} else {
250-
reqCount += 1
255+
reqCount++
251256
}
252257
}
253258
if len(regionReqs) == 0 {
254259
delete(c.sentRequests.regionReqs, subID)
255260
}
256261
}
257262

258-
if reqCount == 0 && c.pendingCount.Load() != 0 {
259-
log.Info("region worker pending request count is not equal to actual region request count, correct it", zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("actualReqCount", reqCount))
263+
// If there are no in-cache region requests but pendingCount isn't 0, it means pendingCount is stale.
264+
// Reset it to avoid blocking add() forever.
265+
if reqCount == 0 && len(c.pendingQueue) == 0 && c.pendingCount.Load() != 0 {
266+
log.Info("region worker pending request count is not equal to actual region request count, correct it",
267+
zap.Int("pendingCount", int(c.pendingCount.Load())),
268+
zap.Int("actualReqCount", reqCount),
269+
zap.Int("pendingQueueLen", len(c.pendingQueue)))
260270
c.pendingCount.Store(0)
271+
// Notify waiting add operations that there's space available.
272+
select {
273+
case c.spaceAvailable <- struct{}{}:
274+
default:
275+
}
261276
}
262277

263278
c.lastCheckStaleRequestTime.Store(time.Now())
@@ -273,7 +288,7 @@ LOOP:
273288
select {
274289
case req := <-c.pendingQueue:
275290
regions = append(regions, req.regionInfo)
276-
c.decPendingCount()
291+
c.markDone()
277292
default:
278293
break LOOP
279294
}
@@ -286,7 +301,7 @@ LOOP:
286301
for regionID := range regionReqs {
287302
regions = append(regions, regionReqs[regionID].regionInfo)
288303
delete(regionReqs, regionID)
289-
c.decPendingCount()
304+
c.markDone()
290305
}
291306
delete(c.sentRequests.regionReqs, subID)
292307
}
@@ -298,17 +313,26 @@ func (c *requestCache) getPendingCount() int {
298313
return int(c.pendingCount.Load())
299314
}
300315

301-
func (c *requestCache) incPendingCount() {
302-
c.pendingCount.Inc()
303-
}
304-
305-
func (c *requestCache) decPendingCount() {
306-
// Ensure pendingCount doesn't go below 0
307-
current := c.pendingCount.Load()
308-
newCount := current - int64(1)
309-
if newCount < 0 {
310-
c.pendingCount.Store(0)
311-
return
316+
func (c *requestCache) markDone() {
317+
// Decrement pendingCount by 1, but never let it go below 0.
318+
// Do it with CAS to avoid clobbering concurrent Inc() calls.
319+
for {
320+
old := c.pendingCount.Load()
321+
if old == 0 {
322+
break
323+
} else if old < 0 {
324+
if c.pendingCount.CompareAndSwap(old, 0) {
325+
break
326+
}
327+
} else {
328+
if c.pendingCount.CompareAndSwap(old, old-1) {
329+
break
330+
}
331+
}
332+
}
333+
// Notify waiting add operations that there's space available.
334+
select {
335+
case c.spaceAvailable <- struct{}{}:
336+
default: // If channel is full, skip notification
312337
}
313-
c.pendingCount.Dec()
314338
}

logservice/logpuller/region_req_cache_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,3 +279,35 @@ func TestRequestCacheAdd_WithStoppedRegion(t *testing.T) {
279279
// The stopped region should be cleaned up
280280
require.Equal(t, 0, cache.getPendingCount())
281281
}
282+
283+
func TestRequestCacheMarkSent_DuplicateReleaseSlot(t *testing.T) {
284+
cache := newRequestCache(10)
285+
ctx := context.Background()
286+
287+
region := createTestRegionInfo(1, 1)
288+
289+
ok, err := cache.add(ctx, region, false)
290+
require.True(t, ok)
291+
require.NoError(t, err)
292+
293+
// Add a duplicate request for the same region. It should not leak pendingCount even if
294+
// markSent overwrites the existing entry.
295+
ok, err = cache.add(ctx, region, false)
296+
require.True(t, ok)
297+
require.NoError(t, err)
298+
require.Equal(t, 2, cache.getPendingCount())
299+
300+
req1, err := cache.pop(ctx)
301+
require.NoError(t, err)
302+
cache.markSent(req1)
303+
require.Equal(t, 2, cache.getPendingCount())
304+
305+
req2, err := cache.pop(ctx)
306+
require.NoError(t, err)
307+
cache.markSent(req2)
308+
require.Equal(t, 1, cache.getPendingCount())
309+
310+
// Finish the remaining tracked request.
311+
require.True(t, cache.resolve(region.subscribedSpan.subID, region.verID.GetID()))
312+
require.Equal(t, 0, cache.getPendingCount())
313+
}

logservice/logpuller/region_request_worker.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,17 @@ func newRegionRequestWorker(
8484
zap.String("addr", store.storeAddr))
8585
}
8686
for {
87-
region, err := worker.requestCache.pop(ctx)
87+
req, err := worker.requestCache.pop(ctx)
8888
if err != nil {
8989
return err
9090
}
91-
if !region.regionInfo.isStopped() {
92-
worker.preFetchForConnecting = new(regionInfo)
93-
*worker.preFetchForConnecting = region.regionInfo
94-
return nil
95-
} else {
91+
if req.regionInfo.isStopped() {
92+
worker.requestCache.markDone()
9693
continue
9794
}
95+
worker.preFetchForConnecting = new(regionInfo)
96+
*worker.preFetchForConnecting = req.regionInfo
97+
return nil
9898
}
9999
}
100100

@@ -298,10 +298,27 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res
298298
zap.Any("regionIDs", resolvedTsEvent.Regions))
299299
return
300300
}
301-
resolvedStates := make([]*regionFeedState, 0, len(resolvedTsEvent.Regions))
301+
// Avoid allocating a huge states slice when resolvedTsEvent.Regions is large.
302+
// Push resolved-ts events in batches to reduce peak memory usage and improve GC behavior.
303+
const resolvedTsStateBatchSize = 1024
304+
resolvedStates := make([]*regionFeedState, 0, resolvedTsStateBatchSize)
305+
flush := func() {
306+
if len(resolvedStates) == 0 {
307+
return
308+
}
309+
states := resolvedStates
310+
s.client.pushRegionEventToDS(subscriptionID, regionEvent{
311+
resolvedTs: resolvedTsEvent.Ts,
312+
states: states,
313+
})
314+
resolvedStates = make([]*regionFeedState, 0, resolvedTsStateBatchSize)
315+
}
302316
for _, regionID := range resolvedTsEvent.Regions {
303317
if state := s.getRegionState(subscriptionID, regionID); state != nil {
304318
resolvedStates = append(resolvedStates, state)
319+
if len(resolvedStates) >= resolvedTsStateBatchSize {
320+
flush()
321+
}
305322
continue
306323
}
307324
log.Warn("region request worker receives a resolved ts event for an untracked region",
@@ -310,13 +327,7 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res
310327
zap.Uint64("regionID", regionID),
311328
zap.Uint64("resolvedTs", resolvedTsEvent.Ts))
312329
}
313-
if len(resolvedStates) == 0 {
314-
return
315-
}
316-
s.client.pushRegionEventToDS(subscriptionID, regionEvent{
317-
resolvedTs: resolvedTsEvent.Ts,
318-
states: resolvedStates,
319-
})
330+
flush()
320331
}
321332

322333
// processRegionSendTask receives region requests from the channel and sends them to the remote store.
@@ -374,6 +385,7 @@ func (s *regionRequestWorker) processRegionSendTask(
374385
},
375386
FilterLoop: region.filterLoop,
376387
}
388+
s.requestCache.markDone()
377389
if err := doSend(req); err != nil {
378390
return err
379391
}
@@ -384,17 +396,18 @@ func (s *regionRequestWorker) processRegionSendTask(
384396
}
385397
s.client.pushRegionEventToDS(subID, regionEvent)
386398
}
387-
388399
} else if region.subscribedSpan.stopped.Load() {
389400
// It can be skipped directly because there must be no pending states from
390401
// the stopped subscribedTable, or the special singleRegionInfo for stopping
391402
// the table will be handled later.
392403
s.client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
404+
s.requestCache.markDone()
393405
} else {
394406
state := newRegionFeedState(region, uint64(subID), s)
395407
state.start()
396408
s.addRegionState(subID, region.verID.GetID(), state)
397409
if err := doSend(s.createRegionRequest(region)); err != nil {
410+
s.requestCache.markDone()
398411
return err
399412
}
400413
s.requestCache.markSent(regionReq)
@@ -485,6 +498,9 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo {
485498
region := *s.preFetchForConnecting
486499
s.preFetchForConnecting = nil
487500
regions = append(regions, region)
501+
// The pre-fetched region was popped from pendingQueue but hasn't been marked as sent or done yet.
502+
// Release its pendingCount slot to avoid leaking flow control credits on worker failures.
503+
s.requestCache.markDone()
488504
}
489505

490506
// Clear all regions from cache

logservice/logpuller/region_request_worker_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package logpuller
1515

1616
import (
17+
"context"
1718
"testing"
1819

1920
"github.com/stretchr/testify/require"
@@ -38,3 +39,28 @@ func TestRegionStatesOperation(t *testing.T) {
3839
require.Nil(t, worker.getRegionState(1, 2))
3940
require.Equal(t, 0, len(worker.requestedRegions.subscriptions))
4041
}
42+
43+
func TestClearPendingRegionsReleaseSlotForPreFetchedRegion(t *testing.T) {
44+
worker := &regionRequestWorker{
45+
requestCache: newRequestCache(10),
46+
}
47+
48+
ctx := context.Background()
49+
region := createTestRegionInfo(1, 1)
50+
51+
ok, err := worker.requestCache.add(ctx, region, false)
52+
require.NoError(t, err)
53+
require.True(t, ok)
54+
55+
req, err := worker.requestCache.pop(ctx)
56+
require.NoError(t, err)
57+
require.Equal(t, 1, worker.requestCache.getPendingCount())
58+
59+
worker.preFetchForConnecting = new(regionInfo)
60+
*worker.preFetchForConnecting = req.regionInfo
61+
62+
regions := worker.clearPendingRegions()
63+
require.Len(t, regions, 1)
64+
require.Nil(t, worker.preFetchForConnecting)
65+
require.Equal(t, 0, worker.requestCache.getPendingCount())
66+
}

0 commit comments

Comments (0)