Skip to content

Commit a15bde1

Browse files
authored
RATIS-2164. LeakDetector has a race condition. (#1163)
1 parent 62ae6d9 commit a15bde1

5 files changed

Lines changed: 320 additions & 102 deletions

File tree

ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java

Lines changed: 90 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@
2222

2323
import java.lang.ref.ReferenceQueue;
2424
import java.lang.ref.WeakReference;
25+
import java.util.ArrayList;
2526
import java.util.Collections;
27+
import java.util.HashMap;
28+
import java.util.List;
2629
import java.util.Set;
27-
import java.util.concurrent.ConcurrentHashMap;
2830
import java.util.concurrent.atomic.AtomicLong;
31+
import java.util.function.Consumer;
32+
import java.util.function.Supplier;
2933

3034
/**
3135
* Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
@@ -55,13 +59,61 @@
5559
*/
5660
public class LeakDetector {
5761
private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);
62+
63+
/**
 * The collection of live {@link LeakTracker}s.
 * The trackers are kept in a set backed by a plain {@link HashMap};
 * every method of this class is declared synchronized on this object.
 */
private static class LeakTrackerSet {
64+
/** The trackers currently held by this object. */
private final Set<LeakTracker> set = Collections.newSetFromMap(new HashMap<>());
65+
66+
/**
 * Remove the given tracker if it is present.
 *
 * @return true if the set contained the tracker; otherwise, return false.
 */
synchronized boolean remove(LeakTracker tracker) {
67+
return set.remove(tracker);
68+
}
69+
70+
/**
 * Remove the given tracker and assert that it was present.
 * NOTE(review): the failure behavior is defined by Preconditions.assertTrue,
 * presumably an exception carrying the given message -- confirm.
 */
synchronized void removeExisting(LeakTracker tracker) {
71+
final boolean removed = set.remove(tracker);
72+
Preconditions.assertTrue(removed, () -> "Failed to remove existing " + tracker);
73+
}
74+
75+
/**
 * Create a new {@link LeakTracker} for the given referent and add it to the set.
 * The new tracker is given {@link #removeExisting(LeakTracker)} as its remove method.
 *
 * @param referent the object to be tracked.
 * @param queue the reference queue passed to the new tracker.
 * @param leakReporter the supplier passed to the new tracker as its leak message source.
 * @return the newly added tracker.
 */
synchronized LeakTracker add(Object referent, ReferenceQueue<Object> queue, Supplier<String> leakReporter) {
76+
final LeakTracker tracker = new LeakTracker(referent, queue, this::removeExisting, leakReporter);
77+
final boolean added = set.add(tracker);
78+
Preconditions.assertTrue(added, () -> "Failed to add " + tracker + " for " + referent);
79+
return tracker;
80+
}
81+
82+
/**
 * Count the trackers in the set whose {@link LeakTracker#reportLeak()} returns a non-null message.
 * Note that reportLeak() is invoked on every tracker in the set.
 *
 * @param throwException if true, pass the count to {@link #assertNoLeaks(int)} before returning
 *                       (not reached when the set is empty).
 * @return the number of trackers reporting a leak; 0 if the set is empty.
 */
synchronized int getNumLeaks(boolean throwException) {
83+
if (set.isEmpty()) {
84+
return 0;
85+
}
86+
87+
int n = 0;
88+
for (LeakTracker tracker : set) {
89+
if (tracker.reportLeak() != null) {
90+
n++;
91+
}
92+
}
93+
if (throwException) {
94+
assertNoLeaks(n);
95+
}
96+
return n;
97+
}
98+
99+
/**
 * Assert that the given number of leaks is zero.
 * The failure message also states whether the number of leaks equals the current set size.
 */
synchronized void assertNoLeaks(int leaks) {
100+
Preconditions.assertTrue(leaks == 0, () -> {
101+
final int size = set.size();
102+
return "#leaks = " + leaks + " > 0, #leaks " + (leaks == size? "==" : "!=") + " set.size = " + size;
103+
});
104+
}
105+
}
106+
58107
private static final AtomicLong COUNTER = new AtomicLong();
59108

60109
private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
61-
private final Set<LeakTracker> allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>());
110+
/** All the {@link LeakTracker}s. */
111+
private final LeakTrackerSet trackers = new LeakTrackerSet();
112+
/** When a leak is discovered, a message is printed and added to this list. */
113+
private final List<String> leakMessages = Collections.synchronizedList(new ArrayList<>());
62114
private final String name;
63115

64-
public LeakDetector(String name) {
116+
LeakDetector(String name) {
65117
this.name = name + COUNTER.getAndIncrement();
66118
}
67119

@@ -80,8 +132,11 @@ private void run() {
80132
LeakTracker tracker = (LeakTracker) queue.remove();
81133
// The original resource has already been GCed; if the tracker is not closed yet,
82134
// report a leak.
83-
if (allLeaks.remove(tracker)) {
84-
tracker.reportLeak();
135+
if (trackers.remove(tracker)) {
136+
final String leak = tracker.reportLeak();
137+
if (leak != null) {
138+
leakMessages.add(leak);
139+
}
85140
}
86141
} catch (InterruptedException e) {
87142
Thread.currentThread().interrupt();
@@ -93,48 +148,51 @@ private void run() {
93148
LOG.warn("Exiting leak detector {}.", name);
94149
}
95150

96-
public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) {
97-
// A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%,
151+
Runnable track(Object leakable, Supplier<String> reportLeak) {
152+
// TODO: A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%,
98153
// if we have proof that leak tracking impacts performance, or a single LeakDetector
99154
// thread can't keep up with the pace of object allocation.
100155
// For now, it looks effective enough, so let's keep it simple.
101-
LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak);
102-
allLeaks.add(tracker);
103-
return tracker;
156+
return trackers.add(leakable, queue, reportLeak)::remove;
104157
}
105158

106-
public void assertNoLeaks() {
107-
Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString);
108-
}
159+
public void assertNoLeaks(int maxRetries, TimeDuration retrySleep) throws InterruptedException {
160+
synchronized (leakMessages) {
161+
// leakMessages are all the leaks discovered so far.
162+
Preconditions.assertTrue(leakMessages.isEmpty(),
163+
() -> "#leaks = " + leakMessages.size() + "\n" + leakMessages);
164+
}
109165

110-
String allLeaksString() {
111-
if (allLeaks.isEmpty()) {
112-
return "allLeaks = <empty>";
166+
for(int i = 0; i < maxRetries; i++) {
167+
final int numLeaks = trackers.getNumLeaks(false);
168+
if (numLeaks == 0) {
169+
return;
170+
}
171+
LOG.warn("{}/{}) numLeaks == {} > 0, will wait and retry ...", i, maxRetries, numLeaks);
172+
retrySleep.sleep();
113173
}
114-
allLeaks.forEach(LeakTracker::reportLeak);
115-
return "allLeaks.size = " + allLeaks.size();
174+
trackers.getNumLeaks(true);
116175
}
117176

118-
private static final class LeakTracker extends WeakReference<Object> implements UncheckedAutoCloseable {
119-
private final Set<LeakTracker> allLeaks;
120-
private final Runnable leakReporter;
177+
private static final class LeakTracker extends WeakReference<Object> {
178+
private final Consumer<LeakTracker> removeMethod;
179+
private final Supplier<String> getLeakMessage;
180+
121181
LeakTracker(Object referent, ReferenceQueue<Object> referenceQueue,
122-
Set<LeakTracker> allLeaks, Runnable leakReporter) {
182+
Consumer<LeakTracker> removeMethod, Supplier<String> getLeakMessage) {
123183
super(referent, referenceQueue);
124-
this.allLeaks = allLeaks;
125-
this.leakReporter = leakReporter;
184+
this.removeMethod = removeMethod;
185+
this.getLeakMessage = getLeakMessage;
126186
}
127187

128-
/**
129-
* Called by the tracked resource when closing.
130-
*/
131-
@Override
132-
public void close() {
133-
allLeaks.remove(this);
188+
/** Called by the tracked resource when the object is completely released. */
189+
void remove() {
190+
removeMethod.accept(this);
134191
}
135192

136-
void reportLeak() {
137-
leakReporter.run();
193+
/** @return the leak message if there is a leak; return null if there is no leak. */
194+
String reportLeak() {
195+
return getLeakMessage.get();
138196
}
139197
}
140198
}

0 commit comments

Comments
 (0)