Project Reactor support
Introduction
Project Reactor support for Flux and Mono are provided from the Rx Project Reactor extension. This allows you to develop applications in a reactive fashion on both the client and server side.
org.apache.cxf/cxf-rt-rs-extension-reactor/3.2.3 and io.projectreactor/reactor-core/3.1.0.RELEASE dependencies are required.
Client
The following simple example uses ObservableRxInvoker. org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker can be used if needed instead. Reviewing our systests for reactive may help as well.
List<HelloWorldBean> collector = new ArrayList<>();
ClientBuilder.newClient()
.register( new JacksonJsonProvider())
.register( new ReactorInvokerProvider())
.target(address)
.request( "application/json" )
.rx(ReactorInvoker. class )
.get(HelloWorldBean. class )
.doOnNext(collector::add)
.subscribe();
|
Server
As a method return value
One simply returns reactor.core.publisher.Mono or reactor.core.publisher.Flux from the method and the runtime will make sure the response is finalized once the Flux/Mono flow is complete.
The only requirement is that one has to register a custom JAX-RS invoker, org.apache.cxf.jaxrs.reactor.server.ReactorInvoker. It does all the default JAXRSInvoker does and only checks if Flux or Mono are returned - if yes then it links it internally with the JAX-RS AsyncResponse. The invoker can be automatically registered with a lot of sensible defaults by using org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer.
The built in invoker handles Flux and Mono based on proper semantics. For instance, reading a JAX-RS response object is always a Mono (0 to 1 elements), but any method containing flux will convert to a Flux operation meant to contain 0 to n elements. You would typically use a Flux on a get with an arbitrary data set converted from a JSON array, Mono would likely be the right solution for any time you want to read the Response object.
Combining Flux/Mono with AsyncResponse
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setProvider( new JacksonJsonProvider());
new ReactorCustomizer().customize(sf);
sf.setResourceClasses(FluxService. class );
sf.setResourceProvider(FluxService. class ,
new SingletonResourceProvider( new FluxService(), true ));
server = sf.create();
|
import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@GET
@Produces ( "application/json" )
@Path ( "textJsonImplicitListAsyncStream" )
public void getJsonImplicitListStreamingAsync( @Suspended AsyncResponse ar) {
Flux.just( "Hello" , "Ciao" )
.map(HelloWorldBean:: new )
.subscribeOn(Schedulers.parallel())
.subscribe( new JsonStreamingAsyncSubscriber<>(ar));
}
@GET
@Produces ( "application/json" )
@Path ( "textJsonImplicitListAsyncStream2" )
public Flux<HelloWorldBean> getJsonImplicitListStreamingAsync2() {
return Flux.just( "Hello" , "Ciao" )
.map(HelloWorldBean:: new )
.subscribeOn(Schedulers.parallel());
}
|