Project Reactor support

Introduction

Project Reactor support for Flux and Mono are provided from the Rx Project Reactor extension.  This allows you to develop applications in a reactive fashion on both the client and server side.

org.apache.cxf/cxf-rt-rs-extension-reactor/3.2.3 and io.projectreactor/reactor-core/3.1.0.RELEASE dependencies are required.

Client

The following simple example uses ObservableRxInvoker. org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker can be used if needed instead.  Reviewing our systests for reactive may help as well.

String address = "http://localhost:" + PORT + "/reactor/flux/textJson";
List<HelloWorldBean> collector = new ArrayList<>();
ClientBuilder.newClient()
        .register(new JacksonJsonProvider())
        .register(new ReactorInvokerProvider())
        .target(address)
        .request("application/json")
        .rx(ReactorInvoker.class)
        .get(HelloWorldBean.class)
        .doOnNext(collector::add)
        .subscribe();
// make sure to do a Thread.sleep or wait for the response to come back if you're trying to collect

Server

As a method return value

One simply returns reactor.core.publisher.Mono or reactor.core.publisher.Flux from the method and the runtime will make sure the response is finalized once the Flux/Mono flow is complete.

The only requirement is that one has to register a custom JAX-RS invoker, org.apache.cxf.jaxrs.reactor.server.ReactorInvoker. It does all the default JAXRSInvoker does and only checks if Flux or Mono are returned - if yes then it links it internally with the JAX-RS AsyncResponse.  The invoker can be automatically registered with a lot of sensible defaults by using org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer.

The built in invoker handles Flux and Mono based on proper semantics.  For instance, reading a JAX-RS response object is always a Mono (0 to 1 elements), but any method containing flux will convert to a Flux operation meant to contain 0 to n elements.  You would typically use a Flux on a get with an arbitrary data set converted from a JSON array, Mono would likely be the right solution for any time you want to read the Response object.

Combining Flux/Mono with AsyncResponse

JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setProvider(new JacksonJsonProvider());
new ReactorCustomizer().customize(sf); // use a JAXRSServerFactoryCustomizationExtension to customize the server
sf.setResourceClasses(FluxService.class);
sf.setResourceProvider(FluxService.class,
                       new SingletonResourceProvider(new FluxService(), true));
sf.setAddress("http://localhost:" + PORT + "/");
server = sf.create();

 

 

import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream")
public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
    Flux.just("Hello", "Ciao")
            .map(HelloWorldBean::new)
            .subscribeOn(Schedulers.parallel())
            .subscribe(new JsonStreamingAsyncSubscriber<>(ar));
}
 
// or you can just return the Flux
@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream2")
public Flux<HelloWorldBean> getJsonImplicitListStreamingAsync2() {
    return Flux.just("Hello", "Ciao")
            .map(HelloWorldBean::new)
            .subscribeOn(Schedulers.parallel());
}