Project Reactor support
Introduction
Support for Project Reactor's Flux and Mono types is provided by the CXF Project Reactor extension. It lets you develop applications in a reactive fashion on both the client and the server side.
The org.apache.cxf/cxf-rt-rs-extension-reactor/3.2.3 and io.projectreactor/reactor-core/3.1.0.RELEASE dependencies are required.
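For a Maven build, the two coordinates above could be declared roughly as follows. This is a sketch that reuses only the group IDs, artifact IDs and versions quoted above; adjust the versions to match the CXF release you actually use.

```xml
<!-- CXF Project Reactor extension (coordinates as listed above) -->
<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-rs-extension-reactor</artifactId>
    <version>3.2.3</version>
</dependency>
<!-- Project Reactor core library (coordinates as listed above) -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.1.0.RELEASE</version>
</dependency>
```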
Client
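The following simple example uses ReactorInvoker, which becomes available once ReactorInvokerProvider is registered on the client. If RxJava 2 types are needed instead, org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker from the RxJava 2 support can be used. The reactive system tests (systests) in the CXF source tree may also be helpful.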
String address = "http://localhost:" + PORT + "/reactor/flux/textJson";
List<HelloWorldBean> collector = new ArrayList<>();
ClientBuilder.newClient()
    .register(new JacksonJsonProvider())
    .register(new ReactorInvokerProvider())
    .target(address)
    .request("application/json")
    .rx(ReactorInvoker.class)
    .get(HelloWorldBean.class)
    .doOnNext(collector::add)
    .subscribe();
// subscribe() does not block: if you want to read the collector afterwards,
// wait for the response first (for example with Thread.sleep or a latch)
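The waiting mentioned in the comment above can be done more reliably with a latch than with a fixed sleep. The following is a minimal sketch that uses only the JDK so that it runs without CXF or Reactor: a CompletableFuture stands in for the asynchronous client call, and AwaitResponseSketch and collectAndWait are hypothetical names introduced for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of CXF or Reactor.
class AwaitResponseSketch {

    // Collects items produced on another thread and blocks the caller until
    // the producer has finished or the timeout elapses.
    static List<String> collectAndWait(List<String> items, long timeoutMillis)
            throws InterruptedException {
        // Thread-safe collector, since items are added from another thread.
        List<String> collector = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Stand-in for the asynchronous call (an assumption of this sketch).
        CompletableFuture
            .runAsync(() -> items.forEach(collector::add))
            // Release the latch on success and on failure alike.
            .whenComplete((ignored, error) -> done.countDown());

        // Wait with a timeout rather than sleeping for a fixed period.
        if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("Timed out waiting for the response");
        }
        return collector;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collectAndWait(Arrays.asList("Hello", "Ciao"), 5000));
    }
}
```

In the Reactor chain itself, the equivalent step would typically be to release the latch from a terminal callback such as doOnTerminate before calling subscribe(), and then to await the latch in the calling thread.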
Server
As a method return value
Simply return reactor.core.publisher.Mono or reactor.core.publisher.Flux from the resource method; the runtime makes sure the response is finalized once the Flux/Mono flow is complete.
The only requirement is that a custom JAX-RS invoker, org.apache.cxf.jaxrs.reactor.server.ReactorInvoker, is registered. It does everything the default JAXRSInvoker does and additionally checks whether a Flux or a Mono is returned; if so, it links it internally with the JAX-RS AsyncResponse. The invoker can be registered automatically, with sensible defaults, by using org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer.
The built-in invoker treats Flux and Mono according to their semantics. For instance, reading a JAX-RS Response object always yields a Mono (0 to 1 elements), whereas any method whose name contains flux yields a Flux, which is meant to contain 0 to n elements. You would typically use a Flux for a GET that returns an arbitrary data set converted from a JSON array, while a Mono is likely the right choice whenever you want to read the Response object.
Combining Flux/Mono with AsyncResponse
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setProvider(new JacksonJsonProvider());
new ReactorCustomizer().customize(sf); // use a JAXRSServerFactoryCustomizationExtension to customize the server
sf.setResourceClasses(FluxService.class);
sf.setResourceProvider(FluxService.class,
    new SingletonResourceProvider(new FluxService(), true));
sf.setAddress("http://localhost:" + PORT + "/");
server = sf.create();
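import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;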

@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream")
public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
    Flux.just("Hello", "Ciao")
        .map(HelloWorldBean::new)
        .subscribeOn(Schedulers.parallel())
        .subscribe(new JsonStreamingAsyncSubscriber<>(ar));
}

// or you can just return the Flux
@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream2")
public Flux<HelloWorldBean> getJsonImplicitListStreamingAsync2() {
    return Flux.just("Hello", "Ciao")
        .map(HelloWorldBean::new)
        .subscribeOn(Schedulers.parallel());
}