Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 82 additions & 36 deletions lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,12 @@
*/
package org.apache.avro.io;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.IntFunction;

import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.Resolver;
import org.apache.avro.Resolver.Action;
import org.apache.avro.Resolver.Container;
import org.apache.avro.Resolver.EnumAdjust;
import org.apache.avro.Resolver.Promote;
import org.apache.avro.Resolver.ReaderUnion;
import org.apache.avro.Resolver.RecordAdjust;
import org.apache.avro.Resolver.Skip;
import org.apache.avro.Resolver.WriterUnion;
import org.apache.avro.Schema;
import org.apache.avro.*;
import org.apache.avro.Resolver.*;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;

import org.apache.avro.generic.*;
import org.apache.avro.generic.GenericData.InstanceSupplier;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.FastReaderBuilder.RecordReader.Stage;
import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
import org.apache.avro.reflect.ReflectionUtil;
Expand All @@ -57,6 +30,18 @@
import org.apache.avro.util.WeakIdentityHashMap;
import org.apache.avro.util.internal.Accessor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.IntFunction;

public class FastReaderBuilder {

/**
Expand All @@ -65,9 +50,30 @@ public class FastReaderBuilder {
*/
private final GenericData data;

/** first schema is reader schema, second is writer schema */
private final Map<Schema, Map<Schema, RecordReader>> readerCache = Collections
.synchronizedMap(new WeakIdentityHashMap<>());
/**
* first schema is reader schema, second is writer schema As the RecordReader
* holds a strong reference to the reader cache. We rely on the writer cache
* being dereferenced, casing the inner map entry to remove the RecordReader.
* This in turn allows the outer Map entry to become weakly accessible and
* reclaimed.
* <p>
* Its essential that the reader and writer cache are no the same object, and in
* that case we use the readerSimpleCache. If we used this cache we would never
* release the memory, as the value would retain the strong reference
* <p>
* All access to the readerCache must consider thread safety, and use
* appropriate concurrent safe APIs
*/
private final Map<Schema, Map<Schema, RecordReader>> readerCache = new WeakIdentityHashMap<>();

/**
* serves the same purpose as readerCache, but specifically used when the reader
* and writer Schemas are the same object. For the readerSimpleCache we rely on
* the holding only a soft reference to the RecordReader, which should
* eventually be cleared by the garbage collector. All access to the readerCache
* must consider thread safety, and use appropriate concurrent safe APIs
*/
private final Map<Schema, SoftRef> readerSimpleCache = new WeakIdentityHashMap<>();

private boolean keyClassEnabled = true;

Expand Down Expand Up @@ -242,9 +248,49 @@ private IntFunction<Conversion<?>> getConversionSupplier(Object record) {
}
}

private static class SoftRef extends SoftReference<RecordReader> {
RecordReader hardRef;

public SoftRef(RecordReader referent) {
super(referent);
hardRef = referent;
}

RecordReader getAndClearHardRef() {
RecordReader result = hardRef;
hardRef = null;
if (result == null)
result = get();
return result;
}
}

private RecordReader getRecordReaderFromCache(Schema readerSchema, Schema writerSchema) {
return readerCache.computeIfAbsent(readerSchema, k -> new WeakIdentityHashMap<>()).computeIfAbsent(writerSchema,
k -> new RecordReader());

if (writerSchema != readerSchema) {
return readerCache.computeIfAbsent(readerSchema, k -> new WeakIdentityHashMap<>()).computeIfAbsent(writerSchema,
k -> new RecordReader());
}
while (true) {
// Note - there is a chance that 2 threads concurrently access
// if they do only one will generate a value (if one is needed), but
// getAndClearHardRef may (in theory at least) return null if the hardRef and
// the SoftRef is enqueued in the race
// it seems unlikely, but that the reason we have this loop. If we repeatedly
// loop, then at least
// one thread processes
SoftRef softRef = readerSimpleCache.compute(readerSchema, (schema, ref) -> {
RecordReader result = (ref == null) ? null : ref.get();
if (result == null) {
return new SoftRef(new RecordReader());
}
ref.hardRef = result;
return ref;
});
RecordReader result = softRef.getAndClearHardRef();
if (result != null)
return result;
}
}

private FieldReader applyConversions(Schema readerSchema, FieldReader reader, Conversion<?> explicitConversion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Implements a combination of WeakHashMap and IdentityHashMap. Useful for
Expand All @@ -35,13 +37,17 @@
* implements the Map interface, it intentionally violates Map's general
* contract, which mandates the use of the equals method when comparing objects.
* This class is designed for use only in the rare cases wherein
* reference-equality semantics are required.
*
* Note that this implementation is not synchronized. </b>
* reference-equality semantics are required. It also violates the contract with
* respect to the {@link Map#entrySet()}, {@link Map#keySet()} ()},
* {@link Map#values()} methods, which return collections that are snapshots,
* and not backed by this map * Note that this implementation is not
* synchronized, but the backing store is a ConcurrentHashMap. Caller must
* decide what additional protection is required in the case of concurrent
* access.</b>b>
*/
public class WeakIdentityHashMap<K, V> implements Map<K, V> {
private final ReferenceQueue<K> queue = new ReferenceQueue<>();
private Map<IdentityWeakReference, V> backingStore = new ConcurrentHashMap<>();
private final Map<IdentityWeakReference<K>, V> backingStore = new ConcurrentHashMap<>();

public WeakIdentityHashMap() {
}
Expand All @@ -53,9 +59,10 @@ public void clear() {
}

@Override
@SuppressWarnings("unchecked")
public boolean containsKey(Object key) {
reap();
return backingStore.containsKey(new IdentityWeakReference(key));
return backingStore.containsKey(makeKey(key));
}

@Override
Expand All @@ -64,11 +71,14 @@ public boolean containsValue(Object value) {
return backingStore.containsValue(value);
}

// NOTE - this breaks the general contract in that the returned value is not
// backed by this object
// so changes to this map are not reflected in the value previously returned
@Override
public Set<Map.Entry<K, V>> entrySet() {
reap();
Set<Map.Entry<K, V>> ret = new HashSet<>();
for (Map.Entry<IdentityWeakReference, V> ref : backingStore.entrySet()) {
for (Map.Entry<IdentityWeakReference<K>, V> ref : backingStore.entrySet()) {
final K key = ref.getKey().get();
final V value = ref.getValue();
Map.Entry<K, V> entry = new Map.Entry<K, V>() {
Expand All @@ -92,34 +102,40 @@ public V setValue(V value) {
return Collections.unmodifiableSet(ret);
}

// NOTE - this breaks the general contract in that the returned value is not
// backed by this object
// so changes to this map are not reflected in the value previously returned
@Override
public Set<K> keySet() {
reap();
Set<K> ret = new HashSet<>();
for (IdentityWeakReference ref : backingStore.keySet()) {
for (IdentityWeakReference<K> ref : backingStore.keySet()) {
ret.add(ref.get());
}
return Collections.unmodifiableSet(ret);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof WeakIdentityHashMap)) {
return false;
}
return backingStore.equals(((WeakIdentityHashMap) o).backingStore);
return backingStore.equals(((WeakIdentityHashMap<K, V>) o).backingStore);
}

@Override
public V get(Object key) {
reap();
return backingStore.get(new IdentityWeakReference(key));
return backingStore.get(makeKey(key));
}

@Override
public V put(K key, V value) {
reap();
return backingStore.put(new IdentityWeakReference(key), value);
return backingStore.put(makeKey(key), value);
}

@Override
Expand All @@ -142,7 +158,12 @@ public void putAll(Map t) {
@Override
public V remove(Object key) {
reap();
return backingStore.remove(new IdentityWeakReference(key));
return backingStore.remove(makeKey(key));
}

@SuppressWarnings("unchecked")
private IdentityWeakReference<K> makeKey(Object key) {
return new IdentityWeakReference<>((K) key, queue);
}

@Override
Expand All @@ -151,28 +172,78 @@ public int size() {
return backingStore.size();
}

// NOTE - this breaks the general contract in that the returned value is not
// backed by this object
// so changes to this map are not reflected in the value previously returned
@Override
public Collection<V> values() {
reap();
return backingStore.values();
}

@Override
public V putIfAbsent(K key, V value) {
reap();
return backingStore.putIfAbsent(makeKey(key), value);
}

@Override
public boolean remove(Object key, Object value) {
reap();
return backingStore.remove(makeKey(key), value);
}

@Override
public boolean replace(K key, V oldValue, V newValue) {
reap();
return backingStore.replace(makeKey(key), oldValue, newValue);
}

@Override
public V replace(K key, V value) {
reap();
return backingStore.replace(makeKey(key), value);
}

@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
reap();
return backingStore.computeIfAbsent(makeKey(key), k -> mappingFunction.apply(key));
}

@Override
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
reap();
return backingStore.computeIfPresent(makeKey(key), (k, v) -> remappingFunction.apply(key, v));
}

@Override
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
reap();
return backingStore.compute(makeKey(key), (k, v) -> remappingFunction.apply(key, v));
}

@Override
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
reap();
return backingStore.merge(makeKey(key), value, remappingFunction);
}

private synchronized void reap() {
Object zombie = queue.poll();

while (zombie != null) {
IdentityWeakReference victim = (IdentityWeakReference) zombie;
IdentityWeakReference<?> victim = (IdentityWeakReference<?>) zombie;
backingStore.remove(victim);
zombie = queue.poll();
}
}

class IdentityWeakReference extends WeakReference<K> {
static class IdentityWeakReference<K> extends WeakReference<K> {
int hash;

@SuppressWarnings("unchecked")
IdentityWeakReference(Object obj) {
super((K) obj, queue);
IdentityWeakReference(K obj, ReferenceQueue<K> queue) {
super(obj, queue);
hash = System.identityHashCode(obj);
}

Expand All @@ -189,7 +260,7 @@ public boolean equals(Object o) {
if (!(o instanceof WeakIdentityHashMap.IdentityWeakReference)) {
return false;
}
IdentityWeakReference ref = (IdentityWeakReference) o;
IdentityWeakReference<?> ref = (IdentityWeakReference<?>) o;
return this.get() == ref.get();
}
}
Expand Down
Loading