Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@
* using {@link StandardCharsets#UTF_8},
* a variable-length character encoding.
*/
public final class StringCodec extends StringCodecBase {
private static final StringCodec CODEC = new StringCodec();
public final class StringCodec extends StringCodecBase.WithFallback {
private static final StringCodec CODEC_WITH_FALLBACK = new StringCodec();
private static final Codec<String> CODEC_NO_FALLBACK = new StringCodecBase(StandardCharsets.UTF_8) { };

public static StringCodec get() {
return CODEC;
return CODEC_WITH_FALLBACK;
}

/**
 * Returns the shared codec held in {@code CODEC_NO_FALLBACK}.
 *
 * <p>Unlike {@link #get()}, this codec is a plain {@code StringCodecBase}
 * (UTF-8), so its decoding methods use the strict path that reports a
 * decoding failure as a {@code CodecException}.
 *
 * @return the singleton string codec without fallback decoding
 */
public static Codec<String> getCodecNoFallback() {
return CODEC_NO_FALLBACK;
}

private StringCodec() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,29 @@ private <E extends Exception> PutToByteBuffer<E> encode(
};
}

String decode(ByteBuffer buffer) {
/**
 * Decodes the given buffer strictly: a failure is reported to the caller
 * instead of being retried with a more lenient decoder.
 *
 * <p>Decoding runs on a read-only view of {@code buffer}; that view has its
 * own position and limit, so the caller's buffer is not consumed.
 *
 * @param buffer the bytes to decode
 * @return the decoded string
 * @throws CodecException if anything goes wrong while decoding; the original
 *     exception is attached as the cause
 */
String decodeNoFallback(ByteBuffer buffer) throws CodecException {
  try {
    final CharSequence chars = newDecoder().decode(buffer.asReadOnlyBuffer());
    return chars.toString();
  } catch (Exception e) {
    // Wrap every failure so callers only need to handle CodecException.
    throw new CodecException("Failed to decode " + buffer, e);
  }
}

String decodeWithFallback(ByteBuffer buffer) {
Runnable error = null;
try {
return newDecoder().decode(buffer.asReadOnlyBuffer()).toString();
} catch (Exception e) {
error = () -> LOG.warn("Failed to decode buffer with " + charset
+ ", buffer = (hex) " + StringUtils.bytes2Hex(buffer), e);
error = () -> LOG.warn("Failed to decode buffer with {}, buffer = (hex) {}",
charset, StringUtils.bytes2Hex(buffer, 20), e);

// For compatibility, try decoding using StringUtils.
final String decoded = StringUtils.bytes2String(buffer, charset);
// Decoded successfully, update error message.
error = () -> LOG.warn("Decode (hex) " + StringUtils.bytes2Hex(buffer, 20)
+ "\n Attempt failed : " + charset + " (see exception below)"
+ "\n Retry succeeded: decoded to " + decoded, e);
error = () -> LOG.warn("Decode (hex) {}" +
"\n Attempt failed : {} (see exception below)" +
"\n Retry succeeded: decoded to {}",
StringUtils.bytes2Hex(buffer, 20), charset, decoded, e);
return decoded;
} finally {
if (error != null) {
Expand Down Expand Up @@ -177,8 +186,8 @@ public CodecBuffer toCodecBuffer(@Nonnull String object, CodecBuffer.Allocator a
}

@Override
public String fromCodecBuffer(@Nonnull CodecBuffer buffer) {
return decode(buffer.asReadOnlyByteBuffer());
public String fromCodecBuffer(@Nonnull CodecBuffer buffer) throws CodecException {
return decodeNoFallback(buffer.asReadOnlyByteBuffer());
}

@Override
Expand All @@ -187,12 +196,28 @@ public byte[] toPersistedFormat(String object) throws CodecException {
}

@Override
public String fromPersistedFormat(byte[] bytes) {
return decode(ByteBuffer.wrap(bytes));
public String fromPersistedFormat(byte[] bytes) throws CodecException {
return decodeNoFallback(ByteBuffer.wrap(bytes));
}

/**
 * Returns the given string itself; no new object is created.
 * {@link String} is immutable, so sharing the instance is safe.
 *
 * @param object the string to copy
 * @return the same {@code object} reference
 */
@Override
public String copyObject(String object) {
return object;
}

/**
 * A {@link StringCodecBase} whose decoding methods go through
 * {@code decodeWithFallback} rather than the strict decoding path used by
 * the base class. The overrides below do not declare {@code CodecException}.
 */
static class WithFallback extends StringCodecBase {
  /**
   * @param charset the character set passed on to {@link StringCodecBase}
   */
  WithFallback(Charset charset) {
    super(charset);
  }

  /** Decodes the read-only view of {@code buffer} with fallback. */
  @Override
  public String fromCodecBuffer(@Nonnull CodecBuffer buffer) {
    final ByteBuffer readOnly = buffer.asReadOnlyByteBuffer();
    return decodeWithFallback(readOnly);
  }

  /** Decodes the persisted {@code bytes} with fallback. */
  @Override
  public String fromPersistedFormat(byte[] bytes) {
    final ByteBuffer wrapped = ByteBuffer.wrap(bytes);
    return decodeWithFallback(wrapped);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* a fixed-length one-byte-per-character encoding,
* i.e. the serialized size equals to {@link String#length()}.
*/
public final class FixedLengthStringCodec extends StringCodecBase {
public final class FixedLengthStringCodec extends StringCodecBase.WithFallback {

private static final FixedLengthStringCodec INSTANCE
= new FixedLengthStringCodec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ static String bytes2String(byte[] bytes) {
}

static String bytes2String(ByteBuffer bytes) {
return StringCodec.get().decode(bytes);
return StringCodec.get().decodeWithFallback(bytes);
}

static RocksDatabaseException toRocksDatabaseException(Object name, String op, RocksDBException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.primitives.Shorts;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
Expand Down Expand Up @@ -141,6 +142,20 @@ static void runTestLongs(long original) {
assertEquals(original, codec.fromPersistedFormat(bytes));
}

/**
 * Feeds bytes that are not valid UTF-8 to both string codecs:
 * the no-fallback codec must reject them, while the fallback codec
 * must return a string that does not re-encode to the original bytes.
 */
@Test
public void testStringCodecMalformedUtf8String() throws Exception {
  // 0xC3 opens a two-byte UTF-8 sequence, but '/' is not a continuation byte.
  final byte[] invalidUtf8 = new byte[] {(byte) 0xC3, (byte) '/', 0, 0, 0, 1};

  // Strict codec: decoding must fail with CodecException.
  assertThrows(CodecException.class,
      () -> StringCodec.getCodecNoFallback().fromPersistedFormat(invalidUtf8));

  // Fallback codec: decoding succeeds, but the round trip is lossy.
  final StringCodec fallbackCodec = StringCodec.get();
  final String lenient = fallbackCodec.fromPersistedFormat(invalidUtf8);
  final byte[] roundTripped = fallbackCodec.toPersistedFormat(lenient);
  assertFalse(Arrays.equals(invalidUtf8, roundTripped));
}

@Test
public void testStringCodec() throws Exception {
assertFalse(StringCodec.get().isFixedLength());
Expand Down Expand Up @@ -183,6 +198,7 @@ public void testStringCodec() throws Exception {
/**
 * Runs the shared codec test against both string codecs (with and without
 * fallback) for the given string.
 *
 * @param original the string to run through both codecs
 * @return the number of bytes {@code original} occupies when encoded with
 *     {@code UTF_8}
 */
static int runTestStringCodec(String original) throws Exception {
  final int expectedSize = UTF_8.encode(original).remaining();

  // Both codecs are expected to produce the same serialized size.
  runTest(StringCodec.get(), original, expectedSize);
  runTest(StringCodec.getCodecNoFallback(), original, expectedSize);

  return expectedSize;
}

Expand All @@ -204,7 +220,7 @@ public void testFixedLengthStringCodec() throws Exception {


final String multiByteChars = "Ozone 是 Hadoop 的分布式对象存储系统,具有易扩展和冗余存储的特点。";
assertThrows(IOException.class,
assertThrows(CodecException.class,
tryCatch(() -> runTestFixedLengthStringCodec(multiByteChars)));
assertThrows(IllegalStateException.class,
tryCatch(() -> FixedLengthStringCodec.string2Bytes(multiByteChars)));
Expand Down