Protobuf Support #1329

@VitaliyKulikov

I am trying to use org.springframework.cloud:spring-cloud-function-rsocket on the server side, with:

  @Bean
  Function<String, Fixture> fixtures() {
    return name -> Fixture.newBuilder().setName(name).build();
  }

  @Bean
  RSocketStrategiesCustomizer strategiesCustomizer() {
    return strategies -> strategies
        .encoder(new org.springframework.http.codec.protobuf.ProtobufEncoder())
        .decoder(new org.springframework.http.codec.protobuf.ProtobufDecoder());
  }

where Fixture is a Protobuf-generated message class.

At runtime, the following exception is thrown:

org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public abstract R java.util.function.Function.apply(T): Cannot decode to [java.lang.Object]GenericMessage [payload=FluxPeekFuseable, headers={dataBufferFactory=NettyDataBufferFactory (PooledByteBufAllocator(directByDefault: true)), rsocketRequester=org.springframework.messaging.rsocket.DefaultRSocketRequester@83f96b6, rsocketResponse=null, lookupDestination=fixtures, contentType=application/octet-stream, rsocketFrameType=REQUEST_STREAM}], failedMessage=GenericMessage [payload=FluxPeekFuseable, headers={dataBufferFactory=NettyDataBufferFactory (PooledByteBufAllocator(directByDefault: true)), rsocketRequester=org.springframework.messaging.rsocket.DefaultRSocketRequester@83f96b6, rsocketResponse=null, lookupDestination=fixtures, contentType=application/octet-stream, rsocketFrameType=REQUEST_STREAM}]
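
For context on the `[java.lang.Object]` part of the message: the exception names the generic method `Function.apply(T)`, and the declared parameter type of that method, seen through reflection, is the erased type `Object` rather than `String`. The following is a minimal sketch using only the JDK (no Spring involved); `ErasureDemo` and `erasedApplyParameter` are hypothetical names chosen for illustration. Whether erasure is the actual cause of the failure here is an assumption, not a confirmed diagnosis.

```java
import java.lang.reflect.Method;
import java.util.function.Function;

public class ErasureDemo {

    // Looks up Function.apply via reflection and returns its declared
    // (erased) parameter type. Illustrative helper, not part of any library.
    static Class<?> erasedApplyParameter() {
        try {
            Method apply = Function.class.getMethod("apply", Object.class);
            return apply.getParameterTypes()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Declared as Function<String, Integer>, but the interface method
        // itself only exposes Object as its parameter type.
        Function<String, Integer> length = String::length;
        System.out.println(erasedApplyParameter().getName());
        System.out.println(length.apply("fixtures"));
    }
}
```

If this reading is right, a decoder that is asked to decode to `Object` has no concrete target type to work with, which would match the "Cannot decode to" wording above.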

The client itself is fine: it works perfectly against a server-side implementation that uses RSocketExchange:

public interface ProtobufService {

  @RSocketExchange("fixtures")
  Flux<Fixture> fixtures(String request);
}

@Controller
public class ProtobufController implements ProtobufService {

  public Flux<Fixture> fixtures(String request) {
    return Flux.just(Fixture.newBuilder().setName(request).build()).log();
  }
}

Client:

public interface Protobuf {

  @RSocketExchange("fixtures")
  Flux<Fixture> fixtures(@Payload String request);
}


  @Bean
  RSocketRequester requester(RSocketRequester.Builder builder) {
    return builder
        .rsocketConnector(configurer -> configurer.resume(new Resume()))
        .rsocketStrategies(
            configurer -> configurer
            .encoder(new ProtobufEncoder())
            .decoder(new ProtobufDecoder()))
        .dataMimeType(MimeTypeUtils.APPLICATION_OCTET_STREAM)
        .tcp("127.0.0.1", 2222);
  }

  @Bean
  @SuppressWarnings("null")
  RSocketServiceProxyFactory factory(RSocketRequester requester) {
    return RSocketServiceProxyFactory.builder(requester).build();
  }

  @Bean
  Protobuf services(RSocketRequester requester, RSocketServiceProxyFactory factory) {
    return factory.createClient(Protobuf.class);
  }

  @Bean
  Disposable protobuf(Protobuf services) {
    return Flux.just("one", "two", "three", "four", "five")
        .flatMap(services::fixtures)
        .log("client-protobuf")
        .subscribe();
  }

Any ideas?
