|
16 | 16 |
|
17 | 17 | package org.springframework.http.codec.json; |
18 | 18 |
|
| 19 | +import java.util.List; |
| 20 | +import java.util.Map; |
| 21 | +import java.util.Objects; |
19 | 22 | import java.util.function.Predicate; |
20 | 23 |
|
| 24 | +import kotlinx.serialization.KSerializer; |
| 25 | +import kotlinx.serialization.builtins.BuiltinSerializersKt; |
21 | 26 | import kotlinx.serialization.json.Json; |
| 27 | +import org.jspecify.annotations.Nullable; |
| 28 | +import org.reactivestreams.Publisher; |
| 29 | +import reactor.core.publisher.Flux; |
| 30 | +import reactor.core.publisher.Mono; |
22 | 31 |
|
23 | 32 | import org.springframework.core.ResolvableType; |
| 33 | +import org.springframework.core.codec.DecodingException; |
| 34 | +import org.springframework.core.io.buffer.DataBuffer; |
24 | 35 | import org.springframework.http.MediaType; |
25 | 36 | import org.springframework.http.codec.KotlinSerializationStringDecoder; |
26 | 37 | import org.springframework.util.MimeType; |
@@ -96,4 +107,37 @@ public KotlinSerializationJsonDecoder(Json json, Predicate<ResolvableType> typeP |
96 | 107 | super(json, typePredicate, DEFAULT_JSON_MIME_TYPES); |
97 | 108 | } |
98 | 109 |
|
| 110 | + @Override |
| 111 | + public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType, |
| 112 | + @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
| 113 | + return Flux.defer(() -> { |
| 114 | + KSerializer<Object> serializer = serializer(elementType); |
| 115 | + if (serializer == null) { |
| 116 | + return Mono.error(new DecodingException("Could not find KSerializer for " + elementType)); |
| 117 | + } |
| 118 | + return this.stringDecoder |
| 119 | + .decode(inputStream, elementType, mimeType, hints) |
| 120 | + .switchOnFirst((signal, flux) -> { |
| 121 | + if (signal.hasValue()) { |
| 122 | + String value = Objects.requireNonNull(signal.get()); |
| 123 | + if (value.stripLeading().startsWith("[") && !List.class.isAssignableFrom(elementType.toClass())) { |
| 124 | + KSerializer<List<Object>> listSerializer = BuiltinSerializersKt.ListSerializer(serializer); |
| 125 | + return flux |
| 126 | + .flatMapIterable(string -> format().decodeFromString(listSerializer, string)) |
| 127 | + .onErrorMap(IllegalArgumentException.class, this::processException); |
| 128 | + } |
| 129 | + return flux.handle((string, sink) -> { |
| 130 | + try { |
| 131 | + sink.next(format().decodeFromString(serializer, string)); |
| 132 | + } |
| 133 | + catch (IllegalArgumentException ex) { |
| 134 | + sink.error(processException(ex)); |
| 135 | + } |
| 136 | + }); |
| 137 | + } |
| 138 | + return flux; |
| 139 | + }); |
| 140 | + }); |
| 141 | + } |
| 142 | + |
99 | 143 | } |
0 commit comments