

Discover more from INUVEON
Creating a Reactive Restful API with Spring Boot
Creating Reactive Restful APIs with Spring Boot
In this story we want to build a Reactive API with Spring Webflux

Overview
In this article I want to show how easily it is to create a reactive REST API with Spring Boot. I would like to take a simple Use Case and use this example to show how quick it is possible to create an “active” non-blocking REST API with Spring Boot.
In the last months and weeks I was working on two different modern mobile apps/ web apps. Quite quickly I realized that it can be very inconvenient if you have to ask an API from the client side all the time if there are changes. As you surely know you can do this by polling. But I don’t think this is very elegant. It also causes a lot of unnecessary traffic.
A technically more elegant solution would be if the consumer of the API could subscribe to an endpoint, and this endpoint would inform the client about changes.
I would like to use so-called Server-Sent Events (short: SSE) for this. Server-Sent-Events is a HTTP-Standard. It allows an application to process a unidirectional event stream and receive updates whenever the server sends data. At this point I would like to briefly explain why I chose SSE and not WebSockets. WebSockets provide bidirectional communication between client and server, SSE provides unidirectional communication. However, WebSockets are not a HTTP protocol and do not offer standards for error handling unlike SSE.
Scenario
In my current own project, which deals with the implementation of a platform for trainers and workouts, I have implemented a microservice that takes care of training courses. From this service I take one scenario as an example. The scenario to be considered is quite simple and clear.
The idea is that User A can create a new course and User B will get updated the list of available courses without reloading the page. The client component should not have to constantly poll the service.
For this we need to provide the following endpoints: one endpoint where the client can create a new course (POST), an endpoint that allows the client to retrieve all existing courses (GET). And of course, the most interesting: the already discussed SSE endpoint that allows consumers to stream events.
HTTP GET /course
HTTP GET /course/sse
HTTP POST /course
It is also important to me that we can realize this scenario with any database. In another service I had used a MongoDB that already provides a Reactive Stream implementation with the ReactiveMongoRepository. This means that the repository already returns Mono<T> or Flux<T> instead of T or List<T>. I will report on my experiences using the ReactiveMongoRepository in another article.
In the current service I want to be able using any relational database like MySQL, SQL Server or PostgreSQL. In my current project I use a PostgreSQL database and an embedded H2 DB for test executions. For the sake of simplicity in this example I use an embedded H2 in-memory database. But as mentioned before, the database technology should not play a role here. Basically any SQL database can be used.
In this article we want to take a closer look at how we can use a Spring WebFlux to get a RestController to publish reactive streams.
Preparing the Project
The entire project for this article is available on GitHub.
To use the Publisher implementations — Flux and Mono, we need to add the dependency for Spring WebFlux. This framework internally uses the Project Reactor.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
It supports the two programming models “Annotation-based reactive components” and “Functional routing and handling”.
Let’s start with the dependency spring-boot-starter-webflux, which includes all other necessary dependencies.
I also use the following Spring Boot Starter packages in the project:
spring-boot-starter-web
spring-boot-starter-validation
spring-boot-starter-data-jpa
And the following dependencies:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
<version>1.4.199</version>
</dependency>
<dependency>
<groupId>org.modelmapper</groupId>
<artifactId>modelmapper</artifactId>
<version>2.3.8</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
Thus, the project should be basically prepared and ready to run.
In the project structure I like to follow the principles of Domain-Driven Design. This means that the structure of the project should be driven by the domain and not by technical aspects. As mentioned above, we would like to take care of the aggregate Course in this microservice, or as described in the Use Case. This means that we need a package “course”.
Why do I do this? Well, I see the package “course” as an aggregate. All entities, value objects and domain services within these packages are the aggregate with a carefully worked out consistency boundary.
Now, I will not go into the topic of domain-driven project structures with Domain-Driven Design here. I will dedicate a separate article to this topic.
So let’s create a new package “course” and a RestController called “CourseController”.
First, we implement a simple test endpoint that gives us a string to check if the application is running.
@RestController
public class CourseController {
@GetMapping("/")
public ResponseEntity<String> get(){
return ResponseEntity.ok().body("Course Service is running.");
}
}
Now start the service. By default it runs on port 8080. The curl command should return the string “Course Service is running”.
curl http://localhost:8080
Implementing the Course Aggregate
Now let’s first create all necessary entities and domain services for the Course aggregate. We need a Course entity that contains several attributes like title, description, duration etc. Each course must be assigned to a category. Therefore we also need Category entity. The domain services provides methods and encapsulate the access to the repositories and contain the domain logic, for example the knowledge how to create a course entity.
Let’s finally get started to code. Create a Java class “Category” and the corresponding JpaRepository interface in the package course like shown in the following example.
<a href="https://medium.com/media/3b7e6749197a6dd1a2e941254f507555/href">https://medium.com/media/3b7e6749197a6dd1a2e941254f507555/href</a>
…and afterwards of course in the same package a Java class “Course” and the corresponding repository.
<a href="https://medium.com/media/e62991e60a6fd7079409087f34d166d3/href">https://medium.com/media/e62991e60a6fd7079409087f34d166d3/href</a>
The entities would be created and the persistence of the data by means of JpaRepositories would be clarified. Let us now take care of the domain services, which basically contain the knowledge about the entity and are part of it on a logical level.
First, we want to create the CategoryService.
<a href="https://medium.com/media/d43fe79091a585ce519075271613072e/href">https://medium.com/media/d43fe79091a585ce519075271613072e/href</a>
This service implementation contains two methods. One who returns an instance of a Catogory by its ID. Another one that returns a list of all Catagories as DTOs. The first one we need later when creating a new Course from the CreateCourse command.
Since a category must be specified when creating a course, we now take care of creating initial categories. We do this when the application is ready and react to the event ApplicationReadyEvent with ApplicationListener.
We only do this if no entries exist. Therefore we create an implementation CategoryInitializer which I place in a new package called utils.
<a href="https://medium.com/media/91058ba167330ad2ec0166eda9e20ecc/href">https://medium.com/media/91058ba167330ad2ec0166eda9e20ecc/href</a>
If we now start the application, the entries will be created automatically on startup if none exist yet. In the current configuration using an Embedded H2 In-Mem DB this happens at every rerun of the application.
Let us now create the CourseService, which contains a method to create a new course. In itself, this method is quite unspectacular in our example. The CreateCourse command is accepted, a new course instance is created and persisted. To convert the given command to an entity, I use a mapper to keep the code clean and clear and to avoid to many Setters in the method. The implementation of the mapper (not very spectacular) can be seen in the GitHub repository.
Interesting is line 33, where an ApplicationEvent called CourseCreated is created and published using ApplicationEventPublisher. As source object I initialize the event with the even persisted Course object. At this point its also possible to add only the Course ID and retrieve the object later in the listener implementation.
<a href="https://medium.com/media/376ab7208c49b5bff6f85f2daf41c7ee/href">https://medium.com/media/376ab7208c49b5bff6f85f2daf41c7ee/href</a>
To keep the aggregate clear, I usually structure something within the aggregate. For commands, events, event-handlers and processors I create own packages within the aggregate. Commands are data classes that are usually sent by clients over the outer boundary, i.e. the REST controller. The name of this class always follows the same syntax, e.g. CreateCourse. The object itself contains all necessary information that a client send a request to create the entity.
We now create our first command. To do this, we create another Package commands in the package course and a new command class CreateCourse inside.
<a href="https://medium.com/media/5dedbb74273635d07d8ecb106bcd701d/href">https://medium.com/media/5dedbb74273635d07d8ecb106bcd701d/href</a>
As shown in the CourseService implementation, the method createCourse receives and processes the command. If the processing was successful, an event is created and published.
Here the implementation of the CourseCreated. Also the naming of events follows a defined syntax. For events I create a new package called events below the course aggregate.
<a href="https://medium.com/media/9ba41a20a6dce3aa18bc01c60edcf075/href">https://medium.com/media/9ba41a20a6dce3aa18bc01c60edcf075/href</a>
Creating the REST Controller
Within the package course we now create our endpoints, which allows clients to access our domain from outside. Technically we realize this as REST Controller. The controller offers different HTTP operations. Please note that no domain objects are delivered over this boundary. Outgoing data object are mapped to so called Data Transfer Objects (short: DTO). These are simple objects without logic, which simply contain data and basically represent resources from the REST point of view.
Now we use the already created Controller CourseController from the beginning of that article.
We add a new POST operation, which offers the creation of new courses:
@CrossOrigin
@PostMapping("/course")
ResponseEntity<UUID> addCourse(@RequestBody @Valid CreateCourse command){
log.info("Create new course request received. [title: {}]", command.getTitle());
try{
Course course = this.courseService.createCourse(command);
return ResponseEntity.created(URI.create("/course/" + course.getId().toString())).body(course.getId());
}catch(Exception e){
log.error(e.getMessage());
return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
}
}
This endpoint takes a JSON that matches the structure defined in the CreateCourse command.
{
“title”: “Outdoor Bootcamp”,
“description”: “High intensive outdoor bodyweight workout”,
“categoryId”: “d443c190-dc4b-47e8–8490-c8011844c7aa”,
“createdByUserId”: “02065d66–8f85–4892-af53-a09163a466a6”,
“duration”: 60
}
To test the command, the following call can be done using cURL, for example. Of course you can also use UI tools like Postman.
$ curl -H ”Content-Type: application/json” -X POST -d ‘{ “title”: “Outdoor Bootcamp”, “description”: “My description”, “categoryId”: “17d928fe-ac2c-4817–84fd-0830766fefb1”, “createdByUserId”: “02065d66–8f85–4892-af53-a09163a466a6”, “duration”: 60 }’ http://localhost:8080/course
In the next article we will consume the API in a Next.js client.
Please note that the category ID provided in the response body must really exist in the database that the course can be created.
Either you copy the ID from the application log. We remember that we create categories at startup. Or you give the controller another GET endpoint for categories. You can find the code for this endpoint on the GitHub repository of the article.
Let us briefly summarize what we have done so far. We have built the course aggregates, implemented domain objects and persistence as well as endpoints as entry points into our domain.
We can now create new courses using the REST API, and if successful, our service implementation will publish an application event.
So what are we still missing? Correct! The implementation of an event listener and the SSE endpoint to allow clients to subscribe to the course event stream.
Implementing the Server-Sent Event Endpoint
We need a processor to receive and process the CourseCreated Event. For this we create a class called CourseCreatedEventProcessor, which implements ApplicationListener<T>, and Consumer<FluxSink<T>>. The ApplicationListener requires that the method onApplicationEvent is implemented. The Consumer requires the implementation of the accept() method, which executes the injected executor.
<a href="https://medium.com/media/cf0d7012b019c6a39b25b6168448d67d/href">https://medium.com/media/cf0d7012b019c6a39b25b6168448d67d/href</a>
Remember that we had already added the dependency spring-boot-starter-webflux in the beginning, which we will use now.
We will work with Flux. We use the create() method to create a non-blocking stream. The create method accepts a FluxSink<T> consumer. Each subscriber now receives an instance of FluxSink to emit elements. This means, as soon as sink.next() is called, a new element is emitted.
Let us now look at the implementation of the controller. The Flux.create() initialization is part of the Controller constructor. We also need to inject an instance of CourseCreatedEventProcessor, which is used to create the Flux instance.
private final Flux<CourseCreated> events;
public CourseController(CourseService courseService,
CategoryService categoryService,
CourseCreatedEventProcessor processor,
CourseMapper mapper) {
this.events = Flux.create(processor).share();
...
}
In the CourseCreatedEventProcessor the call of the accept() method of the listener is initiated when a CourseCreated event is received. The processor implementation itself takes care to push the event tothe stream calling sink.next().
The only thing missing is our Server Sent Event Endpoint. Here it is.
The only thing we have to do here is to map CourseCreated event to the DTO using the mapper. If you want, you can also send the event to clients instead an DTO.
@CrossOrigin()
@GetMapping(value = "/course/sse", produces = "text/event-stream;charset=UTF-8")
public Flux<CourseDto> stream() {
log.info("Start listening to the course collection.");
return this.events.map(event -> {
CourseDto dto = this.mapper.entityToDto((Course) event.getSource());
return dto;
});
}
Subscribing clients are thus able to react to notifications as soon as new data is available — without blocking.
To try out how it all works now, the service must be executed.
Open two terminals. In one of them the endpoint http://localhost:8080/course/sse will be called. Now the client has basically already subscribed to the stream.
If the above shown command to create a new course is issued in the second console, the output will be in the first console.
You can also watch the short demonstration video.
<a href="https://medium.com/media/e8f2a24d07b0e33c70ecb358f85215e7/href">https://medium.com/media/e8f2a24d07b0e33c70ecb358f85215e7/href</a>
Have fun trying it out!
In the next issue I will connect this API to a Next.js UI client. Then the effect will be visible from a User perspective.
Please, read also how to use this endpoint on a SSR Next.js page.
You find the full project on GitHub. Feedback welcome!
Creating a Reactive Restful API with Spring Boot was originally published in Level Up Coding on Medium, where people are continuing the conversation by highlighting and responding to this story.